196 lines
8.1 KiB
Python
196 lines
8.1 KiB
Python
import os
|
|
import string
|
|
import time
|
|
import unittest
|
|
from typing import Any
|
|
|
|
import nacl.signing
|
|
|
|
from rainbowadn.chain.blockchain import BlockChainFactory
|
|
from rainbowadn.chain.chaincollectioninterface import ChainCollectionInterface
|
|
from rainbowadn.chain.reduction.reductionchainmetafactory import ReductionChainMetaFactory
|
|
from rainbowadn.core.asserts import assert_eq, assert_false, assert_true
|
|
from rainbowadn.core.hashpoint import HashPoint
|
|
from rainbowadn.core.nullability.notnull import NotNull
|
|
from rainbowadn.core.rainbow_factory import RainbowFactory
|
|
from rainbowadn.data.atomic.integer import Integer
|
|
from rainbowadn.data.atomic.plain import Plain
|
|
from rainbowadn.data.collection.pair import Pair, PairFactory
|
|
from rainbowadn.data.collection.trees.binary.activebinarytree import ActiveBinaryTree
|
|
from rainbowadn.data.collection.trees.binary.avl import AVL
|
|
from rainbowadn.data.collection.trees.comparison.comparator import Replace
|
|
from rainbowadn.data.collection.trees.comparison.plaincomparator import PlainComparator
|
|
from rainbowadn.encryption.encrypted import Encrypted
|
|
from rainbowadn.testing.dictresolver import DictResolver
|
|
from rainbowadn.testing.instrumentation import Counter
|
|
from rainbowadn.v13.algo import MINT_CONST
|
|
from rainbowadn.v13.bankchain import BankChain
|
|
from rainbowadn.v13.subject import Subject
|
|
from rainbowadn.v13.transaction import CoinData, Transaction
|
|
from rainbowadn.wrisbt.wrisbtchainprotocol import WrisbtChainProtocol
|
|
from rainbowadn.wrisbt.wrisbtparametres import WrisbtParametres
|
|
from rainbowadn.wrisbt.wrisbtroot import WrisbtRoot
|
|
|
|
|
|
class TestAll(unittest.IsolatedAsyncioTestCase):
|
|
"""examples rather than real tests"""
|
|
|
|
async def test_bankchain(self):
|
|
with self.subTest('setup'):
|
|
dr = DictResolver()
|
|
with self.subTest('create empty'):
|
|
bank: BankChain = BankChain.empty(ReductionChainMetaFactory().loose())
|
|
with self.subTest('prepare transactions'):
|
|
key_0 = nacl.signing.SigningKey.generate()
|
|
transaction_0 = Transaction.make(
|
|
[],
|
|
[CoinData.of(Subject(key_0.verify_key), 1_000_000)],
|
|
[]
|
|
)
|
|
coin_0, coin_1 = await transaction_0.coins(MINT_CONST, NotNull(HashPoint.of(Subject(key_0.verify_key))))
|
|
with self.subTest('add transactions'):
|
|
bank = await bank.adds(
|
|
[
|
|
transaction_0,
|
|
Transaction.make(
|
|
[coin_1],
|
|
[CoinData.of(Subject(nacl.signing.SigningKey.generate().verify_key), 10_000)],
|
|
[key_0]
|
|
),
|
|
]
|
|
)
|
|
with self.subTest('add empty'):
|
|
bank = await bank.adds(
|
|
[]
|
|
)
|
|
print(bank)
|
|
with self.subTest('verify'):
|
|
assert_true(await bank.verify())
|
|
with self.subTest('recover'):
|
|
await dr.save(HashPoint.of(bank.reference))
|
|
bank = BankChain.from_reference(
|
|
ReductionChainMetaFactory(), await dr.migrate_resolved(bank.reference)
|
|
)
|
|
print(bank)
|
|
with self.subTest('verify'):
|
|
assert_true(await bank.verify())
|
|
|
|
async def test_wrisbt(self):
|
|
with self.subTest('setup'):
|
|
stoptime = time.process_time()
|
|
|
|
def measure(message: str) -> float:
|
|
nonlocal stoptime
|
|
now = time.process_time()
|
|
delta = now - stoptime
|
|
print(message, delta)
|
|
stoptime = now
|
|
return delta
|
|
|
|
dr = DictResolver()
|
|
n = 2500
|
|
keysize = 7
|
|
with self.subTest('create empty'):
|
|
btree: WrisbtRoot = WrisbtRoot.empty(WrisbtParametres(1, keysize))
|
|
measure('init')
|
|
with self.subTest('add keys', n=n):
|
|
for _ in range(n):
|
|
key = os.urandom(keysize)
|
|
assert_false(await btree.contains(key))
|
|
btree = await btree.add(key)
|
|
assert_true(await btree.contains(key))
|
|
measure('add')
|
|
with self.subTest('save'):
|
|
await dr.save(HashPoint.of(btree))
|
|
measure('save')
|
|
with self.subTest('resolve and iterate'):
|
|
btree = await dr.migrate_resolved(btree)
|
|
assert_eq(len(await btree.keys()), n)
|
|
print(btree.height)
|
|
measure('resolve and iterate')
|
|
with self.subTest('resolve and add', n=n):
|
|
for _ in range(n):
|
|
key = os.urandom(keysize)
|
|
assert_false(await btree.contains(key))
|
|
btree = await btree.add(key)
|
|
assert_true(await btree.contains(key))
|
|
print(btree.height)
|
|
measure('resolve and add')
|
|
|
|
async def test_wrisbt_index(self):
|
|
with self.subTest('create empty'):
|
|
factory: RainbowFactory[Pair[Plain, Plain]] = PairFactory(Plain.factory(), Plain.factory()).loose()
|
|
chain: ChainCollectionInterface[Any, Pair[Plain, Plain], WrisbtRoot] = BlockChainFactory(
|
|
WrisbtChainProtocol(factory, 2).loose()
|
|
).empty().loose()
|
|
with self.subTest('fill'):
|
|
for _ in range(1000):
|
|
chain = await chain.add(
|
|
HashPoint.of(
|
|
Pair(
|
|
HashPoint.of(Plain(os.urandom(16))),
|
|
HashPoint.of(Plain(os.urandom(16)))
|
|
)
|
|
)
|
|
)
|
|
with self.subTest('check'):
|
|
assert_true(await chain.verify())
|
|
with self.subTest('measure height'):
|
|
reference = await chain.actual_state()
|
|
assert not reference.null()
|
|
print((await reference.resolve()).height)
|
|
|
|
async def test_avl(self):
|
|
tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
|
|
AVL(PlainComparator(Replace())), Plain.factory()
|
|
)
|
|
for i in range(26):
|
|
tree = await tree.add(HashPoint.of(Plain(bytes([ord('A') + i]))))
|
|
print(await tree.reference.str(0))
|
|
|
|
async def test_avl_stress(self):
|
|
tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
|
|
AVL(PlainComparator(Replace())), Plain.factory()
|
|
)
|
|
for i in range(250):
|
|
tree = await tree.add(HashPoint.of(Plain(os.urandom(16))))
|
|
print((await (await (await tree.loose().reference.resolve()).key.resolve()).metadata.resolve()).integer)
|
|
|
|
async def test_encryption(self):
|
|
instrumentation = Counter(Encrypted, 'encrypt')
|
|
with self.subTest('setup'):
|
|
key = b'a' * 32
|
|
dr = DictResolver()
|
|
with self.subTest('create empty'):
|
|
tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
|
|
AVL(PlainComparator(Replace())), Plain.factory()
|
|
)
|
|
with self.subTest('fill'):
|
|
for char in string.ascii_uppercase:
|
|
tree = await tree.add(HashPoint.of(Plain(char.encode())))
|
|
print(await tree.reference.str(0))
|
|
with self.subTest('encrypt'):
|
|
target = tree.reference
|
|
with instrumentation:
|
|
target = (await Encrypted.encrypt(target, key)).decrypted
|
|
print(instrumentation.counter)
|
|
tree = tree.create(target)
|
|
print(await tree.reference.str(0))
|
|
with self.subTest('alter'):
|
|
tree = await tree.add(HashPoint.of(Plain(b'NEWKEY')))
|
|
tree = await tree.remove(HashPoint.of(Plain(b'F')))
|
|
print(await tree.reference.str(0))
|
|
with self.subTest('encrypt and migrate'):
|
|
target = tree.reference
|
|
with instrumentation:
|
|
eeed = await Encrypted.encrypt(target, key)
|
|
print(instrumentation.counter)
|
|
await dr.save(HashPoint.of(eeed))
|
|
print(await (await dr.migrate_resolved(eeed)).decrypted.str(0))
|
|
with self.subTest('re-encrypt'):
|
|
new_key = b'b' * 32
|
|
target = eeed.decrypted
|
|
with instrumentation:
|
|
await Encrypted.encrypt(target, new_key)
|
|
print(instrumentation.counter)
|