50 lines
1.6 KiB
Python
50 lines
1.6 KiB
Python
# Copyright (c) PARRRATE T&V 2021. All rights reserved.
|
|
|
|
from io import BytesIO
|
|
from typing import Iterable, Type, TypeVar
|
|
|
|
from bu4.parsing.abstractparser import AbstractParser
|
|
from bu4.parsing.extensions.abstractextension import AbstractExtension
|
|
from bu4.parsing.extensions.extension import Extension
|
|
from bu4.parsing.states.parsingread import ParsingRead
|
|
from bu4.transform.states.transformfinished import TransformFinished
|
|
|
|
# Public names of this module: only the Parser class is exported.
__all__ = ('Parser',)
|
|
|
|
# Type variable for the value Parser.parse() returns; it also parameterises
# AbstractParser and Extension in the annotations below.
T = TypeVar('T')
|
|
|
|
|
|
class Parser(AbstractParser[T]):
    """Parser over an in-memory byte string with extensions looked up by code.

    The source bytes are wrapped in a ``BytesIO`` and consumed one byte at a
    time. Each extension class passed to the constructor is instantiated with
    this parser and stored under its ``code`` attribute.
    """

    def __init__(self, source: bytes, extensions: Iterable[Type[Extension[T]]]):
        """Wrap *source* for reading and instantiate every extension class.

        Args:
            source: The raw bytes to parse.
            extensions: Extension classes; each must expose a ``code``
                attribute and accept this parser as its only constructor
                argument.

        Raises:
            ValueError: If two extension classes share the same ``code``.
        """
        self.__source = BytesIO(source)
        self.__extensions: dict[int, Extension[T]] = {}
        registry = self.__extensions
        for extension_type in extensions:
            code = extension_type.code
            # Refuse duplicates before the second extension is instantiated.
            if code in registry:
                raise ValueError('code overload')
            registry[code] = extension_type(self)

    def read(self) -> int:
        """Consume and return the next byte of the source as an ``int``.

        Returns 0 once the source is exhausted, so a literal zero byte and
        end of input are indistinguishable to callers.
        """
        chunk = self.__source.read(1)
        if not chunk:
            return 0
        return chunk[0]

    def parse_name(self) -> bytes:
        """Read bytes up to the next zero byte (or end of input).

        The terminating zero is consumed but is not part of the result.
        """
        # iter(callable, sentinel) keeps calling read() until it yields 0.
        return bytes(iter(self.read, 0))

    def extension_for(self, code: int) -> AbstractExtension[T]:
        """Return the extension instance stored under *code*.

        Raises:
            ValueError: If no extension was supplied for *code*.
        """
        found = self.__extensions.get(code)
        if found is not None:
            return found
        raise ValueError(f'unknown control: {hex(code)}')

    def parse(self) -> T:
        """Step the state chain until it finishes and return the result.

        Starts from ``ParsingRead(self)`` and keeps replacing the current
        state with ``state.next()`` until a ``TransformFinished`` instance is
        reached, then returns its ``transformed`` attribute.
        """
        current = ParsingRead(self)
        while not isinstance(current, TransformFinished):
            current = current.next()
        return current.transformed
|