rainbowadn/rainbowadn/testing/test_trees.py
2022-07-13 04:16:15 +03:00

99 lines
3.8 KiB
Python

import os
import time
import unittest
from rainbowadn.atomic import *
from rainbowadn.collection.comparison import *
from rainbowadn.collection.trees.binary import *
from rainbowadn.core import *
from rainbowadn.testing.resolvers import *
from rainbowadn.wrisbt import *
class TestTrees(unittest.IsolatedAsyncioTestCase):
async def test_wrisbt(self):
set_gather_linear()
with self.subTest('setup'):
stoptime = time.process_time()
def measure(message: str) -> float:
nonlocal stoptime
now = time.process_time()
delta = now - stoptime
print(message, delta)
stoptime = now
return delta
n = 5000
keysize = 7
with self.subTest('create empty'):
btree: WrisbtRoot = WrisbtRoot.empty(WrisbtParametres(5, keysize))
measure('init')
with self.subTest('add keys', n=n):
for _ in range(n):
key = os.urandom(keysize)
assert_false(await btree.contains(key))
btree = await btree.add(key)
assert_true(await btree.contains(key))
measure('add')
with self.subTest('save'):
btree = await default_resolver().migrate_resolved(btree)
measure('save')
set_gather_asyncio()
with self.subTest('resolve and iterate'):
assert_eq(len(await btree.keys()), n)
print(btree.height)
measure('resolve and iterate')
with self.subTest('resolve and add', n=n):
for _ in range(n):
key = os.urandom(keysize)
assert_false(await btree.contains(key))
btree = await btree.add(key)
assert_true(await btree.contains(key))
print(btree.height)
measure('resolve and add')
async def test_avl(self):
set_gather_linear()
tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
AVL(PlainComparator(Replace())), Plain.factory()
)
for i in range(26):
tree = await tree.add(HashPoint.of(Plain(bytes([ord('A') + i]))))
print(await tree.reference.str(0))
async def test_avl_stress(self):
set_gather_linear()
protocol = AVL(PlainComparator(Replace()))
tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
protocol, Plain.factory()
)
for i in range(250):
tree = await tree.add(HashPoint.of(Plain(os.urandom(16))))
print(await AVL.height(tree.protocolized()))
async def test_split(self):
set_gather_linear()
protocol = AVL(PlainComparator(Replace()))
tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(
protocol, Plain.factory()
)
for i in range(26):
tree = await tree.add(HashPoint.of(Plain(bytes([ord('A') + i]))))
treel, treer = await tree.split(HashPoint.of(Plain(b'J')))
print(await treel.reference.str(0), ' << split >> ', await treer.reference.str(0))
async def test_union(self):
set_gather_linear()
protocol = AVL(PlainComparator(Replace()))
tree0: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(protocol, Plain.factory())
for i in range(10):
tree0 = await tree0.add(HashPoint.of(Plain(os.urandom(2).hex().encode())))
print(await tree0.reference.str(0))
tree1: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty(protocol, Plain.factory())
for i in range(10):
tree1 = await tree1.add(HashPoint.of(Plain(os.urandom(2).hex().encode())))
print(await tree1.reference.str(0))
tree = await tree0.union(tree1)
print(await tree.reference.str(0))