rainbowadn/rainbowadn/collection/trees/binary/avl.py
2022-09-11 16:26:58 +03:00

186 lines
6.4 KiB
Python

from typing import Generic, Optional, TypeVar
from rainbowadn.atomic import *
from rainbowadn.collection.comparison import *
from rainbowadn.core import *
from .actions import *
from .core import *
# Only the balancing strategy itself is exported; the action classes defined
# further down in this module are left out of ``__all__``.
__all__ = ('AVL',)
# Type of the keys stored in the tree (used as ``HashPoint[ActiveKeyType]``
# and ``Comparator[ActiveKeyType]`` below).
ActiveKeyType = TypeVar('ActiveKeyType')
# Type of the tree handles passed around as ``treel`` / ``treer`` and returned
# by the balancing operations.
TreeType = TypeVar('TreeType')
class AVL(BinaryBalancing[ActiveKeyType, Integer, TreeType]):
    """Balancing strategy whose per-node metadata is an ``Integer`` height.

    A node's metadata is ``1 + max(left height, right height)``; the height
    of a null tree is reported as 0 by ``HeightAction``. Rebalancing is
    delegated to ``BalanceAction``.
    """

    def __init__(self, comparator_: Comparator[ActiveKeyType]):
        """Initialise the base class with ``comparator_`` and ``Integer(0)``.

        NOTE(review): the second base-class argument is presumably the
        metadata used for empty trees -- confirm against ``BinaryBalancing``.
        """
        super().__init__(comparator_, HashPoint.of(Integer(0)))

    async def metadata(
            self,
            treel: TreeType,
            treer: TreeType,
            key: HashPoint[ActiveKeyType],
            creation: BinaryCreation[ActiveKeyType, Integer, TreeType]
    ) -> HashPoint[Integer]:
        """Compute the height metadata for a node with the given subtrees.

        Both subtree heights are looked up concurrently through ``gather``.

        :param treel: left subtree.
        :param treer: right subtree.
        :param key: key of the node (only type-checked here).
        :param creation: creation protocol used to wrap both subtrees.
        :return: hash point of ``Integer(1 + max(left, right))``.
        """
        assert isinstance(key, HashPoint)
        assert isinstance(creation, BinaryCreation)
        left_side = BinaryProtocolized(creation, treel)
        right_side = BinaryProtocolized(creation, treer)
        height_l, height_r = await gather(
            self.height(left_side),
            self.height(right_side),
        )
        assert isinstance(height_l, int)
        assert isinstance(height_r, int)
        tallest = max(height_l, height_r)
        return HashPoint.of(Integer(1 + tallest))

    @classmethod
    async def height(
            cls,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> int:
        """Return the height of ``protocolized`` as found by ``HeightAction``."""
        assert isinstance(protocolized, BinaryProtocolized)
        action = HeightAction()
        return await action.on(protocolized)

    @classmethod
    async def full_height(
            cls,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]:
        """Return ``(height, split)`` as found by ``FullHeightAction``.

        The second element is ``None`` for the null case.
        """
        assert isinstance(protocolized, BinaryProtocolized)
        action = FullHeightAction()
        return await action.on(protocolized)

    async def balance(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> TreeType:
        """Return the tree produced by running ``BalanceAction`` on the input."""
        assert isinstance(protocolized, BinaryProtocolized)
        action = BalanceAction()
        return await action.on(protocolized)
class HeightAction(
    BinaryAction[ActiveKeyType, Integer, TreeType, int],
    Generic[ActiveKeyType, TreeType]
):
    """Action yielding a tree's height: 0 for null, stored metadata otherwise."""

    async def on_null(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> int:
        """Null case: the height is 0."""
        assert isinstance(protocolized, BinaryProtocolized)
        return 0

    async def on_split(
            self,
            case: ProtocolizedBinarySplit[ActiveKeyType, Integer, TreeType]
    ) -> int:
        """Split case: resolve the node's ``Integer`` metadata and unwrap it."""
        assert isinstance(case, ProtocolizedBinarySplit)
        stored_height: Integer = await case.split.metadata.resolve()
        assert isinstance(stored_height, Integer)
        return stored_height.integer
class FullHeightAction(
    BinaryAction[
        ActiveKeyType,
        Integer,
        TreeType,
        tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]
    ],
    Generic[ActiveKeyType, TreeType]
):
    """Action yielding ``(height, split)``; the split is ``None`` for null."""

    async def on_null(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]:
        """Null case: height 0 and no split."""
        assert isinstance(protocolized, BinaryProtocolized)
        return 0, None

    async def on_split(
            self,
            case: ProtocolizedBinarySplit[ActiveKeyType, Integer, TreeType]
    ) -> tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]:
        """Split case: pair the resolved metadata height with the split itself."""
        assert isinstance(case, ProtocolizedBinarySplit)
        node = case.split
        stored_height: Integer = await node.metadata.resolve()
        assert isinstance(stored_height, Integer)
        return stored_height.integer, node
class BalanceAction(
    BinaryAction[ActiveKeyType, Integer, TreeType, TreeType],
    Generic[ActiveKeyType, TreeType]
):
    """Action that rebuilds a node whose subtree heights differ by more than 1.

    Only the node the action is applied to is inspected; nothing here
    recurses into the subtrees.
    """

    async def on_null(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> TreeType:
        """Null case: return the tree unchanged."""
        assert isinstance(protocolized, BinaryProtocolized)
        return protocolized.tree

    async def on_split(
            self,
            case: ProtocolizedBinarySplit[ActiveKeyType, Integer, TreeType]
    ) -> TreeType:
        """Split case: rebuild the node if one side is taller by 2 or more.

        Heights (and splits) of both children are fetched concurrently.
        With ``delta = left height - right height``:

        * ``-1 <= delta <= 1``: the tree is returned as is;
        * ``delta > 1``: ``on_symmetric`` is run with ``OuterInner`` on the
          left child's split;
        * ``delta < -1``: ``on_symmetric`` is run with ``InnerOuter`` on the
          right child's split.
        """
        assert isinstance(case, ProtocolizedBinarySplit)
        left_info, right_info = await gather(
            AVL.full_height(case.protocolizedl()),
            AVL.full_height(case.protocolizedr()),
        )
        height_l, splitl = left_info
        height_r, splitr = right_info
        assert isinstance(height_l, int)
        assert isinstance(height_r, int)
        delta = height_l - height_r
        assert isinstance(delta, int)
        if -1 <= delta <= 1:
            # Heights are close enough; nothing to rebuild.
            return case.tree
        if delta > 1:
            # Left side is the taller one, so its split must exist.
            assert isinstance(splitl, BinarySplit)
            return await self.on_symmetric(
                OuterInner(case.protocol),
                case.split.treer,
                case.split.key,
                splitl,
            )
        # Remaining possibility: delta < -1, the right side is the taller one.
        assert isinstance(splitr, BinarySplit)
        return await self.on_symmetric(
            InnerOuter(case.protocol),
            case.split.treel,
            case.split.key,
            splitr,
        )

    @classmethod
    async def on_symmetric(
            cls,
            symmetry: Symmetric[ActiveKeyType, Integer, TreeType],
            treei: TreeType,
            key: HashPoint[ActiveKeyType],
            splito: BinarySplit[ActiveKeyType, Integer, TreeType],
    ) -> TreeType:
        """Rebuild an unbalanced node, written once for both mirror directions.

        NOTE(review): the two branches presumably correspond to the AVL single
        and double rotations; the exact meaning of inner/outer is defined by
        ``Symmetric`` elsewhere -- confirm there.

        :param symmetry: direction-abstracting helper used to read and build
            nodes.
        :param treei: the shorter subtree of the node being rebuilt.
        :param key: key of the node being rebuilt.
        :param splito: split of the taller subtree.
        :return: the rebuilt tree.
        """
        assert isinstance(symmetry, Symmetric)
        assert isinstance(key, HashPoint)
        assert isinstance(splito, BinarySplit)
        inner_info, height_oo = await gather(
            AVL.full_height(symmetry.protocolizedi(splito)),
            AVL.height(symmetry.protocolizedo(splito)),
        )
        height_oi, splitoi = inner_info
        assert isinstance(height_oi, int)
        assert isinstance(height_oo, int)
        if height_oi <= height_oo:
            # The taller child's inner side is not the taller one: the old
            # node goes below the taller child, whose key becomes the new root.
            lowered = await symmetry.tree(treei, symmetry.inner(splito), key)
            return await symmetry.tree(
                lowered,
                symmetry.outer(splito),
                splito.key
            )
        # The inner grandchild is the taller one, so its split must exist; its
        # key becomes the new root and both halves are built concurrently.
        assert isinstance(splitoi, BinarySplit)
        inner, outer = await gather(
            symmetry.tree(treei, symmetry.inner(splitoi), key),
            symmetry.tree(symmetry.outer(splitoi), symmetry.outer(splito), splito.key),
        )
        return await symmetry.tree(
            inner,
            outer,
            splitoi.key
        )