rainbowadn/rainbowadn/wrisbt/weakreferenceindexsetbtree.py
2022-06-23 00:51:43 +03:00

317 lines
11 KiB
Python

import bisect
from typing import AsyncIterable, Iterable, Sequence
from rainbowadn.core import *
from .wrisbtparametres import WrisbtParametres
__all__ = ('WeakReferenceIndexSetBTree', 'WrisbtFactory',)
class WeakReferenceIndexSetBTree(RecursiveMentionable):
    """
    One node of a B-tree set of fixed-size keys, stored as a flat byte string.

    Layout of ``source``: ``keys`` keys of ``keysize`` bytes each, back to back,
    followed (for non-leaf nodes only) by ``keys + 1`` child hashes of
    ``HashPoint.HASH_LENGTH`` bytes each. Keys are kept in ascending byte order;
    lookups rely on it via :mod:`bisect`.

    Methods that change the set (``add``, ``range``, ``split``) return new nodes
    instead of modifying this one.
    """

    def __init__(
            self,
            source: bytes,
            height: int,
            parametres: WrisbtParametres,
            root: bool,
            cache: tuple[MetaOrigin['WeakReferenceIndexSetBTree'], ...]
    ):
        """
        :param source: serialized node (keys, then child hashes for non-leaf nodes).
        :param height: distance to the leaf level; ``0`` means this node is a leaf.
        :param parametres: supplies ``keysize`` and ``keymin`` for the whole tree.
        :param root: whether this node is the tree root (exempt from the minimum key count).
        :param cache: one ``MetaOrigin`` per child, in child order; empty for leaves.
        """
        assert isinstance(source, bytes)
        assert isinstance(height, int)
        assert isinstance(parametres, WrisbtParametres)
        assert isinstance(root, bool)
        assert isinstance(cache, tuple)

        self.length = len(source)
        self.source = source

        assert height >= 0
        self.height = height
        self.leaf = height == 0
        self.empty = not self.source
        if self.empty:
            # a non-leaf node always carries at least one child hash, so it cannot be empty
            assert self.leaf

        if self.leaf:
            self.keys = self.length // parametres.keysize
            self.children = 0
        else:
            # a non-leaf holds `keys` keys and `keys + 1` hashes; integer division drops the extra hash
            self.keys = self.length // (parametres.keysize + HashPoint.HASH_LENGTH)
            self.children = self.keys + 1
        # the source must consist of exactly the keys and the child hashes, with no stray bytes
        assert_eq(self.length, self.keys * parametres.keysize + self.children * HashPoint.HASH_LENGTH)

        self.parametres = parametres
        self.keymin = parametres.keymin
        self.keysize = parametres.keysize

        self.root = root
        if not root:
            # only the root may hold fewer than keymin keys
            assert self.keymin <= self.keys
        # 2 * keymin + 1 keys is the transient "full" state that exists only until split()
        assert self.keys <= 2 * self.keymin + 1

        # byte offset where keys end and child hashes begin
        self.keyend = self.keys * self.keysize

        assert_eq(len(cache), self.children)
        self.cache = cache

    def full(self) -> bool:
        """Tell whether the node holds ``2 * keymin + 1`` keys and therefore has to be split."""
        return self.keys == 2 * self.keymin + 1

    def bytes_no(self, index: int, start: int, size: int) -> bytes:
        """
        Return the ``index``-th record of ``size`` bytes from the region starting at byte ``start``.

        The whole record must lie inside ``source``.
        """
        assert isinstance(index, int)
        assert isinstance(start, int)
        assert isinstance(size, int)
        record_start = start + size * index
        record_end = record_start + size
        assert 0 <= index
        # check the end offset (not the bare index) so that a truncated slice cannot slip through
        assert record_end <= self.length
        return self.source[record_start:record_end]

    def key_no(self, index: int) -> bytes:
        """Return the key at position ``index`` (``0 <= index < keys``)."""
        assert isinstance(index, int)
        assert 0 <= index < self.keys
        return self.bytes_no(index, 0, self.keysize)

    def cached_no(self, index: int) -> MetaOrigin['WeakReferenceIndexSetBTree']:
        """Return the cached ``MetaOrigin`` of the child at position ``index``; non-leaf nodes only."""
        assert isinstance(index, int)
        assert 0 <= index < self.children
        assert not self.leaf
        cached: MetaOrigin[WeakReferenceIndexSetBTree] = self.cache[index]
        assert isinstance(cached, MetaOrigin)
        return cached

    def child_no(self, index: int) -> HashPoint['WeakReferenceIndexSetBTree']:
        """
        Return the hash point of the child at position ``index``; non-leaf nodes only.

        It is built by the child's cached origin from a factory for a non-root node
        one level lower and the child's hash bytes stored in ``source``.
        """
        assert isinstance(index, int)
        assert 0 <= index < self.children
        assert not self.leaf
        return self.cached_no(index).hash_point(
            WrisbtFactory(self.height - 1, self.parametres, False),
            self.bytes_no(index, self.keyend, HashPoint.HASH_LENGTH)
        )

    async def child_resolved_no(self, index: int) -> 'WeakReferenceIndexSetBTree':
        """Resolve and return the child node at position ``index``; non-leaf nodes only."""
        assert isinstance(index, int)
        assert 0 <= index < self.children
        assert not self.leaf
        return await self.child_no(index).resolve()

    def balanced(self) -> bool:
        """Tell whether the node holds at most ``2 * keymin`` keys, i.e. is not in the full state."""
        return self.keys <= 2 * self.keymin

    def points(self) -> Iterable[HashPoint]:
        """Return the hash points of all children (empty for a leaf); the node must be balanced."""
        assert self.balanced()
        if self.leaf:
            return []
        else:
            return [self.child_no(child_index) for child_index in range(self.children)]

    def __bytes__(self):
        """Return the serialized node; the node must be balanced."""
        assert self.balanced()
        return self.source

    def __topology_hash__(self) -> bytes:
        """Hash only the child-hash part of ``source`` (everything after the keys)."""
        return HashPoint.hash(self.source[self.keyend:])

    def __factory__(self) -> RainbowFactory['WeakReferenceIndexSetBTree']:
        """Return a factory matching this node's height, parametres and root flag; the node must be balanced."""
        assert self.balanced()
        return WrisbtFactory(self.height, self.parametres, self.root)

    def range(self, left: int, right: int) -> 'WeakReferenceIndexSetBTree':
        """
        Return a non-root node made of keys ``[left, right)`` of this node.

        For a non-leaf node the result also takes children ``left`` .. ``right``
        inclusive (with their cache entries), i.e. one more child than keys.
        """
        assert isinstance(left, int)
        assert isinstance(right, int)
        assert 0 <= left < right <= self.keys

        hl: int = HashPoint.HASH_LENGTH
        assert isinstance(hl, int)

        keybytes: bytes = self.source[self.keysize * left:self.keysize * right]
        assert isinstance(keybytes, bytes)
        if self.leaf:
            return WeakReferenceIndexSetBTree(
                keybytes,
                0,
                self.parametres,
                False,
                ()
            )

        childbytes: bytes = self.source[self.keyend + hl * left:self.keyend + hl * (right + 1)]
        assert isinstance(childbytes, bytes)
        return WeakReferenceIndexSetBTree(
            keybytes + childbytes,
            self.height,
            self.parametres,
            False,
            self.cache[left:right + 1],
        )

    def split(self) -> tuple['WeakReferenceIndexSetBTree', bytes, 'WeakReferenceIndexSetBTree']:
        """
        Split a full node around its middle key.

        :return: ``(left, middle, right)`` where ``left`` holds the first ``keymin`` keys,
            ``middle`` is the key at position ``keymin`` and ``right`` holds the last ``keymin`` keys.
        """
        assert self.full()
        return (
            self.range(0, self.keymin),
            self.key_no(self.keymin),
            self.range(self.keymin + 1, 2 * self.keymin + 1),
        )

    async def str(self, tab: int) -> str:
        """
        Format the node: its height, then every key in hex, then every child, each on a tabulated line.

        :param tab: indentation level passed to ``tabulate``; nested lines use ``tab + 1``.
        """
        assert isinstance(tab, int)

        header = f'{self.height}' \
                 f'{tabulate(tab)}('
        key_lines = [
            f'{tabulate(tab + 1)}{self.key_no(key_index).hex()}' for key_index in range(self.keys)
        ]
        # NOTE(review): gather receives a single generator here rather than unpacked awaitables;
        # confirm that the project's gather accepts an iterable.
        children_formatted = await gather(
            hash_point_format(self.child_no(child_index), tab + 1) for child_index in range(self.children)
        )
        child_lines = [
            f'{tabulate(tab + 1)}{child_formatted}' for child_formatted in children_formatted
        ]
        footer = f'{tabulate(tab)})'
        return ''.join([header, *key_lines, *child_lines, footer])

    def _search(self, key: bytes) -> tuple[int, bool]:
        """
        Locate ``key`` among this node's own keys.

        :return: ``(index, found)``. When ``found`` is true, ``index`` is the position of ``key``.
            Otherwise ``index`` is where the key would be inserted, which is also the position
            of the child whose subtree may contain it.
        """
        assert isinstance(key, bytes)
        assert_eq(len(key), self.keysize)
        assert self.balanced()

        index: int = bisect.bisect_left(KeyView(self), key)
        assert isinstance(index, int)

        if index < self.keys and self.key_no(index) == key:
            return index, True

        # sanity checks: the key lies strictly between its would-be neighbours
        if index < self.keys:
            assert key < self.key_no(index)
        if index > 0:
            assert key > self.key_no(index - 1)
        return index, False

    async def contains(self, key: bytes) -> bool:
        """Tell whether ``key`` is present in the subtree rooted at this node."""
        index, found = self._search(key)
        if found:
            return True
        if self.leaf:
            return False

        child: WeakReferenceIndexSetBTree = await self.child_resolved_no(index)
        assert isinstance(child, WeakReferenceIndexSetBTree)
        return await child.contains(key)

    async def add(self, key: bytes) -> 'WeakReferenceIndexSetBTree':
        """
        Return a node equal to this one with ``key`` added to its subtree.

        Returns ``self`` when the key is already among this node's keys. The result
        keeps this node's height and root flag and may be ``full()``; splitting such
        a node is left to the caller.
        """
        index, found = self._search(key)
        if found:
            return self

        key_cut = self.keysize * index
        if self.leaf:
            return WeakReferenceIndexSetBTree(
                self.source[:key_cut] + key + self.source[key_cut:],
                self.height,
                self.parametres,
                self.root,
                ()
            )

        child: WeakReferenceIndexSetBTree = await self.child_resolved_no(index)
        assert isinstance(child, WeakReferenceIndexSetBTree)
        updated: WeakReferenceIndexSetBTree = await child.add(key)
        assert isinstance(updated, WeakReferenceIndexSetBTree)

        # byte range of the hash of the child being replaced
        hash_start = self.keyend + HashPoint.HASH_LENGTH * index
        hash_stop = self.keyend + HashPoint.HASH_LENGTH * (index + 1)

        if updated.full():
            # the child overflowed: lift its middle key into this node and replace it with its two halves
            left, middle, right = updated.split()
            assert isinstance(left, WeakReferenceIndexSetBTree)
            assert isinstance(middle, bytes)
            assert isinstance(right, WeakReferenceIndexSetBTree)
            return WeakReferenceIndexSetBTree(
                (
                        self.source[:key_cut]
                        +
                        middle
                        +
                        self.source[key_cut:hash_start]
                        +
                        bytes(HashPoint.of(left))
                        +
                        bytes(HashPoint.of(right))
                        +
                        self.source[hash_stop:]
                ),
                self.height,
                self.parametres,
                self.root,
                (
                        self.cache[:index]
                        +
                        (LocalMetaOrigin(LocalOrigin(left)), LocalMetaOrigin(LocalOrigin(right)))
                        +
                        self.cache[index + 1:]
                )
            )

        # the child still fits: only its hash and cache entry change
        return WeakReferenceIndexSetBTree(
            (
                    self.source[:hash_start]
                    +
                    bytes(HashPoint.of(updated))
                    +
                    self.source[hash_stop:]
            ),
            self.height,
            self.parametres,
            self.root,
            (
                    self.cache[:index]
                    +
                    (LocalMetaOrigin(LocalOrigin(updated)),)
                    +
                    self.cache[index + 1:]
            )
        )

    async def iter_keys(self) -> AsyncIterable[bytes]:
        """Yield every key of the subtree: child 0, key 0, child 1, key 1, ..., last child."""
        if self.leaf:
            for key_index in range(self.keys):
                yield self.key_no(key_index)
        else:
            # even steps visit a child subtree, odd steps emit this node's own key
            for index in range(self.keys * 2 + 1):
                real_index, mode = divmod(index, 2)
                if mode:
                    yield self.key_no(real_index)
                else:
                    async for key in (await self.child_resolved_no(real_index)).iter_keys():
                        yield key
class KeyView(Sequence[bytes]):
    """
    Read-only sequence view over the keys of one node, so that ``bisect`` can search them.

    Items are the node's keys (``bytes``), in the order the node stores them.
    """

    def __init__(self, wrisbt: 'WeakReferenceIndexSetBTree'):
        """
        :param wrisbt: the node whose keys are exposed.
        """
        assert isinstance(wrisbt, WeakReferenceIndexSetBTree)
        self.wrisbt = wrisbt

    def __len__(self) -> int:
        """Return the number of keys in the viewed node."""
        return self.wrisbt.keys

    def __getitem__(self, index: int) -> bytes:
        """
        Return the key at position ``index``.

        :raises IndexError: if ``index`` is outside ``[0, len(self))``; the ``Sequence``
            mixins (iteration, ``in``, ``index``, ``count``) depend on this to terminate.
        """
        if not 0 <= index < self.wrisbt.keys:
            raise IndexError(index)
        return self.wrisbt.key_no(index)
class WrisbtFactory(RainbowFactory[WeakReferenceIndexSetBTree]):
    """Factory that rebuilds a ``WeakReferenceIndexSetBTree`` node of a fixed height from its bytes."""

    def __init__(self, height: int, parametres: WrisbtParametres, root: bool):
        """
        :param height: height given to every node this factory builds; ``0`` means leaves.
        :param parametres: tree parametres passed on to the built nodes.
        :param root: root flag passed on to the built nodes.
        """
        assert isinstance(height, int)
        assert isinstance(parametres, WrisbtParametres)
        assert isinstance(root, bool)
        assert height >= 0
        self.root = root
        self.parametres = parametres
        self.height = height

    def from_bytes(self, source: bytes, resolver: HashResolver) -> WeakReferenceIndexSetBTree:
        """
        Build a node from ``source``.

        For a non-leaf height, every child gets the same ``ResolverMetaOrigin(resolver)``
        cache entry; a leaf gets an empty cache.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        if self.height:
            # one key plus one hash per key, and one extra hash: children = keys + 1
            record_size = HashPoint.HASH_LENGTH + self.parametres.keysize
            child_count = len(source) // record_size + 1
            origins = (ResolverMetaOrigin(resolver),) * child_count
        else:
            origins = ()
        return WeakReferenceIndexSetBTree(
            source,
            self.height,
            self.parametres,
            self.root,
            origins
        )