142 lines
3.8 KiB
Python
142 lines
3.8 KiB
Python
from io import BytesIO
|
|
from typing import Callable
|
|
|
|
from bu4.parsing.linked import Parsed
|
|
from bu4.parsing.lname import PName
|
|
from bu4.codes import CODE_NULL, CODE_CALL, CODE_MAKE, CODE_NAME, CODE_SKIP, CODE_QUOT, CODE_QOPN, CODE_QCLS
|
|
from bu4.parsing.lcall import PCall
|
|
from bu4.parsing.lnull import PNull
|
|
|
|
from bu4.parsing.llambda import PLambda
|
|
|
|
|
|
class ParseState:
|
|
pass
|
|
|
|
|
|
class PSRead(ParseState):
|
|
pass
|
|
|
|
|
|
class PSFinal(ParseState):
|
|
def __init__(self, parsed: Parsed):
|
|
self.parsed = parsed
|
|
|
|
|
|
class PSATarget:
|
|
def given(self, parsed: Parsed) -> ParseState:
|
|
raise NotImplementedError
|
|
|
|
|
|
class PSAfter(ParseState):
|
|
def __init__(self, state, target: PSATarget):
|
|
self.state = state
|
|
self.target = target
|
|
|
|
|
|
class PSAA(PSATarget):
|
|
def __init__(self, target: PSATarget, aftertarget: PSATarget):
|
|
self.target = target
|
|
self.aftertarget = aftertarget
|
|
|
|
def given(self, parsed: Parsed) -> ParseState:
|
|
return PSAfter(self.target.given(parsed), self.aftertarget)
|
|
|
|
|
|
class PSALambda(PSATarget):
|
|
def __init__(self, lambda_: Callable[[Parsed], ParseState]):
|
|
self.lambda_ = lambda_
|
|
|
|
def given(self, parsed: Parsed) -> ParseState:
|
|
return self.lambda_(parsed)
|
|
|
|
|
|
class PSAEndsWith(PSATarget):
|
|
def __init__(self, parser: 'Parser', c: int, msg: str):
|
|
self.parser = parser
|
|
self.c = c
|
|
self.msg = msg
|
|
|
|
def given(self, parsed: Parsed) -> ParseState:
|
|
assert self.parser.read() == self.c, self.msg
|
|
return PSFinal(parsed)
|
|
|
|
|
|
_apacall = PSAfter(
|
|
PSRead(),
|
|
PSALambda(
|
|
lambda argument: PSAfter(
|
|
PSRead(),
|
|
PSALambda(
|
|
lambda lambda_: PSFinal(
|
|
PCall(
|
|
argument,
|
|
lambda_
|
|
)
|
|
)
|
|
)
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
class Parser:
|
|
def __init__(self, s: bytes):
|
|
self.__s = BytesIO(s)
|
|
|
|
def read(self) -> int:
|
|
bytes_ = self.__s.read(1)
|
|
return bytes_[0] if bytes_ else 0
|
|
|
|
def parse_name(self) -> bytes:
|
|
s = BytesIO()
|
|
while True:
|
|
c = self.read()
|
|
if not c:
|
|
return s.getvalue()
|
|
s.write(bytes([c]))
|
|
|
|
def _state_read(self, state: PSAfter):
|
|
c = self.read()
|
|
if c == CODE_NULL:
|
|
state.state = PSFinal(PNull())
|
|
elif c == CODE_CALL:
|
|
state.state = _apacall
|
|
elif c == CODE_MAKE:
|
|
state.state = (lambda name: PSAfter(
|
|
PSRead(),
|
|
PSALambda(
|
|
lambda value: PSFinal(PLambda(name, value))
|
|
)
|
|
))(self.parse_name())
|
|
elif c == CODE_NAME:
|
|
state.state = PSFinal(PName(self.parse_name()))
|
|
elif c == CODE_SKIP:
|
|
pass
|
|
elif c == CODE_QUOT:
|
|
state.target = PSAA(PSAEndsWith(self, CODE_QUOT, "quot expected"), state.target)
|
|
elif c == CODE_QOPN:
|
|
state.target = PSAA(PSAEndsWith(self, CODE_QCLS, "qcls expected"), state.target)
|
|
else:
|
|
raise ValueError(f"unknown control: {hex(c)}")
|
|
return state
|
|
|
|
def _state_next(self, state: PSAfter):
|
|
if isinstance(state.state, PSAfter):
|
|
state.state, state.target = state.state.state, PSAA(state.state.target, state.target)
|
|
elif isinstance(state.state, PSFinal):
|
|
state = state.target.given(state.state.parsed)
|
|
elif isinstance(state.state, PSRead):
|
|
state = self._state_read(state)
|
|
return state
|
|
|
|
def parse(self) -> Parsed:
|
|
state = PSAfter(PSRead(), PSALambda(lambda parsed: PSFinal(parsed)))
|
|
while True:
|
|
if isinstance(state, PSFinal):
|
|
return state.parsed
|
|
elif isinstance(state, PSAfter):
|
|
state = self._state_next(state)
|
|
else:
|
|
raise TypeError
|