# Copyright (c) PARRRATE T&V 2021. All rights reserved.

from io import BytesIO
from typing import Iterable, Type

from bu4.parsing.abstractparser import AbstractParser
from bu4.parsing.constructs.parsed import Parsed
from bu4.parsing.extensions.extension import Extension
from bu4.parsing.states.parsestate import ParseState
from bu4.parsing.states.psafter import PSAfter
from bu4.parsing.states.psfinal import PSFinal
from bu4.parsing.states.psread import PSRead
from bu4.parsing.targets.pschain import PSChain
from bu4.parsing.targets.pslambda import PSLambda

__all__ = ('Parser',)


class Parser(AbstractParser):
    """Byte-oriented parser driven by an explicit state loop.

    Each control byte read from the source is dispatched to the extension
    registered under that byte value; the extension decides what the next
    parse state is.
    """

    def __init__(self, source: bytes, extensions: Iterable[Type[Extension]]):
        """Wrap ``source`` in a stream and instantiate the given extensions.

        Args:
            source: Raw bytes to parse.
            extensions: Extension classes; each is instantiated with this
                parser and stored under its ``code`` attribute.

        Raises:
            ValueError: If two extensions share the same ``code``.
        """
        self.__source = BytesIO(source)
        self.__extensions: dict[int, Extension] = {}
        for extension in extensions:
            if extension.code in self.__extensions:
                raise ValueError('code overload')
            self.__extensions[extension.code] = extension(self)

    def read(self) -> int:
        """Return the next byte of the source as an int.

        Returns 0 once the source is exhausted, so end of input cannot be
        told apart from a literal zero byte.
        """
        bytes_ = self.__source.read(1)
        return bytes_[0] if bytes_ else 0

    def parse_name(self) -> bytes:
        """Read bytes up to the next zero byte (or end of source).

        The terminating zero byte is consumed but not included in the
        returned value.
        """
        s = BytesIO()
        while True:
            c = self.read()
            if not c:
                return s.getvalue()
            s.write(bytes([c]))

    def _state_read(self, state: PSAfter) -> ParseState:
        """Read one control byte and let its extension transform ``state``.

        Raises:
            ValueError: If no extension is registered for the byte read.
        """
        code = self.read()
        extension = self.__extensions.get(code)
        if extension is None:
            raise ValueError(f'unknown control: {hex(code)}')
        return extension.apply(state)

    def _state_next(self, state: PSAfter) -> ParseState:
        """Advance a ``PSAfter`` state by one step, based on its inner state.

        * inner ``PSAfter``: flatten one nesting level in place, chaining the
          inner target before the outer one via ``PSChain``;
        * inner ``PSFinal``: hand the parsed value to the target;
        * inner ``PSRead``: read a control byte and dispatch to an extension.

        Raises:
            TypeError: If the inner state is none of the above.
        """
        inner = state.state
        if isinstance(inner, PSAfter):
            # Both new values are computed from the old ones before either
            # attribute is overwritten.
            state.state, state.target = inner.state, PSChain(inner.target, state.target)
            return state
        if isinstance(inner, PSFinal):
            return state.target.given(inner.parsed)
        if isinstance(inner, PSRead):
            return self._state_read(state)
        raise TypeError(f'unexpected inner parse state: {type(inner).__name__}')

    def parse(self) -> Parsed:
        """Run the state loop until a ``PSFinal`` is reached; return its value.

        Raises:
            TypeError: If a step yields something that is neither ``PSFinal``
                nor ``PSAfter``.
        """
        state = PSAfter(PSRead(), PSLambda(lambda parsed: PSFinal(parsed)))
        while True:
            if isinstance(state, PSFinal):
                return state.parsed
            if isinstance(state, PSAfter):
                state = self._state_next(state)
            else:
                raise TypeError(f'unexpected parse state: {type(state).__name__}')