68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
# Copyright (c) PARRRATE T&V 2021. All rights reserved.
|
|
|
|
from io import BytesIO
|
|
from typing import Iterable, Type
|
|
|
|
from bu4.parsing.abstractparser import AbstractParser
|
|
from bu4.parsing.constructs.parsed import Parsed
|
|
from bu4.parsing.extensions.extension import Extension
|
|
from bu4.parsing.states.parsestate import ParseState
|
|
from bu4.parsing.states.psafter import PSAfter
|
|
from bu4.parsing.states.psfinal import PSFinal
|
|
from bu4.parsing.states.psread import PSRead
|
|
from bu4.parsing.targets.pschain import PSChain
|
|
from bu4.parsing.targets.pslambda import PSLambda
|
|
|
|
# Public API of this module: only ``Parser`` is exported by a star-import.
__all__ = ('Parser',)
|
|
|
|
|
|
class Parser(AbstractParser):
    """Parser that reads a byte source and dispatches on one-byte codes.

    Every extension class supplied at construction time is instantiated
    with this parser and registered under its ``code``. :meth:`parse` then
    runs a state loop in which each byte read from the source selects the
    extension that produces the next parse state.
    """

    def __init__(self, source: bytes, extensions: Iterable[Type[Extension]]):
        """Wrap *source* in a byte stream and register *extensions*.

        Args:
            source: The raw bytes to parse.
            extensions: Extension classes; each is called with this parser
                and stored under its ``code`` attribute.

        Raises:
            ValueError: If two extension classes share the same ``code``.
        """
        self.__source = BytesIO(source)
        self.__extensions: dict[int, Extension] = {}
        for extension_type in extensions:
            code = extension_type.code
            # Reject duplicates before instantiating the extension.
            if code in self.__extensions:
                raise ValueError('code overload')
            self.__extensions[code] = extension_type(self)

    def read(self) -> int:
        """Return the next byte of the source, or 0 once it is exhausted."""
        chunk = self.__source.read(1)
        if not chunk:
            return 0
        return chunk[0]

    def parse_name(self) -> bytes:
        """Collect bytes up to the first zero returned by :meth:`read`.

        The terminating zero (a NUL byte or end of input, which ``read``
        does not distinguish) is consumed but not included in the result.
        """
        name = bytearray()
        # iter(callable, sentinel) stops as soon as read() yields 0.
        for byte in iter(self.read, 0):
            name.append(byte)
        return bytes(name)

    def _state_read(self, state: PSAfter) -> ParseState:
        """Read one code byte and let the matching extension transform *state*.

        Raises:
            ValueError: If no extension is registered for the code read.
        """
        code = self.read()
        handler = self.__extensions.get(code)
        if handler is not None:
            return handler.apply(state)
        raise ValueError(f'unknown control: {hex(code)}')

    def _state_next(self, state: PSAfter) -> ParseState:
        """Advance *state* by one step, based on the type of its inner state.

        * inner ``PSAfter``: *state* is updated in place, taking the inner
          state's own state and a ``PSChain`` of the inner target followed
          by the current target.
        * inner ``PSFinal``: the parsed value is handed to the target.
        * inner ``PSRead``: a code byte is read and dispatched.

        Raises:
            TypeError: If the inner state is none of the above.
        """
        inner = state.state
        if isinstance(inner, PSAfter):
            # Compute both new values before assigning either of them.
            next_inner = inner.state
            chained = PSChain(inner.target, state.target)
            state.state = next_inner
            state.target = chained
            return state
        if isinstance(inner, PSFinal):
            return state.target.given(inner.parsed)
        if isinstance(inner, PSRead):
            return self._state_read(state)
        raise TypeError

    def parse(self) -> Parsed:
        """Run the state loop until a final state is reached; return its value.

        Raises:
            TypeError: If a step yields a state that is neither ``PSFinal``
                nor ``PSAfter``.
        """
        state = PSAfter(PSRead(), PSLambda(lambda parsed: PSFinal(parsed)))
        while not isinstance(state, PSFinal):
            if not isinstance(state, PSAfter):
                raise TypeError
            state = self._state_next(state)
        return state.parsed
|