68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
# Copyright (c) PARRRATE T&V 2021. All rights reserved.
|
|
|
|
from io import BytesIO
|
|
from typing import Iterable, Type
|
|
|
|
from bu4.parsing.abstractparser import AbstractParser
|
|
from bu4.parsing.constructs.parsed import Parsed
|
|
from bu4.parsing.extensions.extension import Extension
|
|
from bu4.parsing.states.parsingstate import ParsingState
|
|
from bu4.parsing.states.afterparsing import AfterParsing
|
|
from bu4.parsing.states.parsingfinished import ParsingFinished
|
|
from bu4.parsing.states.parsingread import ParsingRead
|
|
from bu4.parsing.targets.apchain import APChain
|
|
from bu4.parsing.targets.aplambda import APLambda
|
|
|
|
# Names exported by ``from <this module> import *``: only the Parser class.
__all__ = ('Parser',)
|
|
|
|
|
|
class Parser(AbstractParser):
    """Parser over an in-memory byte string, driven by one-byte control codes.

    Every control code read from the source is looked up among the registered
    extensions, and the matching extension is applied to the current state.
    """

    def __init__(self, source: bytes, extensions: Iterable[Type[Extension]]):
        """Wrap ``source`` in a byte stream and instantiate the extensions.

        :param source: raw bytes to parse.
        :param extensions: extension classes; each one is instantiated with
            this parser and registered under its ``code`` attribute.
        :raises ValueError: if two extensions declare the same ``code``.
        """
        self.__source = BytesIO(source)
        self.__extensions: dict[int, Extension] = {}
        for extension_type in extensions:
            code = extension_type.code
            if code in self.__extensions:
                raise ValueError('code overload')
            self.__extensions[code] = extension_type(self)

    def read(self) -> int:
        """Return the next byte of the source as an int, or 0 at end of input."""
        chunk = self.__source.read(1)
        if not chunk:
            # Exhausted source: reported as 0, same as a literal zero byte.
            return 0
        return chunk[0]

    def parse_name(self) -> bytes:
        """Collect bytes up to (not including) the next zero byte.

        Because :meth:`read` yields 0 at end of input, running out of source
        also terminates the name.
        """
        # iter(callable, sentinel) keeps calling self.read until it returns 0.
        return bytes(iter(self.read, 0))

    def _state_read(self, state: AfterParsing) -> ParsingState:
        """Read one control code and apply the extension registered for it.

        :param state: the state handed to the extension's ``apply``.
        :return: whatever the extension's ``apply`` returns.
        :raises ValueError: if no extension is registered for the code.
        """
        code = self.read()
        handler = self.__extensions.get(code)
        if handler is not None:
            return handler.apply(state)
        raise ValueError(f'unknown control: {hex(code)}')

    def _state_next(self, state: AfterParsing) -> ParsingState:
        """Advance ``state`` by one step, depending on its inner state.

        * inner ``AfterParsing``: ``state`` is flattened in place -- its inner
          state becomes the nested inner state and its target becomes an
          ``APChain`` of the nested target and the previous target.
        * inner ``ParsingFinished``: the parsed value is passed to the
          target's ``given`` and the result is returned.
        * inner ``ParsingRead``: delegated to :meth:`_state_read`.

        :raises TypeError: if the inner state is none of the above.
        """
        inner = state.state
        if isinstance(inner, AfterParsing):
            # Compute both replacements before mutating ``state``.
            hoisted = inner.state
            chained = APChain(inner.target, state.target)
            state.state = hoisted
            state.target = chained
            return state
        if isinstance(inner, ParsingFinished):
            return state.target.given(inner.parsed)
        if isinstance(inner, ParsingRead):
            return self._state_read(state)
        raise TypeError

    def parse(self) -> Parsed:
        """Run the state loop until it finishes and return the parsed value.

        Starts from an ``AfterParsing`` wrapping a fresh ``ParsingRead`` whose
        target turns the parsed value into ``ParsingFinished``.

        :raises TypeError: if a step produces a state that is neither
            ``ParsingFinished`` nor ``AfterParsing``.
        """
        initial = ParsingRead()
        on_parsed = APLambda(lambda parsed: ParsingFinished(parsed))
        state = AfterParsing(initial, on_parsed)
        while not isinstance(state, ParsingFinished):
            if not isinstance(state, AfterParsing):
                raise TypeError
            state = self._state_next(state)
        return state.parsed
|