rainbowadn/rainbowadn/testing/test_bridge.py
2022-07-17 18:47:37 +03:00

166 lines
5.7 KiB
Python

import os
import random
import time
import unittest
from typing import Any
from nacl.signing import SigningKey
from rainbowadn.atomic import *
from rainbowadn.collection.comparison import *
from rainbowadn.collection.linear import *
from rainbowadn.collection.trees.binary import *
from rainbowadn.core import *
from rainbowadn.flow.bridge import *
from rainbowadn.flow.core import *
from rainbowadn.flow.primitive import *
from rainbowadn.flow.sequence import *
from rainbowadn.flow.stacked import *
from rainbowadn.flow13 import *
from rainbowadn.nullability import *
from rainbowadn.v13 import *
class PrintDispatch(SequenceDispatch[HashPoint, None]):
    """Sequence dispatch that prints every event it is handed.

    Each handler renders the hash points it receives with
    ``hash_point_format(..., 0)`` and prints them after a tag naming the
    event: ``'first'``, ``'last'`` or ``'pair'``.
    """

    async def on_first(self, element: HashPoint) -> None:
        """Print the formatted ``element`` tagged ``'first'``."""
        rendered = await hash_point_format(element, 0)
        print('first', rendered)

    async def on_last(self, element: HashPoint) -> None:
        """Print the formatted ``element`` tagged ``'last'``."""
        rendered = await hash_point_format(element, 0)
        print('last', rendered)

    async def on_pair(self, previous: HashPoint, element: HashPoint) -> None:
        """Print the formatted ``previous`` and ``element`` tagged ``'pair'``."""
        # ``previous`` is formatted before ``element``, matching argument order.
        rendered_previous = await hash_point_format(previous, 0)
        rendered_element = await hash_point_format(element, 0)
        print('pair', rendered_previous, rendered_element)

    def loose(self) -> SequenceDispatch[HashPoint, None]:
        """Return this very object, annotated as the base dispatch type."""
        return self
class NoneReduce(Reduce[None, None]):
    """Reduce whose elements and results are all ``None``."""

    def __init__(self):
        """Initialise the base ``Reduce`` with ``None``."""
        # presumably the argument is the initial accumulator -- confirm against Reduce
        super().__init__(None)

    async def reduce(self, out: None, element: None) -> None:
        """Ignore both arguments; the result is always ``None``."""

    def merge(self, left: None, right: None) -> None:
        """Ignore both arguments; the result is always ``None``."""
class PrintReduce(Reduce[tuple[Nullable[HashPoint], HashPoint], None]):
    """Reduce that prints each ``(nullable hash point, hash point)`` pair.

    Nothing is accumulated: ``reduce`` and ``merge`` both return ``None``.
    """

    def __init__(self):
        """Initialise the base ``Reduce`` with ``None``."""
        super().__init__(None)

    async def reduce(self, out: None, element: tuple[Nullable[HashPoint], HashPoint]) -> None:
        """Print one pair tagged ``'reduce'``.

        The first component is printed as ``'-'`` when it is null, otherwise
        as the formatted hash point it resolves to; the second component is
        always printed formatted.
        """
        maybe_point, point = element
        # The nullable side is rendered first, then the plain hash point.
        if maybe_point.null():
            first_text = '-'
        else:
            first_text = await hash_point_format(maybe_point.resolve(), 0)
        second_text = await hash_point_format(point, 0)
        print('reduce', first_text, second_text)
        return None

    def merge(self, left: None, right: None) -> None:
        """Ignore both arguments; the result is always ``None``."""

    def loose(self) -> Reduce[tuple[Nullable[HashPoint], HashPoint], None]:
        """Return this very object, annotated as the base reduce type."""
        return self
class TestBridge(unittest.IsolatedAsyncioTestCase):
    """Async tests driving the flow bridges, reducers and flow13 objects.

    Most tests print what they traverse or build; assertions are limited to
    the ``assert_*`` helpers and the ``assertWarns`` / ``assertRaises``
    context managers used below.
    """

    @classmethod
    def element_of(cls, stack: Stack[Plain]) -> HashPoint[Plain]:
        """Return the ``element`` attribute of ``stack``."""
        return stack.element

    @classmethod
    def element_mapper(cls) -> Mapper[Stack[Plain], HashPoint[Plain]]:
        """Return a ``CallableMapper`` wrapping :meth:`element_of`."""
        return CallableMapper(cls.element_of)

    @classmethod
    async def bridge(cls) -> Reducer[SequenceDispatcher[HashPoint[Plain], Any], Any]:
        """Build the bridge shared by the stack tests.

        A stack is created from the plains ``b'A'``, ``b'B'`` and ``b'C'``,
        its hash point is printed, and ``StackBridge(...).over_elements()``
        for that hash point is returned.
        """
        hp: HashPoint[Stack[Plain]] = Stack.off(
            Plain.factory(),
            [Plain(b'A'), Plain(b'B'), Plain(b'C')],
        ).hashpoint()
        print(await hash_point_format(hp, 0))
        bridge = StackBridge(hp).over_elements()
        return bridge

    async def test_stack_bridge(self):
        """Reduce the stack bridge through ``PrintDispatch``; expect ``None``."""
        set_gather_linear()
        bridge = await self.bridge()
        assert_none_strict(
            await bridge.reduce(
                MapReduce(
                    DispatchMapper(PrintDispatch().loose()).loose(),
                    NoneReduce(),
                ).loose()
            )
        )

    async def test_stacked(self):
        """Reduce a ``StackedReducer`` over the bridge with ``PrintReduce``."""
        set_gather_linear()
        bridge = await self.bridge()
        assert_none_strict(
            await StackedReducer(bridge).loose().reduce(PrintReduce().loose())
        )

    async def test_iterator(self):
        """Print the result of reducing a 13-integer ``ListBridge`` with ``FlowIterate``."""
        set_gather_linear()
        bridge = ListBridge(list(range(13)))
        print(list(await bridge.reduce(FlowIterate([]).loose())))

    @classmethod
    async def abt_of(cls, *plains: bytes) -> ActiveBinaryTree[Plain, Integer]:
        """Return a tree holding a ``Plain`` for each of ``plains``.

        Starts from an empty ``ActiveBinaryTree`` and adds the values one at
        a time, in the order given.
        """
        abt: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
            AVL(PlainComparator(Fail())),
            Plain.factory(),
        )
        for plain in plains:
            abt = await abt.add(HashPoint.of(Plain(plain)))
        return abt

    async def test_flowstandard(self):
        """Check ``FlowStandard.verify_subset`` in both directions.

        ``abt1`` is ``abt0`` with 32 extra random values added, so checking
        ``fs0`` against ``fs1`` is expected to pass, while the reverse check
        is expected to raise ``ValueError`` on every attempt and to emit a
        ``RuntimeWarning`` somewhere within the loop.
        """
        set_gather_linear()
        # A set comprehension: any duplicate random values collapse.
        set0 = {os.urandom(8).hex().encode() for _ in range(512)}
        abt0: ActiveBinaryTree[Plain, Integer] = await self.abt_of(*set0)
        abt1: ActiveBinaryTree[Plain, Integer] = abt0
        for _ in range(32):
            abt1 = await abt1.add(HashPoint.of(Plain(os.urandom(8).hex().encode())))
        fs0 = FlowStandard(abt0.protocolized())
        fs1 = FlowStandard(abt1.protocolized())
        # CPU time (not wall-clock) spent across both verification loops.
        _t = time.process_time()
        for _ in range(16):
            await fs0.verify_subset(UnitReducer(fs1))
        with self.assertWarns(RuntimeWarning):
            for _ in range(16):
                with self.assertRaises(ValueError):
                    await fs1.verify_subset(UnitReducer(fs0))
        print('verification time', time.process_time() - _t)

    async def test_flow13(self):
        """Build a transaction, a cheque and a block, verifying each in turn.

        Finally prints the block's reference with newline formatting enabled.
        """
        set_gather_linear()
        key = SigningKey.generate()
        subject = Subject(key.verify_key)
        transaction = await FlowTransaction.make(
            [],
            [FlowCoinData.of(subject, 1000)],
            [],
        )
        print(assert_true(await transaction.verify()))
        cheque = await FlowCheque.make([transaction])
        print(assert_true(await cheque.verify()))
        block = await BankBlock.empty().add(cheque)
        print(assert_true(await block.verify()))
        enable_newline()
        try:
            print(await block.reference.str(0))
        finally:
            # Always undo enable_newline(), even if formatting/printing raises.
            # NOTE(review): presumably this toggles shared formatting state that
            # would otherwise leak into later tests -- confirm in rainbowadn.core.
            disable_newline()

    async def test_13_stress(self):
        """Chain 8 cheques onto an empty ``BankBlock``.

        Every cheque holds 4-7 transactions (random), and every transaction
        mints 4-7 zero-valued coins (random), each for a freshly generated
        key. No seed is set, so the sizes differ from run to run.
        """
        set_gather_linear()
        block: BankBlock = BankBlock.empty()
        for _ in range(8):
            block = await block.add(
                await FlowCheque.make(
                    [
                        await FlowTransaction.make(
                            [],
                            [
                                FlowCoinData.of(Subject(SigningKey.generate().verify_key), 0)
                                for _ in range(random.randint(4, 7))
                            ],
                            []
                        )
                        for _ in range(random.randint(4, 7))
                    ]
                )
            )