rainbowadn/rainbowadn/wrisbt/wrisbtroot.py
2022-05-19 03:24:50 +03:00

126 lines
5.1 KiB
Python

from typing import Iterable, Optional
from rainbowadn.data.atomic.integer import Integer
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashmentionable import HashMentionable
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
from rainbowadn.wrisbt.weakreferenceindexsetbtree import WeakReferenceIndexSetBTree, WrisbtFactory
__all__ = ('WrisbtRoot', 'WrisbtRootFactory',)
from rainbowadn.wrisbt.wrisbtparametres import WrisbtParametres
class WrisbtRoot(RecursiveMentionable):
    """Entry point of a weak-reference index set B-tree.

    Pairs a hash point to the top-level ``WeakReferenceIndexSetBTree`` node with
    that node's height and the ``WrisbtParametres`` the tree is built with.
    No method here assigns to an existing instance: ``add`` and ``index``
    return a root (possibly ``self``) instead of modifying one in place.
    """

    def __init__(self, root: HashPoint[WeakReferenceIndexSetBTree], height: int, parametres: WrisbtParametres):
        """Store the root hash point, the tree height and the tree parametres."""
        assert isinstance(root, HashPoint)
        assert isinstance(height, int)
        assert isinstance(parametres, WrisbtParametres)
        self.root = root
        self.height = height
        self.parametres = parametres

    def points(self) -> Iterable[HashPoint]:
        """Return the hash points this object refers to: only the root node."""
        return [self.root]

    def __bytes__(self):
        """Serialise as the root hash point bytes followed by the height.

        The parametres are not part of the serialised form; on the reading
        side ``WrisbtRootFactory`` is constructed with them.
        """
        return bytes(self.root) + bytes(Integer(self.height))

    def __factory__(self) -> RainbowFactory['WrisbtRoot']:
        """Return a factory carrying the same parametres as this root."""
        return WrisbtRootFactory(self.parametres)

    @classmethod
    def empty(cls, parametres: WrisbtParametres) -> 'WrisbtRoot':
        """Build a root of height 0 around a node created from empty source bytes.

        Constructs through ``cls`` (as ``of`` does) so that subclasses receive
        an instance of their own type.
        """
        assert isinstance(parametres, WrisbtParametres)
        # NOTE(review): the `True` flag presumably marks the node as the tree
        # root and `()` as "no cached children" -- confirm against
        # WeakReferenceIndexSetBTree.__init__.
        empty_node = WeakReferenceIndexSetBTree(b'', 0, parametres, True, ())
        return cls(HashPoint.of(empty_node), 0, parametres)

    def str(self, resolver: HashResolver, tab: int) -> str:
        """Format as ``(root)``, then the height, then the formatted root hash point.

        Each of the two trailing parts is prefixed with ``tabulate(tab)``.
        """
        assert isinstance(resolver, HashResolver)
        assert isinstance(tab, int)
        return '(root)' \
               f'{tabulate(tab)}{self.height}' \
               f'{tabulate(tab)}{hash_point_format(self.root, resolver, tab)}'

    def contains(self, resolver: HashResolver, key: bytes) -> bool:
        """Tell whether ``key`` is in the tree.

        ``key`` must be exactly ``parametres.keysize`` bytes long. The root node
        is resolved through ``resolver`` and the lookup is delegated to it.
        """
        assert isinstance(resolver, HashResolver)
        assert isinstance(key, bytes)
        assert len(key) == self.parametres.keysize
        root: WeakReferenceIndexSetBTree = resolver.resolve(self.root)
        assert isinstance(root, WeakReferenceIndexSetBTree)
        return root.contains(resolver, key)

    def add(self, resolver: HashResolver, key: bytes) -> 'WrisbtRoot':
        """Return a root for the tree with ``key`` added.

        ``key`` must be exactly ``parametres.keysize`` bytes long. If the root
        node reports ``full()`` after the insertion, it is split and a new root
        node one level higher is created above the two halves.
        """
        assert isinstance(resolver, HashResolver)
        assert isinstance(key, bytes)
        assert len(key) == self.parametres.keysize
        root: WeakReferenceIndexSetBTree = resolver.resolve(self.root).add(resolver, key)
        assert isinstance(root, WeakReferenceIndexSetBTree)
        if root.full():
            left, middle, right = root.split()
            assert isinstance(left, WeakReferenceIndexSetBTree)
            assert isinstance(middle, bytes)
            assert isinstance(right, WeakReferenceIndexSetBTree)
            # New root source: the separating key followed by the hash point
            # bytes of both halves; the halves themselves are passed along
            # wrapped in NotNull.
            root = WeakReferenceIndexSetBTree(
                middle + bytes(HashPoint.of(left)) + bytes(HashPoint.of(right)),
                root.height + 1,
                root.parametres,
                True,
                (NotNull(left), NotNull(right))
            )
            assert isinstance(root, WeakReferenceIndexSetBTree)
        return self.of(root)

    @classmethod
    def of(cls, root: WeakReferenceIndexSetBTree) -> 'WrisbtRoot':
        """Wrap a tree node, taking the height and parametres from the node."""
        assert isinstance(root, WeakReferenceIndexSetBTree)
        return cls(HashPoint.of(root), root.height, root.parametres)

    def keys(self, resolver: HashResolver) -> list[bytes]:
        """Return every key yielded by the resolved root node's ``iter_keys``."""
        assert isinstance(resolver, HashResolver)
        return list(resolver.resolve(self.root).iter_keys(resolver))

    def index(
            self, resolver: HashResolver, target: HashPoint, exclude: Optional['WrisbtRoot']
    ) -> 'WrisbtRoot':
        """Return a root that also contains ``target`` and what it refers to.

        The key added for a hash point is its ``point`` bytes. When ``target``
        is already present in ``exclude`` or in this tree, ``self`` is returned
        and ``target`` is not resolved. Otherwise the hash points reported by a
        ``RecursiveMentionable`` value are indexed first, and the key of
        ``target`` itself is added last.

        :param resolver: used to resolve tree nodes and ``target``.
        :param target: hash point to index.
        :param exclude: tree of keys to skip; ``None`` means an empty tree.
        """
        assert isinstance(resolver, HashResolver)
        assert isinstance(target, HashPoint)
        if exclude is None:
            exclude = self.empty(self.parametres)
        assert isinstance(exclude, WrisbtRoot)
        key: bytes = target.point
        assert isinstance(key, bytes)
        if exclude.contains(resolver, key) or self.contains(resolver, key):
            return self
        tree = self
        value: HashMentionable = resolver.resolve(target)
        assert isinstance(value, HashMentionable)
        if isinstance(value, RecursiveMentionable):
            for hash_point in value.points():
                tree = tree.index(resolver, hash_point, exclude)
        tree = tree.add(resolver, key)
        return tree
class WrisbtRootFactory(RainbowFactory[WrisbtRoot]):
    """Factory that rebuilds ``WrisbtRoot`` objects from their serialised bytes.

    The parametres are supplied at construction time and handed to every
    root this factory produces.
    """

    def __init__(self, parametres: WrisbtParametres):
        """Remember the parametres to attach to deserialised roots."""
        assert isinstance(parametres, WrisbtParametres)
        self.parametres = parametres

    def from_bytes(self, source: bytes) -> WrisbtRoot:
        """Parse ``source`` into a ``WrisbtRoot``.

        The first ``HashPoint.HASH_LENGTH`` bytes are used as the root hash
        point; everything after them is decoded with ``Integer`` as the height.
        The hash point is created with ``Null()`` in place of a value.
        """
        assert isinstance(source, bytes)
        hash_length = HashPoint.HASH_LENGTH
        height_part = source[hash_length:]
        height: int = Integer.from_bytes(height_part).integer
        assert isinstance(height, int)
        node_factory = WrisbtFactory(height, self.parametres, True)
        root_point = HashPoint(node_factory, source[:hash_length], Null())
        return WrisbtRoot(root_point, height, self.parametres)