# rainbowadn/rainbowadn/testing/test_all.py
#
# 203 lines
# 7.6 KiB
# Python

import os
import string
import time
import unittest
from typing import Any
import nacl.signing
from rainbowadn.atomic import *
from rainbowadn.chain import *
from rainbowadn.collection.comparison import *
from rainbowadn.collection.linear import *
from rainbowadn.collection.pair import *
from rainbowadn.collection.trees.binary import *
from rainbowadn.core import *
from rainbowadn.encryption import *
from rainbowadn.instrument import *
from rainbowadn.nullability import *
from rainbowadn.v13 import *
from rainbowadn.wrisbt import *
from .resolvers import *
class TestAll(unittest.IsolatedAsyncioTestCase):
    """Examples rather than real tests.

    Each method walks through one usage scenario of the library (bank chain,
    WRISBT, AVL tree, encryption, TL root) and prints intermediate state.
    Most steps are wrapped in ``subTest`` so that a failure is reported
    against a named stage.
    """

    # NOTE(review): statement nesting below was reconstructed from a source
    # whose indentation had been lost; confirm the placement of the ``print``
    # calls relative to the ``with``/``for`` blocks against the repository.

    @classmethod
    def dr(cls) -> ExtendableResolver:
        """Return a fresh resolver used as the migration target in the tests."""
        dr = DictResolver()
        # Uncomment to wrap the resolver in a delayed one:
        # dr = DelayedResolver(dr, lambda: 0.000)
        return dr

    async def test_bankchain(self):
        """Build a bank chain, add transactions, verify, migrate, verify again."""
        set_gather_linear()
        with self.subTest('create empty'):
            bank: BankChain = BankChain.empty(ReductionChainMetaFactory().loose())
        with self.subTest('prepare transactions'):
            key_0 = nacl.signing.SigningKey.generate()
            transaction_0 = Transaction.make(
                [],
                [CoinData.of(Subject(key_0.verify_key), 1_000_000)],
                [],
            )
            # Only the second coin is spent below; unpacking into exactly two
            # names additionally checks that exactly two coins are produced.
            _coin_0, coin_1 = await transaction_0.coins(
                MINT_CONST, NotNull(HashPoint.of(Subject(key_0.verify_key)))
            )
        with self.subTest('add transactions'):
            bank = await bank.adds(
                [
                    transaction_0,
                    Transaction.make(
                        [coin_1],
                        [CoinData.of(Subject(nacl.signing.SigningKey.generate().verify_key), 10_000)],
                        [key_0],
                    ),
                ]
            )
        with self.subTest('add empty'):
            bank = await bank.adds(
                []
            )
        print(await bank.reference.str(0))
        with self.subTest('verify'):
            assert_true(await bank.verify())
        with self.subTest('recover'):
            # NOTE(review): the factory is passed without ``.loose()`` here,
            # unlike in the 'create empty' step -- confirm this is intended.
            bank = BankChain.from_reference(
                ReductionChainMetaFactory(), await self.dr().migrate_resolved(bank.reference)
            )
        # The remainder of the scenario runs under the asyncio gather mode.
        set_gather_asyncio()
        print('recovering')
        print(await bank.reference.str(0))
        print('recovered')
        # Same label as the earlier check; this one runs on the recovered chain.
        with self.subTest('verify'):
            assert_true(await bank.verify())

    async def test_wrisbt(self):
        """Fill a WRISBT with random keys, migrate it, then read and extend it.

        CPU time spent in every stage is printed via the local ``measure``.
        """
        set_gather_linear()
        with self.subTest('setup'):
            stoptime = time.process_time()

            def measure(message: str) -> float:
                """Print and return CPU time since the previous call (or setup)."""
                nonlocal stoptime
                now = time.process_time()
                delta = now - stoptime
                print(message, delta)
                # Restart the interval so every stage is timed separately.
                stoptime = now
                return delta

            n = 5000
            keysize = 7
        with self.subTest('create empty'):
            btree: WrisbtRoot = WrisbtRoot.empty(WrisbtParametres(5, keysize))
            measure('init')
        with self.subTest('add keys', n=n):
            for _ in range(n):
                key = os.urandom(keysize)
                # A fresh random key must be absent before and present after.
                assert_false(await btree.contains(key))
                btree = await btree.add(key)
                assert_true(await btree.contains(key))
            measure('add')
        with self.subTest('save'):
            btree = await self.dr().migrate_resolved(btree)
            measure('save')
        # Everything after the migration runs under the asyncio gather mode.
        set_gather_asyncio()
        with self.subTest('resolve and iterate'):
            assert_eq(len(await btree.keys()), n)
            print(btree.height)
            measure('resolve and iterate')
        with self.subTest('resolve and add', n=n):
            for _ in range(n):
                key = os.urandom(keysize)
                assert_false(await btree.contains(key))
                btree = await btree.add(key)
                assert_true(await btree.contains(key))
            print(btree.height)
            measure('resolve and add')

    async def test_wrisbt_index(self):
        """Fill a WRISBT-indexed block chain with random pairs and verify it."""
        set_gather_linear()
        with self.subTest('create empty'):
            factory: RainbowFactory[Pair[Plain, Plain]] = PairFactory(Plain.factory(), Plain.factory()).loose()
            chain: ChainCollectionInterface[Any, Pair[Plain, Plain], WrisbtRoot] = BlockChainFactory(
                WrisbtChainProtocol(factory, 2).loose()
            ).empty().loose()
        with self.subTest('fill'):
            for _ in range(100):
                chain = await chain.add(
                    HashPoint.of(
                        Pair(
                            HashPoint.of(Plain(os.urandom(16))),
                            HashPoint.of(Plain(os.urandom(16))),
                        )
                    )
                )
        with self.subTest('check'):
            set_gather_asyncio()
            assert_true(await chain.verify())
        with self.subTest('measure height'):
            reference = await chain.actual_state()
            # The state must be non-null before it can be resolved below.
            assert not reference.null()
            print((await reference.resolve()).height)

    async def test_avl(self):
        """Insert the keys b'A'..b'Z' into an AVL tree and print the result."""
        set_gather_linear()
        tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
            AVL(PlainComparator(Replace())), Plain.factory()
        )
        # Same 26 single-byte keys as before, generated the way the other
        # tests in this class generate them.
        for char in string.ascii_uppercase:
            tree = await tree.add(HashPoint.of(Plain(char.encode())))
        print(await tree.reference.str(0))

    async def test_avl_stress(self):
        """Insert 250 random 16-byte keys into an AVL tree and print its height."""
        set_gather_linear()
        protocol = AVL(PlainComparator(Replace()))
        tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
            protocol, Plain.factory()
        )
        for _ in range(250):
            tree = await tree.add(HashPoint.of(Plain(os.urandom(16))))
        print(await AVL.height(tree.protocolized()))

    async def test_encryption(self):
        """Encrypt an AVL tree, alter it, then re-encrypt and migrate it.

        The value of ``encrypt_ctr.counter`` is printed after every
        encryption stage.
        """
        set_gather_linear()
        # ``Counter`` comes from the project's star imports, not ``collections``.
        encrypt_ctr = Counter(Encrypted, 'encrypt')
        with self.subTest('setup'):
            key = b'a' * 32
        with self.subTest('create empty'):
            tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
                AVL(PlainComparator(Replace())), Plain.factory()
            )
        with self.subTest('fill'):
            for char in string.ascii_uppercase:
                tree = await tree.add(HashPoint.of(Plain(char.encode())))
            print(await tree.reference.str(0))
        with self.subTest('encrypt'):
            target = tree.reference
            with encrypt_ctr:
                target = (await Encrypted.encrypt(target, key)).decrypted
            print(encrypt_ctr.counter)
            # Continue working with the tree rebuilt around the decrypted view.
            tree = tree.create(target)
            print(await tree.reference.str(0))
        with self.subTest('alter'):
            tree = await tree.add(HashPoint.of(Plain(b'NEWKEY')))
            tree = await tree.remove(HashPoint.of(Plain(b'F')))
            print(await tree.reference.str(0))
        with self.subTest('encrypt and migrate'):
            target = tree.reference
            with encrypt_ctr:
                eeed = await Encrypted.encrypt(target, key)
            print(encrypt_ctr.counter)
            print(await (await self.dr().migrate_resolved(eeed)).decrypted.str(0))
        with self.subTest('re-encrypt'):
            new_key = b'b' * 32
            target = eeed.decrypted
            with encrypt_ctr:
                await Encrypted.encrypt(target, new_key)
            print(encrypt_ctr.counter)

    async def test_tl(self):
        """Append b'A'..b'Z' to a TL root and print it with its height."""
        # Select the gather mode explicitly, like every other test here, so
        # the run does not depend on the mode left behind by a previous test.
        set_gather_linear()
        root = TLRootFactory(TLRParametres(2, Plain.factory())).empty()
        for char in string.ascii_uppercase:
            root = await root.add(HashPoint.of(Plain(char.encode())))
        print(await root.str(0))
        print((await root.node_resolved()).parametres.height)