This commit is contained in:
AF 2022-07-26 01:52:59 +03:00
parent 96f200bbd0
commit c1a7ab1128
14 changed files with 574 additions and 13 deletions

View File

@ -23,14 +23,14 @@ class TLNode(RecursiveMentionable, Generic[ElementType]):
self.parametres = parametres self.parametres = parametres
self.source = source self.source = source
if parametres.leaf: if parametres.leaf:
assert len(node_cache) == 0 assert_eq(len(node_cache), 0)
self.elements = parametres.branching self.elements = parametres.branching
assert len(element_cache) == self.elements assert_eq(len(element_cache), self.elements)
self.element_cache = element_cache self.element_cache = element_cache
else: else:
assert len(element_cache) == 0 assert_eq(len(element_cache), 0)
self.nodes = parametres.branching self.nodes = parametres.branching
assert len(node_cache) == self.nodes assert_eq(len(node_cache), self.nodes)
self.node_cache = node_cache self.node_cache = node_cache
def bytes_no(self, index: int) -> bytes: def bytes_no(self, index: int) -> bytes:

View File

@ -61,6 +61,10 @@ def gather(
) -> Awaitable[tuple[T0, T1, T2, T3, T4]]: ... ) -> Awaitable[tuple[T0, T1, T2, T3, T4]]: ...
@overload
def gather(*args: Coroutine[Any, Any, T0]) -> Awaitable[tuple[T0, ...]]: ...
def gather(*args): def gather(*args):
return _gather(*args) return _gather(*args)
@ -76,5 +80,5 @@ async def alist(aiterable: AsyncIterable[T]) -> list[T]:
return [x async for x in aiterable] return [x async for x in aiterable]
async def aidentity(value: T) -> Coroutine[Any, Any, T]: async def aidentity(value: T) -> T:
return value return value

View File

@ -43,7 +43,7 @@ class HashPoint(Generic[Mentioned]):
return _hash(source) return _hash(source)
@classmethod @classmethod
def bytes_of_mentioned(cls, mentioned: Mentionable): def bytes_of_mentioned(cls, mentioned: Mentionable) -> bytes:
assert isinstance(mentioned, Mentionable) assert isinstance(mentioned, Mentionable)
topology_hash: bytes = mentioned.__topology_hash__() topology_hash: bytes = mentioned.__topology_hash__()
assert isinstance(topology_hash, bytes) assert isinstance(topology_hash, bytes)

View File

@ -15,7 +15,7 @@ class StaticMentionable(Mentionable, abc.ABC):
def from_bytes(cls: Type[StaticMentioned], source: bytes, resolver: HashResolver) -> StaticMentioned: def from_bytes(cls: Type[StaticMentioned], source: bytes, resolver: HashResolver) -> StaticMentioned:
raise NotImplementedError raise NotImplementedError
def __factory__(self) -> RainbowFactory[StaticMentioned]: def __factory__(self: StaticMentioned) -> RainbowFactory[StaticMentioned]:
return self.factory() return self.factory()
@classmethod @classmethod
@ -26,12 +26,9 @@ class StaticMentionable(Mentionable, abc.ABC):
class StaticFactory(RainbowFactory[StaticMentioned], Generic[StaticMentioned]): class StaticFactory(RainbowFactory[StaticMentioned], Generic[StaticMentioned]):
def __init__(self, cls: Type[StaticMentioned]): def __init__(self, cls: Type[StaticMentioned]):
assert issubclass(cls, StaticMentionable) assert issubclass(cls, StaticMentionable)
self.cls: Type[StaticMentioned] = cls self.cls = cls
def from_bytes(self, source: bytes, resolver: HashResolver) -> StaticMentioned: def from_bytes(self, source: bytes, resolver: HashResolver) -> StaticMentioned:
assert isinstance(source, bytes) assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver) assert isinstance(resolver, HashResolver)
if issubclass(self.cls, StaticMentionable): return self.cls.from_bytes(source, resolver)
return self.cls.from_bytes(source, resolver)
else:
raise TypeError

View File

@ -1,9 +1,9 @@
from typing import Any, Callable, Coroutine, Generic, TypeVar from typing import Any, Callable, Coroutine, Generic, TypeVar
from rainbowadn.core import * from rainbowadn.core import *
__all__ = ('Reduce',) __all__ = ('Reduce',)
Element = TypeVar('Element') Element = TypeVar('Element')
Out = TypeVar('Out') Out = TypeVar('Out')

View File

@ -0,0 +1,17 @@
__all__ = (
'IAtomic',
'IAuto', 'IAutoFactory',
'IByte',
'Inlining',
'IPair',
'IStatic', 'IStaticFactory',
'IUnit',
)
from .iatomic import IAtomic
from .iauto import IAuto, IAutoFactory
from .ibyte import IByte
from .inlining import Inlining
from .ipair import *
from .istatic import IStatic, IStaticFactory
from .iunit import IUnit

View File

@ -0,0 +1,25 @@
import abc
from typing import Type, TypeVar
from rainbowadn.core import *
from .istatic import *
__all__ = ('IAtomic',)
InlinedAtomic = TypeVar('InlinedAtomic')
class IAtomic(IStatic, abc.ABC):
def __topology_hash__(self) -> bytes:
return HashPoint.hash(b'')
@classmethod
def from_bytes(cls: Type[InlinedAtomic], source: bytes, resolver: HashResolver) -> InlinedAtomic:
assert issubclass(cls, IAtomic)
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
return cls._from_bytes(source)
@classmethod
def _from_bytes(cls: Type[InlinedAtomic], source: bytes) -> InlinedAtomic:
raise NotImplementedError

View File

@ -0,0 +1,305 @@
import heapq
from io import BytesIO
from typing import Callable, Generic, Iterable, Optional, Type, TypeAlias, TypeVar
from rainbowadn.core import *
from .inlining import *
__all__ = ('IAuto', 'IAutoFactory',)
_IList: TypeAlias = list[tuple[int, Mentionable]]
_UList: TypeAlias = list[tuple[int, HashPoint]]
_SList: TypeAlias = list[tuple[int, RainbowFactory, int]]
_VList: TypeAlias = list[tuple[int, RainbowFactory]]
_MCall: TypeAlias = Callable[[bytes, HashResolver, _IList, _UList], None]
_MTuple: TypeAlias = tuple[int, _MCall, int]
_IAuto = TypeVar('_IAuto')
class IAuto(RecursiveMentionable):
def __init__(
self,
inlined: _IList,
uninlined: _UList
):
assert isinstance(inlined, list)
assert isinstance(uninlined, list)
for index, mentionable in inlined:
assert isinstance(index, int)
assert isinstance(mentionable, Mentionable)
for index, hashpoint in uninlined:
assert isinstance(index, int)
assert isinstance(hashpoint, HashPoint)
self.inlined = inlined
self.uninlined = uninlined
def points(self) -> Iterable[HashPoint]:
inlined_points: list[tuple[int, HashPoint]] = []
uninlined_points: list[tuple[int, HashPoint]] = []
for index, inlined in self.inlined:
if isinstance(inlined, RecursiveMentionable):
for point in inlined.points():
assert isinstance(point, HashPoint)
inlined_points.append((index, point))
for index, uninlined in self.uninlined:
uninlined_points.append((index, uninlined))
merged_points: Iterable[tuple[int, HashPoint]] = heapq.merge(inlined_points, uninlined_points)
for _, point in merged_points:
assert isinstance(point, HashPoint)
yield point
def __bytes__(self):
inlined_bytes: list[tuple[int, bytes]] = []
uninlined_bytes: list[tuple[int, bytes]] = []
for index, inlined in self.inlined:
inlined_bytes.append((index, bytes(inlined)))
for index, uninlined in self.uninlined:
uninlined_bytes.append((index, bytes(uninlined)))
merged_bytes: Iterable[tuple[int, bytes]] = heapq.merge(inlined_bytes, uninlined_bytes)
return b''.join(source for _, source in merged_bytes)
def __factory__(self: _IAuto) -> RainbowFactory[_IAuto]:
sized: _SList = []
inlined_unsized: _VList = []
uninlined_unsized: _VList = []
for index, inlined in self.inlined:
factory = inlined.__factory__()
size = Inlining.factory_size(factory)
if size is None:
inlined_unsized.append((index, factory))
else:
assert isinstance(size, int)
sized.append((index, factory, size))
for index, uninlined in self.uninlined:
uninlined_unsized.append((index, uninlined.factory))
merged_unsized: Iterable[tuple[int, RainbowFactory]] = heapq.merge(inlined_unsized, uninlined_unsized)
return IAutoFactory(
type(self),
sized,
list(merged_unsized)
)
def hashpoints(self) -> Iterable[HashPoint]:
inlined_hashpoints: list[tuple[int, HashPoint]] = []
uninlined_hashpoints: list[tuple[int, HashPoint]] = []
for index, inlined in self.inlined:
inlined_hashpoints.append((index, HashPoint.of(inlined)))
for index, uninlined in self.uninlined:
uninlined_hashpoints.append((index, uninlined))
merged_hashpoints: Iterable[tuple[int, HashPoint]] = heapq.merge(inlined_hashpoints, uninlined_hashpoints)
for _, hashpoint in merged_hashpoints:
yield hashpoint
async def values(self) -> Iterable[Mentionable]:
return await gather(*map(HashPoint.resolve, self.hashpoints()))
@classmethod
def factory(cls: Type[_IAuto], *factories: RainbowFactory) -> RainbowFactory[_IAuto]:
assert issubclass(cls, IAuto)
return IAutoFactory.of(cls, *factories)
@classmethod
def from_values(cls: Type[_IAuto], *values: Mentionable) -> _IAuto:
assert issubclass(cls, IAuto)
sized: _IList = []
unsized: _IList = []
for index, value in enumerate(values):
assert isinstance(value, Mentionable)
factory = value.__factory__()
if Inlining.factory_size(factory) is None:
unsized.append((index, value))
else:
sized.append((index, value))
inlined: _IList
uninlined: _UList
match unsized:
case [(_, _)]:
inlined = list(heapq.merge(sized, unsized))
uninlined = []
case _:
inlined = sized
uninlined = [(index, HashPoint.of(value)) for index, value in unsized]
return cls(inlined, uninlined)
@classmethod
async def _utuple_to_ituple(cls, utuple: tuple[int, HashPoint]) -> tuple[int, Mentionable]:
assert isinstance(utuple, tuple)
index, hashpoint = utuple
return index, await hashpoint.resolve()
@classmethod
async def from_hashpoints(cls: Type[_IAuto], *hashpoints: HashPoint) -> _IAuto:
assert issubclass(cls, IAuto)
sized: _UList = []
unsized: _UList = []
for index, hashpoint in enumerate(hashpoints):
assert isinstance(hashpoint, HashPoint)
factory = hashpoint.factory
if Inlining.factory_size(factory) is None:
unsized.append((index, hashpoint))
else:
sized.append((index, hashpoint))
inlined: _IList
uninlined: _UList
match unsized:
case [(_, _)]:
inlined = list(await gather(*map(cls._utuple_to_ituple, heapq.merge(sized, unsized))))
uninlined = []
case _:
inlined = list(await gather(*map(cls._utuple_to_ituple, sized)))
uninlined = unsized
return cls(inlined, uninlined)
async def str(self, tab: int) -> str:
assert isinstance(tab, int)
formatted = await gather(*(hash_point_format(hashpoint, tab) for hashpoint in self.hashpoints()))
return f'{tabulate(tab)}'.join(formatted)
class IAutoFactory(Inlining[IAuto], Generic[_IAuto]):
def __init__(
self,
typed: Type[_IAuto],
sized: _SList,
unsized: _VList
):
assert issubclass(typed, IAuto)
assert isinstance(sized, list)
assert isinstance(unsized, list)
self.typed = typed
for index, factory, size in sized:
assert isinstance(index, int)
assert isinstance(factory, RainbowFactory)
assert isinstance(size, int)
for index, factory in unsized:
assert isinstance(index, int)
assert isinstance(factory, RainbowFactory)
self.sized = sized
self.unsized = unsized
match unsized:
case [(infix_index, infix_factory)]:
self.infix = True
self.infix_index: int = infix_index
self.infix_factory: RainbowFactory = infix_factory
self.prefix: _SList = list(
(index, factory, size) for index, factory, size in sized if index < infix_index
)
self.prefix_size = self._affix_size(self.prefix)
assert isinstance(self.prefix_size, int)
self.postfix: _SList = list(
(index, factory, size) for index, factory, size in sized if index > infix_index
)
self.postfix_size = self._affix_size(self.postfix)
assert isinstance(self.postfix_size, int)
case _:
self.infix = False
self.merged = self._merged()
assert isinstance(self.merged, list)
@classmethod
def of(cls, typed: Type[_IAuto], *factories: RainbowFactory) -> Inlining[_IAuto]:
sized: _SList = []
unsized: _VList = []
for index, factory in enumerate(factories):
assert isinstance(factory, RainbowFactory)
size = Inlining.factory_size(factory)
if size is None:
unsized.append((index, factory))
else:
assert isinstance(size, int)
sized.append((index, factory, size))
return cls(typed, sized, unsized)
@classmethod
def _affix_size(cls, affix: _SList) -> int:
assert isinstance(affix, list)
return sum(size for _, _, size in affix)
@classmethod
def _unsized_mtuple(cls, index: int, factory: RainbowFactory) -> _MTuple:
assert isinstance(index, int)
assert isinstance(factory, RainbowFactory)
def wrap(source: bytes, resolver: HashResolver, inlined: _IList, uninlined: _UList) -> None:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
assert isinstance(inlined, list)
assert isinstance(uninlined, list)
assert_eq(len(source), HashPoint.HASH_LENGTH)
uninlined.append((index, ResolverMetaOrigin(resolver).hash_point(factory, source)))
return index, wrap, HashPoint.HASH_LENGTH
@classmethod
def _sized_mtuple(cls, index: int, factory: RainbowFactory, size: int) -> _MTuple:
assert isinstance(index, int)
assert isinstance(factory, RainbowFactory)
assert isinstance(size, int)
def wrap(source: bytes, resolver: HashResolver, inlined: _IList, uninlined: _UList) -> None:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
assert isinstance(inlined, list)
assert isinstance(uninlined, list)
assert_eq(len(source), size)
inlined.append((index, factory.from_bytes(source, resolver)))
return index, wrap, size
def _merged(self) -> list[tuple[_MCall, int]]:
unsized_mergeable: list[_MTuple] = []
sized_mergeable: list[_MTuple] = []
for index, factory in self.unsized:
unsized_mergeable.append(
self._unsized_mtuple(index, factory)
)
for index, factory, size in self.sized:
sized_mergeable.append(
self._sized_mtuple(index, factory, size)
)
merged: Iterable[_MTuple] = heapq.merge(unsized_mergeable, sized_mergeable)
return [(method, size) for _, method, size in merged]
def size(self) -> Optional[int]:
if self.infix:
return None
else:
return sum(size for _, size in self.merged)
@classmethod
def _parse_affix(
cls, source: bytes, resolver: HashResolver, affix: _SList
) -> Iterable[tuple[int, Mentionable]]:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
assert isinstance(affix, list)
reader = BytesIO(source)
for index, factory, size in affix:
yield index, factory.from_bytes(reader.read(size), resolver)
assert not reader.read()
def _parse_infix(self, source: bytes, resolver: HashResolver) -> Iterable[tuple[int, Mentionable]]:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
yield from self._parse_affix(source[:self.prefix_size], resolver, self.prefix)
yield self.infix_index, self.infix_factory.from_bytes(source[self.prefix_size:-self.postfix_size], resolver)
yield from self._parse_affix(source[-self.postfix_size:], resolver, self.postfix)
def _parse_merged(self, source: bytes, resolver: HashResolver) -> _IAuto:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
reader = BytesIO(source)
inlined: _IList = []
uninlined: _UList = []
for method, size in self.merged:
method(reader.read(size), resolver, inlined, uninlined)
assert not reader.read()
return self.typed(inlined, uninlined)
def from_bytes(self, source: bytes, resolver: HashResolver) -> _IAuto:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
if self.infix:
return self.typed(list(self._parse_infix(source, resolver)), [])
else:
return self._parse_merged(source, resolver)

View File

@ -0,0 +1,26 @@
from .iatomic import *
__all__ = ('IByte',)
class IByte(IAtomic):
def __init__(self, value: int):
assert isinstance(value, int)
assert value in range(256)
self.value = value
@classmethod
def size(cls) -> int:
return 1
@classmethod
def _from_bytes(cls, source: bytes) -> 'IByte':
assert isinstance(source, bytes)
[value] = source
return IByte(value)
def __bytes__(self):
return bytes([self.value])
def __str__(self):
return f'{self.value:02x}'

View File

@ -0,0 +1,23 @@
import abc
from typing import Any, Callable, Coroutine, Optional, TypeAlias, TypeVar
from rainbowadn.core import *
__all__ = ('Inlining',)
Inlined = TypeVar('Inlined')
_FTuple: TypeAlias = tuple[int, Callable[[bytes, HashResolver], HashPoint], int]
_HTuple: TypeAlias = tuple[int, Callable[[], Coroutine[Any, Any, bytes]]]
class Inlining(RainbowFactory[Inlined], abc.ABC):
def size(self) -> Optional[int]:
raise NotImplementedError
@classmethod
def factory_size(cls, factory: RainbowFactory) -> Optional[int]:
assert isinstance(factory, RainbowFactory)
if isinstance(factory, Inlining):
return factory.size()
else:
return None

View File

@ -0,0 +1,30 @@
from typing import Generic, TypeVar
from rainbowadn.core import *
from .iauto import *
__all__ = ('IPair',)
E0 = TypeVar('E0')
E1 = TypeVar('E1')
class IPair(IAuto, Generic[E0, E1]):
e0: HashPoint[E0]
e1: HashPoint[E1]
def __init__(self, *args):
super().__init__(*args)
self.e0, self.e1 = self.hashpoints()
@classmethod
async def off(cls, e0: HashPoint[E0], e1: HashPoint[E1]) -> 'IPair[E0, E1]':
assert isinstance(e0, HashPoint)
assert isinstance(e1, HashPoint)
return await cls.from_hashpoints(e0, e1)
@classmethod
def of(cls, v0: E0, v1: E1) -> 'IPair[E0, E1]':
assert isinstance(v0, Mentionable)
assert isinstance(v1, Mentionable)
return cls.from_values(v0, v1)

View File

@ -0,0 +1,41 @@
import abc
from typing import Generic, Optional, Type, TypeVar
from rainbowadn.core import *
from .inlining import *
__all__ = ('IStatic', 'IStaticFactory',)
StaticInlined = TypeVar('StaticInlined')
class IStatic(Mentionable, abc.ABC):
@classmethod
def size(cls) -> int:
raise NotImplementedError
@classmethod
def from_bytes(cls: Type[StaticInlined], source: bytes, resolver: HashResolver) -> StaticInlined:
raise NotImplementedError
def __factory__(self: StaticInlined) -> RainbowFactory[StaticInlined]:
return self.factory()
@classmethod
def factory(cls: Type[StaticInlined]) -> RainbowFactory[StaticInlined]:
assert issubclass(cls, IStatic)
return IStaticFactory(cls)
class IStaticFactory(Inlining[StaticInlined], Generic[StaticInlined]):
def __init__(self, cls: Type[StaticInlined]):
assert issubclass(cls, IStatic)
self.cls = cls
def size(self) -> Optional[int]:
return self.cls.size()
def from_bytes(self, source: bytes, resolver: HashResolver) -> StaticInlined:
assert isinstance(source, bytes)
assert isinstance(resolver, HashResolver)
return self.cls.from_bytes(source, resolver)

View File

@ -0,0 +1,22 @@
from rainbowadn.core import *
from .iatomic import *
__all__ = ('IUnit',)
class IUnit(IAtomic):
@classmethod
def size(cls) -> int:
return 0
@classmethod
def _from_bytes(cls, source: bytes) -> 'IUnit':
assert isinstance(source, bytes)
assert_eq(len(source), 0)
return IUnit()
def __bytes__(self):
return b''
def __str__(self):
return f'()'

View File

@ -0,0 +1,71 @@
import unittest
from rainbowadn.atomic import *
from rainbowadn.core import *
from rainbowadn.inlining import *
from .resolvers import *
class TestInlining(unittest.IsolatedAsyncioTestCase):
@classmethod
async def _format(cls, *hps: HashPoint):
result = []
for hp in hps:
result.append(await hash_point_format(hp, 0))
return result
async def test_unit(self):
print(await (IAuto.factory(IUnit.factory(), IUnit.factory()).from_bytes(b'', FailResolver())).str(0))
print(bytes(IAuto.from_values(IUnit(), IUnit())))
async def test_byte(self):
# iB short for IByte
print(await (IAuto.factory(IByte.factory(), IByte.factory()).from_bytes(b'iB', FailResolver())).str(0))
print(bytes(IAuto.from_values(IByte(0x69), IByte(0x42))))
async def test_mixed(self):
print(await (IAuto.factory(IByte.factory(), IUnit.factory()).from_bytes(b'!', FailResolver())).str(0))
print(bytes(IAuto.from_values(IByte(0x21), IUnit())))
async def test_complex_infix(self):
print(b := bytes(IAuto.from_values(IByte(0x21), Integer(0x21), IByte(0x21))))
print(
await IAuto.factory(
IByte.factory(), Integer.factory(), IByte.factory()
).from_bytes(b, FailResolver()).str(0)
)
async def test_complex_reference(self):
dr = default_resolver()
print(
(
b := bytes(
await dr.migrate_resolved(
IAuto.from_values(IByte(0x21), Integer(0x21), Integer(0x21), IByte(0x21))
)
)
).hex()
)
print(
await IAuto.factory(
IByte.factory(), Integer.factory(), Integer.factory(), IByte.factory()
).from_bytes(b, dr).str(0)
)
async def test_pair(self):
with self.subTest('of'):
pair = IPair.of(IUnit(), IUnit())
print(await pair.str(0))
print(bytes(pair).hex())
with self.subTest('off'):
pair = await IPair.off(HashPoint.of(IUnit()), HashPoint.of(IUnit()))
print(await pair.str(0))
print(bytes(pair).hex())
with self.subTest('integer'):
pair = IPair.of(Integer(0x21), IUnit())
print(await pair.str(0))
print(bytes(pair).hex())
with self.subTest('integer 2'):
pair = IPair.of(Integer(0x21), Integer(0x21))
print(await pair.str(0))
print(bytes(pair).hex())