187 lines
6.4 KiB
Python
187 lines
6.4 KiB
Python
from typing import Generic, Optional, TypeVar
|
|
|
|
from rainbowadn.atomic import *
|
|
from rainbowadn.collection.comparison import *
|
|
from rainbowadn.core import *
|
|
|
|
from .actions import *
|
|
from .core import *
|
|
|
|
# Only the balancing strategy itself is exported by a star-import;
# the action classes defined in this module are left out of ``__all__``.
__all__ = (
    'AVL',
)

# Key type stored in the tree; bound to ``Mentionable``.
ActiveKeyType = TypeVar(
    'ActiveKeyType',
    bound=Mentionable,
)
# Unconstrained type variable for the tree representation.
TreeType = TypeVar(
    'TreeType',
)
|
|
|
|
|
|
class AVL(BinaryBalancing[ActiveKeyType, Integer, TreeType]):
    """Binary balancing strategy that keeps an ``Integer`` as node metadata.

    The metadata of a node is computed as one plus the larger of the heights
    reported for its two subtrees. Height queries and rebalancing are
    delegated to ``HeightAction``, ``FullHeightAction`` and ``BalanceAction``.
    """

    def __init__(self, comparator_: Comparator[ActiveKeyType]):
        """Initialise the base class with ``comparator_`` and ``Integer(0)``."""
        # NOTE(review): presumably the metadata associated with an empty
        # tree -- confirm against BinaryBalancing.__init__.
        zero_metadata = HashPoint.of(Integer(0))
        super().__init__(comparator_, zero_metadata)

    async def metadata(
            self,
            treel: TreeType,
            treer: TreeType,
            key: HashPoint[ActiveKeyType],
            creation: BinaryCreation[ActiveKeyType, Integer, TreeType]
    ) -> HashPoint[Integer]:
        """Compute the metadata for a node with subtrees ``treel``/``treer``.

        :param treel: first subtree; paired with ``creation`` to query height.
        :param treer: second subtree; paired with ``creation`` to query height.
        :param key: only type-checked here, it does not affect the result.
        :param creation: wrapped together with each subtree in a
            ``BinaryProtocolized`` before asking for its height.
        :return: hash point of ``Integer(1 + max(height_l, height_r))``.
        """
        assert isinstance(key, HashPoint)
        assert isinstance(creation, BinaryCreation)
        # Both height queries are started before awaiting so they go through
        # a single ``gather`` call.
        pending_l = self.height(BinaryProtocolized(creation, treel))
        pending_r = self.height(BinaryProtocolized(creation, treer))
        height_l, height_r = await gather(pending_l, pending_r)
        assert isinstance(height_l, int)
        assert isinstance(height_r, int)
        taller = max(height_l, height_r)
        return HashPoint.of(Integer(1 + taller))

    @classmethod
    async def height(
            cls,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> int:
        """Return the height reported by a fresh ``HeightAction``."""
        assert isinstance(protocolized, BinaryProtocolized)
        action = HeightAction()
        return await action.on(protocolized)

    @classmethod
    async def full_height(
            cls,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]:
        """Return the result of a fresh ``FullHeightAction``.

        The result is a ``(height, split)`` pair; ``FullHeightAction`` yields
        ``(0, None)`` in its null case.
        """
        assert isinstance(protocolized, BinaryProtocolized)
        action = FullHeightAction()
        return await action.on(protocolized)

    async def balance(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> TreeType:
        """Return the tree produced by a fresh ``BalanceAction``."""
        assert isinstance(protocolized, BinaryProtocolized)
        action = BalanceAction()
        return await action.on(protocolized)
|
|
|
|
|
|
class HeightAction(
    BinaryAction[ActiveKeyType, Integer, TreeType, int],
    Generic[ActiveKeyType, TreeType]
):
    """Binary action that yields a tree's height as a plain ``int``."""

    async def on_null(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> int:
        """Null case: the height is ``0``."""
        assert isinstance(protocolized, BinaryProtocolized)
        return 0

    async def on_split(
            self,
            case: ProtocolizedBinarySplit[ActiveKeyType, Integer, TreeType]
    ) -> int:
        """Split case: return the integer held by the split's metadata.

        The metadata is resolved first and must be an ``Integer``.
        """
        assert isinstance(case, ProtocolizedBinarySplit)
        resolved: Integer = await case.split.metadata.resolve()
        assert isinstance(resolved, Integer)
        return resolved.integer
|
|
|
|
|
|
class FullHeightAction(
    BinaryAction[
        ActiveKeyType,
        Integer,
        TreeType,
        tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]
    ],
    Generic[ActiveKeyType, TreeType]
):
    """Binary action that yields ``(height, split)`` for a tree.

    ``split`` is ``None`` in the null case, so a caller obtains the height
    and the split from a single call.
    """

    async def on_null(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]:
        """Null case: height ``0`` and no split."""
        assert isinstance(protocolized, BinaryProtocolized)
        return 0, None

    async def on_split(
            self,
            case: ProtocolizedBinarySplit[ActiveKeyType, Integer, TreeType]
    ) -> tuple[int, Optional[BinarySplit[ActiveKeyType, Integer, TreeType]]]:
        """Split case: the metadata's integer paired with the split itself.

        The metadata is resolved first and must be an ``Integer``.
        """
        assert isinstance(case, ProtocolizedBinarySplit)
        resolved: Integer = await case.split.metadata.resolve()
        assert isinstance(resolved, Integer)
        return resolved.integer, case.split
|
|
|
|
|
|
class BalanceAction(
    BinaryAction[ActiveKeyType, Integer, TreeType, TreeType],
    Generic[ActiveKeyType, TreeType]
):
    """Binary action that returns a tree rebuilt when its sides are uneven.

    The two sides of a split are compared through ``AVL.full_height``; the
    tree is rebuilt only when those heights differ by more than one.
    """

    async def on_null(
            self,
            protocolized: BinaryProtocolized[ActiveKeyType, Integer, TreeType],
    ) -> TreeType:
        """Null case: return the tree unchanged."""
        assert isinstance(protocolized, BinaryProtocolized)
        return protocolized.tree

    async def on_split(
            self,
            case: ProtocolizedBinarySplit[ActiveKeyType, Integer, TreeType]
    ) -> TreeType:
        """Split case: rebuild the tree if one side is taller by two or more.

        With ``delta = height_l - height_r``:

        * ``delta < -1``: the right split must exist; ``on_symmetric`` is
          called with ``InnerOuter(case.protocol)`` and the left subtree;
        * ``delta > 1``: the left split must exist; ``on_symmetric`` is
          called with ``OuterInner(case.protocol)`` and the right subtree;
        * otherwise ``case.tree`` is returned as is.
        """
        assert isinstance(case, ProtocolizedBinarySplit)
        report_l, report_r = await gather(
            AVL.full_height(case.protocolizedl()),
            AVL.full_height(case.protocolizedr()),
        )
        height_l, splitl = report_l
        height_r, splitr = report_r
        assert isinstance(height_l, int)
        assert isinstance(height_r, int)
        delta = height_l - height_r
        assert isinstance(delta, int)

        if delta < -1:
            # The right side is the taller one, so its split cannot be None.
            assert isinstance(splitr, BinarySplit)
            return await self.on_symmetric(
                InnerOuter(case.protocol),
                case.split.treel,
                case.split.key,
                splitr,
            )

        if delta > 1:
            # Mirror image of the case above: the left side is the taller one.
            assert isinstance(splitl, BinarySplit)
            return await self.on_symmetric(
                OuterInner(case.protocol),
                case.split.treer,
                case.split.key,
                splitl,
            )

        return case.tree

    @classmethod
    async def on_symmetric(
            cls,
            symmetry: Symmetric[ActiveKeyType, Integer, TreeType],
            treei: TreeType,
            key: HashPoint[ActiveKeyType],
            splito: BinarySplit[ActiveKeyType, Integer, TreeType],
    ) -> TreeType:
        """Rebuild a tree from ``treei``, ``key`` and the split ``splito``.

        ``symmetry`` provides the inner/outer accessors and the ``tree``
        builder, which lets one implementation serve both directions used by
        ``on_split``.

        * If the inner side of ``splito`` is strictly taller than its outer
          side, the inner split ``splitoi`` must exist; two trees are built
          around ``key`` and ``splito.key`` and joined under ``splitoi.key``.
        * Otherwise a tree is built from ``treei``, the inner side of
          ``splito`` and ``key``, and joined with the outer side of
          ``splito`` under ``splito.key``.

        NOTE(review): this matches the double and single rotation of AVL
        rebalancing, assuming ``symmetry.tree(a, b, key)`` builds a node with
        ``a`` on the inner side and ``b`` on the outer side -- confirm
        against ``Symmetric``.
        """
        assert isinstance(symmetry, Symmetric)
        assert isinstance(key, HashPoint)
        assert isinstance(splito, BinarySplit)
        report_oi, height_oo = await gather(
            AVL.full_height(symmetry.protocolizedi(splito)),
            AVL.height(symmetry.protocolizedo(splito)),
        )
        height_oi, splitoi = report_oi
        assert isinstance(height_oi, int)
        assert isinstance(height_oo, int)

        if height_oi <= height_oo:
            # Inner side is not taller: two nested builds are enough.
            lowered = await symmetry.tree(treei, symmetry.inner(splito), key)
            return await symmetry.tree(
                lowered,
                symmetry.outer(splito),
                splito.key
            )

        # Inner side is strictly taller, so it has a split to take apart.
        assert isinstance(splitoi, BinarySplit)
        inner, outer = await gather(
            symmetry.tree(treei, symmetry.inner(splitoi), key),
            symmetry.tree(
                symmetry.outer(splitoi),
                symmetry.outer(splito),
                splito.key
            ),
        )
        return await symmetry.tree(inner, outer, splitoi.key)
|