68 lines
2.5 KiB
Python
68 lines
2.5 KiB
Python
import unittest
|
|
|
|
from rainbowadn.atomic import Plain
|
|
from rainbowadn.collection.linear import Stack
|
|
from rainbowadn.core import HashPoint, assert_none_strict, hash_point_format
|
|
from rainbowadn.flow.bridge import StackBridge
|
|
from rainbowadn.flow.core import MapReduce, Reduce
|
|
from rainbowadn.flow.sequence import DispatchMapper, SequenceDispatch
|
|
from rainbowadn.flow.stacked import StackedReducer
|
|
from rainbowadn.nullability import Nullable
|
|
|
|
|
|
class PrintDispatch(SequenceDispatch[HashPoint, None]):
    """Sequence dispatch that prints every event it is handed.

    Each callback renders its hash point(s) with ``hash_point_format`` and
    prints them behind a label naming the callback (``first``, ``last`` or
    ``pair``). Nothing is returned from any callback.
    """

    async def on_first(self, element: HashPoint) -> None:
        """Print ``element`` prefixed with the label ``first``."""
        rendered = await hash_point_format(element, 0)
        print('first', rendered)

    async def on_last(self, element: HashPoint) -> None:
        """Print ``element`` prefixed with the label ``last``."""
        rendered = await hash_point_format(element, 0)
        print('last', rendered)

    async def on_pair(self, previous: HashPoint, element: HashPoint) -> None:
        """Print ``previous`` and ``element`` prefixed with the label ``pair``."""
        # Format in argument order: ``previous`` first, then ``element``.
        rendered_previous = await hash_point_format(previous, 0)
        rendered_element = await hash_point_format(element, 0)
        print('pair', rendered_previous, rendered_element)

    def loose(self) -> SequenceDispatch[HashPoint, None]:
        """Return this same object, annotated as the base dispatch type."""
        return self
|
|
|
|
|
|
class NoneReduce(Reduce[None, None]):
    """Reduce over ``None`` elements whose result is always ``None``."""

    def __init__(self):
        """Initialise the base ``Reduce`` with ``None``.

        NOTE(review): the argument is presumably the initial accumulator
        value -- confirm against ``Reduce.__init__``.
        """
        super().__init__(None)

    async def reduce(self, out: None, element: None) -> None:
        """Ignore both arguments and return ``None``."""
        # Nothing to combine; falling off the end yields None.
|
|
|
|
|
|
class PrintReduce(Reduce[tuple[Nullable[HashPoint], HashPoint], None]):
    """Reduce that prints each ``(nullable, hash point)`` element it receives.

    The accumulated value is always ``None``; the class exists for its
    printed output.
    """

    def __init__(self):
        """Initialise the base ``Reduce`` with ``None``.

        NOTE(review): the argument is presumably the initial accumulator
        value -- confirm against ``Reduce.__init__``.
        """
        super().__init__(None)

    async def reduce(self, out: None, element: tuple[Nullable[HashPoint], HashPoint]) -> None:
        """Print one element behind the label ``reduce`` and return ``None``.

        The first tuple item is printed as ``-`` when it reports ``null()``,
        otherwise its resolved value is formatted with ``hash_point_format``.
        The second tuple item is always formatted. ``out`` is not used.
        """
        optional_point, point = element
        if optional_point.null():
            optional_text = '-'
        else:
            optional_text = await hash_point_format(optional_point.resolve(), 0)
        # The second item is formatted only after the first has been handled.
        point_text = await hash_point_format(point, 0)
        print('reduce', optional_text, point_text)

    def loose(self) -> Reduce[tuple[Nullable[HashPoint], HashPoint], None]:
        """Return this same object, annotated as the base reduce type."""
        return self
|
|
|
|
|
|
class TestBridge(unittest.IsolatedAsyncioTestCase):
    """Async tests that run printing reducers over a three-element stack.

    Both tests build a ``Stack`` of ``Plain`` values ``A``, ``B``, ``C``,
    print its formatted hash point, run a reduction that prints as it goes,
    and pass the reduction result to ``assert_none_strict``.
    """

    async def test_stack_bridge(self):
        """Reduce a ``StackBridge`` with a print-dispatching map/reduce."""
        items = [Plain(b'A'), Plain(b'B'), Plain(b'C')]
        stack_hp: HashPoint[Stack[Plain]] = Stack.off(Plain.factory(), items).hashpoint()
        bridge = StackBridge(stack_hp)
        print(await hash_point_format(stack_hp, 0))

        # Assemble the pipeline inside-out: dispatch -> mapper -> map/reduce.
        dispatch = PrintDispatch().loose()
        mapper = DispatchMapper(dispatch).loose()
        map_reduce = MapReduce(mapper, NoneReduce()).loose()

        outcome = await bridge.reduce(map_reduce)
        assert_none_strict(outcome)

    async def test_stacked(self):
        """Reduce a ``StackedReducer`` wrapping the bridge with ``PrintReduce``."""
        items = [Plain(b'A'), Plain(b'B'), Plain(b'C')]
        stack_hp: HashPoint[Stack[Plain]] = Stack.off(Plain.factory(), items).hashpoint()
        bridge = StackBridge(stack_hp)
        print(await hash_point_format(stack_hp, 0))

        # Wrap the bridge first, then create the printing reduce it consumes.
        stacked = StackedReducer(bridge.loose()).loose()
        printer = PrintReduce().loose()

        outcome = await stacked.reduce(printer)
        assert_none_strict(outcome)
|