initial commit

This commit is contained in:
AF 2022-05-08 21:56:17 +03:00
commit 244997690d
94 changed files with 3922 additions and 0 deletions

213
.gitignore vendored Normal file
View File

@ -0,0 +1,213 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

View File

@ -0,0 +1,20 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="false" level="WEAK WARNING" enabled_by_default="false" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="1">
<item index="0" class="java.lang.String" itemvalue="nacl" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
<option name="processCode" value="true" />
<option name="processLiterals" value="true" />
<option name="processComments" value="true" />
</inspection_tool>
</profile>
</component>

4
.idea/misc.xml Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (rainbowadn)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/rainbowadn.iml" filepath="$PROJECT_DIR$/.idea/rainbowadn.iml" />
</modules>
</component>
</project>

10
.idea/rainbowadn.iml Normal file
View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

74
main.py Normal file
View File

@ -0,0 +1,74 @@
from collections import OrderedDict
from typing import MutableMapping
import nacl.signing
from rainbowadn.chain.reduction.reductionchainmetafactory import ReductionChainMetaFactory
from rainbowadn.hashing.hashmentionable import HashMentionable
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
from rainbowadn.v13.algo import MINT_CONST
from rainbowadn.v13.bankchain import BankChain
from rainbowadn.v13.subject import Subject
from rainbowadn.v13.transaction import CoinData, Transaction
class DumbResolver(HashResolver):
def __init__(self):
self.table: MutableMapping[bytes, bytes] = OrderedDict()
def _resolve(self, point: bytes) -> bytes:
assert isinstance(point, bytes)
return self.table[point]
def save(self, hash_point: HashPoint) -> None:
assert isinstance(hash_point, HashPoint)
if hash_point.point in self.table:
pass
elif isinstance(hash_point.value, NotNull):
value: HashMentionable = hash_point.value.value
self.table[hash_point.point] = HashPoint.bytes_of_mentioned(value)
if isinstance(value, RecursiveMentionable):
for hash_point in value.points():
self.save(hash_point)
else:
raise TypeError
def main():
dr = DumbResolver()
bank = BankChain.empty(ReductionChainMetaFactory(), dr)
key_0 = nacl.signing.SigningKey.generate()
transaction_0 = Transaction.make(
[],
[CoinData.of(Subject(key_0.verify_key), 1_000_000)],
[]
)
coin_0, coin_1 = transaction_0.coins(dr, MINT_CONST, NotNull(HashPoint.of(Subject(key_0.verify_key))))
bank = bank.adds(
[
transaction_0,
Transaction.make(
[coin_1],
[CoinData.of(Subject(nacl.signing.SigningKey.generate().verify_key), 10_000)],
[key_0]
),
]
)
bank = bank.adds(
[]
)
print(bank)
print(bank.verify())
dr.save(HashPoint.of(bank.reference))
bank = BankChain.from_reference(ReductionChainMetaFactory(), dr.resolve(HashPoint.of(bank.reference).loose()), dr)
print(bank)
print(bank.verify())
for key, value in dr.table.items():
print(key.hex(), value.hex())
if __name__ == '__main__':
main()

0
rainbowadn/__init__.py Normal file
View File

View File

View File

View File

@ -0,0 +1,25 @@
from typing import Generic, TypeVar
from rainbowadn.chain.chaincollectionfactory import ChainCollectionFactory
from rainbowadn.chain.reduction.reductionprotocol import ReductionProtocol
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('AbstractReductionChainMetaFactory',)
BlockType = TypeVar('BlockType')
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class AbstractReductionChainMetaFactory(
Generic[BlockType, ReductorType, AccumulatorType]
):
def factory(
self,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType],
) -> ChainCollectionFactory[
BlockType, ReductorType, AccumulatorType
]:
raise NotImplementedError

73
rainbowadn/chain/block.py Normal file
View File

@ -0,0 +1,73 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('Block', 'BlockFactory',)
HeaderType = TypeVar('HeaderType')
StateType = TypeVar('StateType')
class Block(RecursiveMentionable, Generic[HeaderType, StateType]):
def __init__(
self,
previous: NullableReference['Block[HeaderType, StateType]'],
header: HashPoint[HeaderType],
state: HashPoint[StateType],
):
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
assert isinstance(state, HashPoint)
self.previous = previous
self.header = header
self.state = state
def points(self) -> Iterable[HashPoint]:
return [*self.previous.points(), self.header, self.state]
def __bytes__(self):
return bytes(self.previous) + bytes(self.header) + bytes(self.state)
def __factory__(self) -> RainbowFactory['Block[HeaderType, StateType]']:
return BlockFactory(
self.header.factory,
self.state.factory,
)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{self.previous.str(resolver, tab)}' \
f'{tabulate(tab)}(' \
f'{tabulate(tab + 1)}block' \
f'{tabulate(tab + 1)}(header)' \
f'{tabulate(tab + 1)}{hash_point_format(self.header, resolver, tab + 1)}' \
f'{tabulate(tab + 1)}(state)' \
f'{tabulate(tab + 1)}{hash_point_format(self.state, resolver, tab + 1)}' \
f'{tabulate(tab)})'
class BlockFactory(RainbowFactory[Block[HeaderType, StateType]], Generic[HeaderType, StateType]):
def __init__(
self,
header_factory: RainbowFactory[HeaderType],
state_factory: RainbowFactory[StateType],
):
assert isinstance(header_factory, RainbowFactory)
assert isinstance(state_factory, RainbowFactory)
self.header_factory = header_factory
self.state_factory = state_factory
def from_bytes(self, source: bytes) -> Block[HeaderType, StateType]:
assert isinstance(source, bytes)
return Block(
NullableReferenceFactory(self).from_bytes(source[:HashPoint.HASH_LENGTH]),
HashPoint(self.header_factory, source[HashPoint.HASH_LENGTH:2 * HashPoint.HASH_LENGTH], Null()),
HashPoint(self.state_factory, source[2 * HashPoint.HASH_LENGTH:3 * HashPoint.HASH_LENGTH], Null()),
)

View File

@ -0,0 +1,74 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.collection_interface.collectioninterface import CollectionInterface
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('BlockCollectionInterface',)
BlockType = TypeVar('BlockType')
HeaderType = TypeVar('HeaderType')
ActualStateType = TypeVar('ActualStateType')
class BlockCollectionInterface(
CollectionInterface[
BlockType
],
Generic[BlockType, HeaderType, ActualStateType]
):
def __init__(
self,
reference: NullableReference[BlockType],
resolver: HashResolver
):
assert isinstance(reference, NullableReference)
assert isinstance(resolver, HashResolver)
self.resolver = resolver
super().__init__(reference)
def _verify_link(
self,
block: BlockType,
previous: NullableReference[BlockType]
) -> bool:
raise NotImplementedError
def verify_link(
self,
previous: NullableReference[BlockType]
) -> bool:
assert isinstance(previous, NullableReference)
if isinstance(self.reference.reference, Null):
return True
elif isinstance(self.reference.reference, NotNull):
return self._verify_link(
self.resolver.resolve(self.reference.reference.value),
previous
)
else:
raise TypeError
def _base_state_factory(self) -> RainbowFactory[ActualStateType]:
raise NotImplementedError
def _base_state(self, block: BlockType) -> HashPoint[ActualStateType]:
raise NotImplementedError
def base_state(self) -> NullableReference[ActualStateType]:
if isinstance(self.reference.reference, Null):
return NullableReference(Null(), self._base_state_factory())
elif isinstance(self.reference.reference, NotNull):
return NullableReference.of(self._base_state(self.resolver.resolve(self.reference.reference.value)))
else:
raise TypeError
def add(
self,
header: HashPoint[HeaderType]
) -> 'BlockCollectionInterface[BlockType, HeaderType, ActualStateType]':
raise NotImplementedError

View File

@ -0,0 +1,21 @@
from typing import Generic, TypeVar
from rainbowadn.chain.chaincollectioninterface import ChainCollectionInterface
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('ChainCollectionFactory',)
BlockType = TypeVar('BlockType')
HeaderType = TypeVar('HeaderType')
ActualStateType = TypeVar('ActualStateType')
class ChainCollectionFactory(Generic[BlockType, HeaderType, ActualStateType]):
def empty(self) -> ChainCollectionInterface[BlockType, HeaderType, ActualStateType]:
raise NotImplementedError
def from_reference(
self,
reference: NullableReference[BlockType]
) -> ChainCollectionInterface[BlockType, HeaderType, ActualStateType]:
raise NotImplementedError

View File

@ -0,0 +1,27 @@
import abc
from typing import Generic, TypeVar
from rainbowadn.chain.blockcollectioninterface import BlockCollectionInterface
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('ChainCollectionInterface',)
BlockType = TypeVar('BlockType')
HeaderType = TypeVar('HeaderType')
ActualStateType = TypeVar('ActualStateType')
class ChainCollectionInterface(
BlockCollectionInterface[
BlockType,
HeaderType,
ActualStateType
],
Generic[BlockType, HeaderType, ActualStateType],
abc.ABC
):
def add(self, header: HashPoint[HeaderType]) -> 'ChainCollectionInterface[BlockType, HeaderType, ActualStateType]':
raise NotImplementedError
def verify(self) -> bool:
raise NotImplementedError

View File

View File

@ -0,0 +1,15 @@
from typing import Generic, TypeVar
from rainbowadn.chain.reduction.reductionresult import ReductionResult
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('Reduced',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class Reduced(ReductionResult, Generic[ReductorType, AccumulatorType]):
def __init__(self, reduced: HashPoint[AccumulatorType]):
assert isinstance(reduced, HashPoint)
self.reduced = reduced

View File

@ -0,0 +1,63 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.chain.reduction.reductionresult import ReductionResult
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('Reduction', 'ReductionFactory',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class Reduction(
ReductionResult[ReductorType, AccumulatorType],
RecursiveMentionable,
Generic[ReductorType, AccumulatorType]
):
def __init__(self, reductor: HashPoint[ReductorType], accumulator: HashPoint[AccumulatorType]):
assert isinstance(reductor, HashPoint)
assert isinstance(accumulator, HashPoint)
self.reductor = reductor
self.accumulator = accumulator
def points(self) -> Iterable[HashPoint]:
return [self.reductor, self.accumulator]
def __bytes__(self):
return bytes(self.reductor) + bytes(self.accumulator)
def __factory__(self) -> RainbowFactory['Reduction[ReductorType, AccumulatorType]']:
return ReductionFactory(self.reductor.factory, self.accumulator.factory)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'(reduction)' \
f'{tabulate(tab)}{hash_point_format(self.accumulator, resolver, tab)}'
class ReductionFactory(
RainbowFactory[Reduction[ReductorType, AccumulatorType]],
Generic[ReductorType, AccumulatorType]
):
def __init__(
self,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
):
assert isinstance(reductor_factory, RainbowFactory)
assert isinstance(accumulator_factory, RainbowFactory)
self.reductor_factory = reductor_factory
self.accumulator_factory = accumulator_factory
def from_bytes(self, source: bytes) -> Reduction[ReductorType, AccumulatorType]:
assert isinstance(source, bytes)
return Reduction(
HashPoint(self.reductor_factory, source[:HashPoint.HASH_LENGTH], Null()),
HashPoint(self.accumulator_factory, source[HashPoint.HASH_LENGTH:], Null()),
)

View File

@ -0,0 +1,306 @@
from typing import Generic, TypeVar
from rainbowadn.chain.block import Block, BlockFactory
from rainbowadn.chain.blockcollectioninterface import BlockCollectionInterface
from rainbowadn.chain.chaincollectioninterface import ChainCollectionInterface
from rainbowadn.chain.reduction.reduction import Reduction, ReductionFactory
from rainbowadn.chain.reduction.reductionprotocol import ReductionProtocol
from rainbowadn.chain.reduction.reductionstageprotocol import ReductionStageProtocol
from rainbowadn.chain.stages.stage import StageStageFactory, StateStage, StateStageFactory
from rainbowadn.chain.stages.stageprotocol import StageProtocol
from rainbowadn.chain.stages.stagestateprotocol import StageStateProtocol
from rainbowadn.chain.states.stateprotocol import StateProtocol
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('ReductionChain',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class ReductionChain(
ChainCollectionInterface[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
],
ReductorType,
AccumulatorType
],
Generic[ReductorType, AccumulatorType]
):
def _verify_link(
self,
block: Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
],
previous: NullableReference[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]
]
) -> bool:
assert isinstance(block, Block)
assert isinstance(previous, NullableReference)
assert block.previous == previous
previous_chain = ReductionChain(previous, self.reductor_factory, self.accumulator_factory, self.protocol)
assert isinstance(previous_chain, BlockCollectionInterface)
state_protocol = StageStateProtocol(self.resolver)
assert isinstance(state_protocol, StateProtocol)
return state_protocol.verify(
previous_chain.state(),
block.header,
block.state,
)
def _base_state_factory(self) -> RainbowFactory[AccumulatorType]:
return self.accumulator_factory
def _base_state(
self,
block: Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]
) -> HashPoint[AccumulatorType]:
assert isinstance(block, Block)
return self.resolver.resolve(block.state).state
def add(
self,
header: HashPoint[ReductorType]
) -> 'ReductionChain[ReductorType, AccumulatorType]':
assert isinstance(header, HashPoint)
return ReductionChain(
NullableReference.off(
self._add(header)
),
header.factory,
self.accumulator_factory,
self.protocol
)
def __init__(
self,
reference: NullableReference[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]
],
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType]
):
assert isinstance(reference, NullableReference)
assert isinstance(reductor_factory, RainbowFactory)
assert isinstance(accumulator_factory, RainbowFactory)
assert isinstance(protocol, ReductionProtocol)
super().__init__(reference, protocol.resolver)
self.reductor_factory = reductor_factory
self.accumulator_factory = accumulator_factory
self.protocol = protocol
@classmethod
def _state_stage_factory(
cls,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType]
) -> RainbowFactory[
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]:
assert isinstance(reductor_factory, RainbowFactory)
assert isinstance(accumulator_factory, RainbowFactory)
assert isinstance(protocol, ReductionProtocol)
stage_protocol = ReductionStageProtocol(protocol)
assert isinstance(stage_protocol, StageProtocol)
return StateStageFactory(
stage_protocol,
StageStageFactory(
stage_protocol,
ReductionFactory(
reductor_factory,
accumulator_factory
)
),
accumulator_factory
)
@classmethod
def _block_factory(
cls,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType]
) -> RainbowFactory[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]
]:
assert isinstance(reductor_factory, RainbowFactory)
assert isinstance(accumulator_factory, RainbowFactory)
assert isinstance(protocol, ReductionProtocol)
return BlockFactory(
reductor_factory,
cls._state_stage_factory(
reductor_factory,
accumulator_factory,
protocol
)
)
@classmethod
def empty(
cls,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType]
) -> 'ReductionChain[ReductorType, AccumulatorType]':
assert isinstance(reductor_factory, RainbowFactory)
assert isinstance(accumulator_factory, RainbowFactory)
assert isinstance(protocol, ReductionProtocol)
return cls(
NullableReference(
Null(),
cls._block_factory(
reductor_factory,
accumulator_factory,
protocol
)
),
reductor_factory,
accumulator_factory,
protocol
)
def state(
self
) -> NullableReference[
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]:
if isinstance(self.reference.reference, Null):
return NullableReference(
Null(),
self._state_stage_factory(
self.reductor_factory,
self.accumulator_factory,
self.protocol
)
)
elif isinstance(self.reference.reference, NotNull):
block: Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
] = self.resolver.resolve(self.reference.reference.value)
assert isinstance(block, Block)
return NullableReference.of(block.state)
else:
raise TypeError
def _add(
self,
reductor: HashPoint[ReductorType]
) -> Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
]:
assert isinstance(reductor, HashPoint)
stage_protocol = ReductionStageProtocol(self.protocol)
assert isinstance(stage_protocol, StageProtocol)
return Block(
self.reference,
reductor,
stage_protocol.derive_full(
self.state(),
reductor,
self.accumulator_factory,
ReductionFactory(reductor.factory, self.accumulator_factory)
)
)
def previous(
self
):
assert isinstance(self.reference.reference, NotNull)
block: Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
] = self.resolver.resolve(self.reference.reference.value)
assert isinstance(block, Block)
return ReductionChain(block.previous, self.reductor_factory, self.accumulator_factory, self.protocol)
def verify(self) -> bool:
if isinstance(self.reference.reference, Null):
return True
elif isinstance(self.reference.reference, NotNull):
return self.verify_link(
self.previous().reference
) and self.previous().verify()
else:
raise TypeError
def loose(self) -> ChainCollectionInterface[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
],
ReductorType,
AccumulatorType
]:
return self

View File

@ -0,0 +1,73 @@
from typing import Generic, TypeVar
from rainbowadn.chain.block import Block
from rainbowadn.chain.chaincollectionfactory import ChainCollectionFactory
from rainbowadn.chain.reduction.reduction import Reduction
from rainbowadn.chain.reduction.reductionchain import ReductionChain
from rainbowadn.chain.reduction.reductionprotocol import ReductionProtocol
from rainbowadn.chain.stages.stage import StateStage
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('ReductionChainFactory',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class ReductionChainFactory(
ChainCollectionFactory[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
],
ReductorType,
AccumulatorType
],
Generic[ReductorType, AccumulatorType]
):
def __init__(
self,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType],
):
assert isinstance(reductor_factory, RainbowFactory)
assert isinstance(accumulator_factory, RainbowFactory)
assert isinstance(protocol, ReductionProtocol)
self.reductor_factory = reductor_factory
self.accumulator_factory = accumulator_factory
self.protocol = protocol
def empty(self) -> ReductionChain[ReductorType, AccumulatorType]:
return ReductionChain.empty(
self.reductor_factory,
self.accumulator_factory,
self.protocol,
)
def from_reference(
self, reference: NullableReference[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
],
]
) -> ReductionChain[
ReductorType, AccumulatorType
]:
assert isinstance(reference, NullableReference)
return ReductionChain(
reference,
self.reductor_factory,
self.accumulator_factory,
self.protocol,
)

View File

@ -0,0 +1,42 @@
from typing import Generic, TypeVar
from rainbowadn.chain.abstractreductionchainmetafactory import AbstractReductionChainMetaFactory
from rainbowadn.chain.block import Block
from rainbowadn.chain.reduction.reduction import Reduction
from rainbowadn.chain.reduction.reductionchainfactory import ReductionChainFactory
from rainbowadn.chain.reduction.reductionprotocol import ReductionProtocol
from rainbowadn.chain.stages.stage import StateStage
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('ReductionChainMetaFactory',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class ReductionChainMetaFactory(
AbstractReductionChainMetaFactory[
Block[
ReductorType,
StateStage[
ReductorType,
AccumulatorType,
Reduction[ReductorType, AccumulatorType]
]
],
ReductorType,
AccumulatorType
],
Generic[ReductorType, AccumulatorType]
):
def factory(
self,
reductor_factory: RainbowFactory[ReductorType],
accumulator_factory: RainbowFactory[AccumulatorType],
protocol: ReductionProtocol[ReductorType, AccumulatorType],
):
return ReductionChainFactory(
reductor_factory,
accumulator_factory,
protocol,
)

View File

@ -0,0 +1,33 @@
from typing import Generic, TypeVar
from rainbowadn.chain.reduction.reduction import Reduction
from rainbowadn.chain.reduction.reductionresult import ReductionResult
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('ReductionProtocol',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class ReductionProtocol(Generic[ReductorType, AccumulatorType]):
def __init__(self, resolver: HashResolver):
assert isinstance(resolver, HashResolver)
self.resolver = resolver
def reduce(
self,
reduction: Reduction[ReductorType, AccumulatorType]
) -> ReductionResult[ReductorType, AccumulatorType]:
raise NotImplementedError
def initial(self, factory: RainbowFactory[AccumulatorType]) -> HashPoint[AccumulatorType]:
raise NotImplementedError
def header_filter(
self,
state: HashPoint[AccumulatorType]
) -> HashPoint[AccumulatorType]:
raise NotImplementedError

View File

@ -0,0 +1,10 @@
from typing import Generic, TypeVar
__all__ = ('ReductionResult',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class ReductionResult(Generic[ReductorType, AccumulatorType]):
pass

View File

@ -0,0 +1,79 @@
from typing import Generic, TypeVar
from rainbowadn.chain.reduction.reduced import Reduced
from rainbowadn.chain.reduction.reduction import Reduction
from rainbowadn.chain.reduction.reductionprotocol import ReductionProtocol
from rainbowadn.chain.reduction.reductionresult import ReductionResult
from rainbowadn.chain.stages.derivation.activestageprotocol import ActiveStageProtocol
from rainbowadn.chain.stages.derivation.derived import Derived
from rainbowadn.chain.stages.derivation.derivedstage import DerivedStage
from rainbowadn.chain.stages.derivation.derivedstate import DerivedState
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('ReductionStageProtocol',)
ReductorType = TypeVar('ReductorType')
AccumulatorType = TypeVar('AccumulatorType')
class ReductionStageProtocol(
ActiveStageProtocol[ReductorType, AccumulatorType, Reduction[ReductorType, AccumulatorType]],
Generic[ReductorType, AccumulatorType]
):
def __init__(self, protocol: ReductionProtocol[ReductorType, AccumulatorType]):
assert isinstance(protocol, ReductionProtocol)
self.protocol = protocol
super().__init__(protocol.resolver)
def _derive_accumulator(
self,
previous: NullableReference[AccumulatorType],
) -> HashPoint[AccumulatorType]:
assert isinstance(previous, NullableReference)
if isinstance(previous.reference, Null):
return self.protocol.initial(previous.factory)
elif isinstance(previous.reference, NotNull):
return previous.reference.value
else:
raise TypeError
def derive_header(
self,
previous: NullableReference[AccumulatorType],
header: HashPoint[ReductorType]
) -> HashPoint[Reduction[ReductorType, AccumulatorType]]:
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
return HashPoint.of(
Reduction(
header,
self._derive_accumulator(previous)
)
)
@classmethod
def _derived_from_reduced(
cls,
reduced: ReductionResult[ReductorType, AccumulatorType]
) -> Derived[ReductorType, Reduction[ReductorType, AccumulatorType]]:
assert isinstance(reduced, ReductionResult)
if isinstance(reduced, Reduction):
return DerivedStage(HashPoint.of(reduced))
elif isinstance(reduced, Reduced):
return DerivedState(reduced.reduced)
else:
raise TypeError
def derive_stage_or_state(
self,
stage: HashPoint[Reduction[ReductorType, AccumulatorType]]
) -> Derived[ReductorType, Reduction[ReductorType, AccumulatorType]]:
assert isinstance(stage, HashPoint)
return self._derived_from_reduced(
self.protocol.reduce(
self.resolver.resolve(stage)
)
)

View File

View File

@ -0,0 +1,155 @@
from typing import Generic, TypeVar
from rainbowadn.chain.stages.derivation.derived import Derived
from rainbowadn.chain.stages.derivation.derivedstage import DerivedStage
from rainbowadn.chain.stages.derivation.derivedstate import DerivedState
from rainbowadn.chain.stages.stage import StageStage, StageStageFactory, StateStage
from rainbowadn.chain.stages.stageprotocol import StageProtocol
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('ActiveStageProtocol',)
HeaderType = TypeVar('HeaderType')
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class ActiveStageProtocol(
StageProtocol[HeaderType, BaseStateType, StageType],
Generic[HeaderType, BaseStateType, StageType]
):
def derive_header(
self,
previous: NullableReference[BaseStateType],
header: HashPoint[HeaderType],
) -> HashPoint[StageType]:
raise NotImplementedError
def derive_stage_or_state(
self,
stage: HashPoint[StageType],
) -> Derived[BaseStateType, StageType]:
raise NotImplementedError
def _previous_base_state(
self,
previous_reference: NullableReference[StateStage[HeaderType, BaseStateType, StageType]],
base_state_factory: RainbowFactory[BaseStateType],
) -> NullableReference[BaseStateType]:
assert isinstance(previous_reference, NullableReference)
assert isinstance(base_state_factory, RainbowFactory)
if isinstance(previous_reference.reference, Null):
return NullableReference(Null(), base_state_factory)
elif isinstance(previous_reference.reference, NotNull):
previous_state_stage: StateStage[HeaderType, BaseStateType, StageType] = self.resolver.resolve(
previous_reference.reference.value
)
assert isinstance(previous_state_stage, StateStage)
return NullableReference.of(previous_state_stage.state)
else:
raise TypeError
def _derive_cycle(
self,
stage: StageStage[HeaderType, BaseStateType, StageType],
) -> HashPoint[StateStage[HeaderType, BaseStateType, StageType]]:
assert isinstance(stage, StageStage)
while True:
derived = self.derive_stage_or_state(stage.stage)
assert isinstance(derived, Derived)
if isinstance(derived, DerivedStage):
stage = StageStage(self, NullableReference.off(stage), derived.stage)
assert isinstance(stage, StageStage)
elif isinstance(derived, DerivedState):
return HashPoint.of(StateStage(self, HashPoint.of(stage), derived.state))
else:
raise TypeError
def _derive_initial_stage(
self,
previous: NullableReference[BaseStateType],
header: HashPoint[HeaderType],
stage_factory: RainbowFactory[StageType],
) -> StageStage[HeaderType, BaseStateType, StageType]:
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
assert isinstance(stage_factory, RainbowFactory)
return StageStage(
self,
NullableReference(
Null(),
StageStageFactory(
self,
stage_factory
)
),
self.derive_header(previous, header)
)
def derive_full(
self,
previous_reference: NullableReference[StateStage[HeaderType, BaseStateType, StageType]],
header: HashPoint[HeaderType],
base_state_factory: RainbowFactory[BaseStateType],
stage_factory: RainbowFactory[StageType],
) -> HashPoint[StateStage[HeaderType, BaseStateType, StageType]]:
assert isinstance(previous_reference, NullableReference)
assert isinstance(header, HashPoint)
assert isinstance(base_state_factory, RainbowFactory)
assert isinstance(stage_factory, RainbowFactory)
return self._derive_cycle(
self._derive_initial_stage(
self._previous_base_state(
previous_reference,
base_state_factory
),
header,
stage_factory
)
)
def verify_header(
self,
previous: NullableReference[BaseStateType],
header: HashPoint[HeaderType],
stage: HashPoint[StageType]
) -> bool:
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
assert isinstance(stage, HashPoint)
assert stage == self.derive_header(previous, header)
return True
def verify_stage(
self,
previous: HashPoint[StageType],
stage: HashPoint[StageType]
) -> bool:
assert isinstance(previous, HashPoint)
assert isinstance(stage, HashPoint)
derived = self.derive_stage_or_state(previous)
assert isinstance(derived, Derived)
if isinstance(derived, DerivedStage):
assert stage == derived.stage
return True
else:
raise TypeError
def verify_state(
self,
stage: HashPoint[StageType],
state: HashPoint[BaseStateType]
) -> bool:
assert isinstance(stage, HashPoint)
assert isinstance(state, HashPoint)
derived = self.derive_stage_or_state(stage)
assert isinstance(derived, Derived)
if isinstance(derived, DerivedState):
assert state == derived.state
return True
else:
raise TypeError

View File

@ -0,0 +1,10 @@
from typing import Generic, TypeVar
__all__ = ('Derived',)
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class Derived(Generic[BaseStateType, StageType]):
pass

View File

@ -0,0 +1,15 @@
from typing import Generic, TypeVar
from rainbowadn.chain.stages.derivation.derived import Derived
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('DerivedStage',)
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class DerivedStage(Derived[BaseStateType, StageType], Generic[BaseStateType, StageType]):
def __init__(self, stage: HashPoint[StageType]):
assert isinstance(stage, HashPoint)
self.stage = stage

View File

@ -0,0 +1,15 @@
from typing import Generic, TypeVar
from rainbowadn.chain.stages.derivation.derived import Derived
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('DerivedState',)
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class DerivedState(Derived[BaseStateType, StageType], Generic[BaseStateType, StageType]):
def __init__(self, state: HashPoint[BaseStateType]):
assert isinstance(state, HashPoint)
self.state = state

View File

@ -0,0 +1,215 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.chain.stages.stageprotocol import StageProtocol
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = (
'StageStage',
'StageStageFactory',
'StateStage',
'StateStageFactory',
)
HeaderType = TypeVar('HeaderType')
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class Stage(Generic[HeaderType, BaseStateType, StageType]):
def __init__(self, protocol: StageProtocol[HeaderType, BaseStateType, StageType]):
assert isinstance(protocol, StageProtocol)
self.protocol = protocol
class StageStage(
RecursiveMentionable,
Stage[HeaderType, BaseStateType, StageType],
Generic[HeaderType, BaseStateType, StageType],
):
def __init__(
self,
protocol: StageProtocol[HeaderType, BaseStateType, StageType],
previous: NullableReference['StageStage[HeaderType, BaseStateType, StageType]'],
stage: HashPoint[StageType],
):
assert isinstance(protocol, StageProtocol)
assert isinstance(previous, NullableReference)
assert isinstance(stage, HashPoint)
super().__init__(protocol)
self.previous = previous
self.stage = stage
def _previous_state(
self,
previous: NullableReference['StateStage[HeaderType, BaseStateType, StageType]'],
base_factory: RainbowFactory[BaseStateType],
) -> NullableReference[BaseStateType]:
assert isinstance(previous, NullableReference)
assert isinstance(base_factory, RainbowFactory)
if isinstance(previous.reference, Null):
return NullableReference(Null(), base_factory)
elif isinstance(previous.reference, NotNull):
state_stage: StateStage[HeaderType, BaseStateType, StageType] = self.protocol.resolver.resolve(
previous.reference.value
)
assert isinstance(state_stage, StateStage)
return NullableReference.of(state_stage.state)
else:
raise TypeError
def verify(
self,
previous: NullableReference['StateStage[HeaderType, BaseStateType, StageType]'],
header: HashPoint[HeaderType],
base_factory: RainbowFactory[BaseStateType],
) -> bool:
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
assert isinstance(base_factory, RainbowFactory)
if isinstance(self.previous.reference, Null):
return self.protocol.verify_header(
self._previous_state(previous, base_factory),
header,
self.stage
)
elif isinstance(self.previous.reference, NotNull):
previous_stage: StageStage[HeaderType, BaseStateType, StageType] = self.protocol.resolver.resolve(
self.previous.reference.value
)
assert isinstance(previous_stage, StageStage)
return self.protocol.verify_stage(
previous_stage.stage,
self.stage
) and previous_stage.verify(
previous,
header,
base_factory
)
else:
raise TypeError
def points(self) -> Iterable[HashPoint]:
return [*self.previous.points(), self.stage]
def __bytes__(self):
return bytes(self.previous) + bytes(self.stage)
def __factory__(self) -> RainbowFactory['StageStage[HeaderType, BaseStateType, StageType]']:
return StageStageFactory(
self.protocol,
self.stage.factory
)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{self.previous.str(resolver, tab)}' \
f'{tabulate(tab)}{hash_point_format(self.stage, resolver, tab)}'
class StageStageFactory(RainbowFactory[StageStage[HeaderType, BaseStateType, StageType]]):
def __init__(
self,
protocol: StageProtocol[HeaderType, BaseStateType, StageType],
stage_factory: RainbowFactory[StageType]
):
assert isinstance(protocol, StageProtocol)
assert isinstance(stage_factory, RainbowFactory)
self.protocol = protocol
self.stage_factory = stage_factory
def from_bytes(self, source: bytes) -> StageStage[HeaderType, BaseStateType, StageType]:
assert isinstance(source, bytes)
return StageStage(
self.protocol,
NullableReferenceFactory(self).from_bytes(source[:HashPoint.HASH_LENGTH]),
HashPoint(self.stage_factory, source[HashPoint.HASH_LENGTH:], Null())
)
class StateStage(
RecursiveMentionable,
Stage[HeaderType, BaseStateType, StageType],
Generic[HeaderType, BaseStateType, StageType]
):
def __init__(
self,
protocol: StageProtocol[HeaderType, BaseStateType, StageType],
previous: HashPoint[StageStage[HeaderType, BaseStateType, StageType]],
state: HashPoint[BaseStateType]
):
assert isinstance(protocol, StageProtocol)
assert isinstance(previous, HashPoint)
assert isinstance(state, HashPoint)
super().__init__(protocol)
self.previous = previous
self.state = state
def verify(
self,
previous: NullableReference['StateStage[HeaderType, BaseStateType, StageType]'],
header: HashPoint[HeaderType]
) -> bool:
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
previous_stage: StageStage[HeaderType, BaseStateType, StageType] = self.protocol.resolver.resolve(
self.previous
)
assert isinstance(previous_stage, StageStage)
return self.protocol.verify_state(
previous_stage.stage,
self.state
) and previous_stage.verify(
previous,
header,
self.state.factory
)
def points(self) -> Iterable[HashPoint]:
return [self.previous, self.state]
def __bytes__(self):
return bytes(self.previous) + bytes(self.state)
def __factory__(self) -> RainbowFactory['StateStage[HeaderType, BaseStateType, StageType]']:
return StateStageFactory(
self.protocol,
self.previous.factory,
self.state.factory
)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{hash_point_format(self.previous, resolver, tab)}' \
f'{tabulate(tab)}{hash_point_format(self.state, resolver, tab)}'
class StateStageFactory(RainbowFactory[StateStage[HeaderType, BaseStateType, StageType]]):
def __init__(
self,
protocol: StageProtocol[HeaderType, BaseStateType, StageType],
stage_stage_factory: RainbowFactory[StageStage[HeaderType, BaseStateType, StageType]],
state_factory: RainbowFactory[BaseStateType]
):
assert isinstance(protocol, StageProtocol)
assert isinstance(stage_stage_factory, RainbowFactory)
assert isinstance(state_factory, RainbowFactory)
self.protocol = protocol
self.stage_stage_factory = stage_stage_factory
self.state_factory = state_factory
def from_bytes(self, source: bytes) -> StateStage[HeaderType, BaseStateType, StageType]:
assert isinstance(source, bytes)
return StateStage(
self.protocol,
HashPoint(self.stage_stage_factory, source[:HashPoint.HASH_LENGTH], Null()),
HashPoint(self.state_factory, source[HashPoint.HASH_LENGTH:], Null()),
)

View File

@ -0,0 +1,39 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('StageProtocol',)
HeaderType = TypeVar('HeaderType')
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class StageProtocol(Generic[HeaderType, BaseStateType, StageType]):
def __init__(self, resolver: HashResolver):
assert isinstance(resolver, HashResolver)
self.resolver = resolver
def verify_header(
self,
previous: NullableReference[BaseStateType],
header: HashPoint[HeaderType],
stage: HashPoint[StageType]
) -> bool:
raise NotImplementedError
def verify_stage(
self,
previous: HashPoint[StageType],
stage: HashPoint[StageType]
) -> bool:
raise NotImplementedError
def verify_state(
self,
stage: HashPoint[StageType],
state: HashPoint[BaseStateType]
) -> bool:
raise NotImplementedError

View File

@ -0,0 +1,28 @@
from typing import Generic, TypeVar
from rainbowadn.chain.stages.stage import StateStage
from rainbowadn.chain.states.stateprotocol import StateProtocol
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('StageStateProtocol',)
HeaderType = TypeVar('HeaderType')
BaseStateType = TypeVar('BaseStateType')
StageType = TypeVar('StageType')
class StageStateProtocol(
StateProtocol[HeaderType, StateStage[HeaderType, BaseStateType, StageType]],
Generic[HeaderType, BaseStateType, StageType]
):
def verify(
self,
previous: NullableReference[StateStage[HeaderType, BaseStateType, StageType]],
header: HashPoint[HeaderType],
state: HashPoint[StateStage[HeaderType, BaseStateType, StageType]]
) -> bool:
assert isinstance(previous, NullableReference)
assert isinstance(header, HashPoint)
assert isinstance(state, HashPoint)
return self.resolver.resolve(state).verify(previous, header)

View File

View File

@ -0,0 +1,24 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('StateProtocol',)
HeaderType = TypeVar('HeaderType')
StateType = TypeVar('StateType')
class StateProtocol(Generic[HeaderType, StateType]):
def __init__(self, resolver: HashResolver):
assert isinstance(resolver, HashResolver)
self.resolver = resolver
def verify(
self,
previous: NullableReference[StateType],
header: HashPoint[HeaderType],
state: NullableReference[StateType]
) -> bool:
raise NotImplementedError

View File

View File

View File

@ -0,0 +1,14 @@
import abc
from typing import Generic, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.static import StaticMentionable
__all__ = ('Atomic',)
AtomicMentioned = TypeVar('AtomicMentioned')
class Atomic(StaticMentionable[AtomicMentioned], abc.ABC, Generic[AtomicMentioned]):
def __topology_hash__(self) -> bytes:
return HashPoint.NULL_HASH

View File

@ -0,0 +1,21 @@
from rainbowadn.data.atomic.atomic import Atomic
__all__ = ('Integer',)
class Integer(Atomic['Integer']):
def __init__(self, integer: int):
assert isinstance(integer, int)
assert integer >= 0
self.integer = integer
@classmethod
def from_bytes(cls, source: bytes) -> 'Integer':
assert isinstance(source, bytes)
return cls(int.from_bytes(source, 'little'))
def __bytes__(self):
return self.integer.to_bytes((self.integer.bit_length() + 7) // 8, 'little')
def __str__(self):
return str(self.integer)

View File

@ -0,0 +1,20 @@
from rainbowadn.data.atomic.atomic import Atomic
__all__ = ('Plain',)
class Plain(Atomic['Plain']):
def __init__(self, source: bytes):
assert isinstance(source, bytes)
self.source = source
@classmethod
def from_bytes(cls, source: bytes) -> 'Plain':
assert isinstance(source, bytes)
return cls(source)
def __bytes__(self):
return self.source
def __str__(self):
return self.source.decode(errors='replace')

View File

View File

@ -0,0 +1,14 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.collection_interface.collectioninterface import CollectionInterface
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('ActiveCollectionInterface',)
CollectionType = TypeVar('CollectionType')
KeyType = TypeVar('KeyType')
class ActiveCollectionInterface(CollectionInterface[CollectionType], Generic[CollectionType, KeyType]):
def add(self, key: HashPoint[KeyType]) -> 'ActiveCollectionInterface[KeyType]':
raise NotImplementedError

View File

@ -0,0 +1,17 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('CollectionFactory',)
CollectionType = TypeVar('CollectionType')
InterfaceType = TypeVar('InterfaceType')
class CollectionFactory(
Generic[CollectionType, InterfaceType],
):
"""вперёд, уроды, вас ждут заводы"""
def from_reference(self, source: NullableReference[CollectionType]) -> InterfaceType:
raise NotImplementedError

View File

@ -0,0 +1,18 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('CollectionInterface',)
CollectionType = TypeVar('CollectionType')
class CollectionInterface(
Generic[CollectionType]
):
def __init__(
self,
reference: NullableReference[CollectionType]
):
assert isinstance(reference, NullableReference)
self.reference = reference

View File

@ -0,0 +1,13 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.nullable import Nullable
__all__ = ('QueryCollectionInterface',)
KeyType = TypeVar('KeyType')
class QueryCollectionInterface(Generic[KeyType]):
def query(self, key: HashPoint[KeyType]) -> Nullable[HashPoint[KeyType]]:
raise NotImplementedError

View File

@ -0,0 +1,15 @@
import abc
from typing import Generic, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('Keyed',)
KKeyType = TypeVar('KKeyType')
class Keyed(RecursiveMentionable, Generic[KKeyType], abc.ABC):
def __init__(self, key: HashPoint[KKeyType]):
assert isinstance(key, HashPoint)
self.key = key

View File

@ -0,0 +1,57 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.data.collection.keyed import Keyed
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('KeyMetadata', 'KeyMetadataFactory',)
ActiveKeyType = TypeVar('ActiveKeyType')
MetaDataType = TypeVar('MetaDataType')
class KeyMetadata(Keyed[ActiveKeyType], Generic[ActiveKeyType, MetaDataType]):
def __init__(self, key: HashPoint[ActiveKeyType], metadata: HashPoint[MetaDataType]):
assert isinstance(key, HashPoint)
assert isinstance(metadata, HashPoint)
super().__init__(key)
self.metadata = metadata
def points(self) -> Iterable[HashPoint]:
return [self.key, self.metadata]
def __bytes__(self):
return bytes(self.key) + bytes(self.metadata)
def __factory__(self) -> RainbowFactory['KeyMetadata[ActiveKeyType, MetaDataType]']:
return KeyMetadataFactory(self.key.factory, self.metadata.factory)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{hash_point_format(self.key, resolver, tab)}' \
f'{tabulate(tab)}{hash_point_format(self.metadata, resolver, tab)}'
class KeyMetadataFactory(
RainbowFactory[KeyMetadata[ActiveKeyType, MetaDataType]],
Generic[ActiveKeyType, MetaDataType]
):
def __init__(self, key_factory: RainbowFactory[ActiveKeyType], metadata_factory: RainbowFactory[MetaDataType]):
assert isinstance(key_factory, RainbowFactory)
assert isinstance(metadata_factory, RainbowFactory)
self.key_factory = key_factory
self.metadata_factory = metadata_factory
def from_bytes(self, source: bytes) -> KeyMetadata[ActiveKeyType, MetaDataType]:
assert isinstance(source, bytes)
return KeyMetadata(
HashPoint(self.key_factory, source[:HashPoint.HASH_LENGTH], Null()),
HashPoint(self.metadata_factory, source[HashPoint.HASH_LENGTH:], Null()),
)
def loose(self) -> RainbowFactory[KeyMetadata[ActiveKeyType, MetaDataType]]:
return self

View File

@ -0,0 +1,44 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.collection_interface.querycollectioninterface import QueryCollectionInterface
from rainbowadn.data.collection.keymetadata import KeyMetadata
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
__all__ = ('KeyMetadataQueryCollection',)
ActiveKeyType = TypeVar('ActiveKeyType')
MetaDataType = TypeVar('MetaDataType')
class KeyMetadataQueryCollection(QueryCollectionInterface[ActiveKeyType], Generic[ActiveKeyType, MetaDataType]):
def __init__(
self,
metadata: HashPoint[MetaDataType],
collection: QueryCollectionInterface[KeyMetadata[ActiveKeyType, MetaDataType]],
resolver: HashResolver
):
assert isinstance(metadata, HashPoint)
assert isinstance(collection, QueryCollectionInterface)
assert isinstance(resolver, HashResolver)
self.metadata = metadata
self.collection = collection
self.resolver = resolver
def query(self, key: HashPoint[ActiveKeyType]) -> Nullable[HashPoint[ActiveKeyType]]:
assert isinstance(key, HashPoint)
result: Nullable[
HashPoint[KeyMetadata[ActiveKeyType, MetaDataType]]
] = self.collection.query(HashPoint.of(KeyMetadata(key, self.metadata)))
assert isinstance(result, Nullable)
if isinstance(result, Null):
return Null()
elif isinstance(result, NotNull):
hash_point: HashPoint[KeyMetadata[ActiveKeyType, MetaDataType]] = result.value
assert isinstance(hash_point, HashPoint)
return NotNull(self.resolver.resolve(hash_point).key)
else:
raise TypeError

View File

@ -0,0 +1,46 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.data.collection.keyed import Keyed
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('KeyValue', 'KeyValueFactory',)
KVKeyType = TypeVar('KVKeyType')
KVValueType = TypeVar('KVValueType')
class KeyValue(Keyed[KVKeyType], Generic[KVKeyType, KVValueType]):
def __init__(self, key: HashPoint[KVKeyType], value: HashPoint[KVValueType]):
assert isinstance(key, HashPoint)
assert isinstance(value, HashPoint)
super().__init__(key)
self.value = value
def points(self) -> Iterable[HashPoint]:
return [self.key, self.value]
def __bytes__(self):
return bytes(self.key) + bytes(self.value)
def __factory__(self) -> RainbowFactory['KeyValue[KVKeyType, KVValueType]']:
return KeyValueFactory(self.key.factory, self.value.factory)
class KeyValueFactory(
RainbowFactory[KeyValue[KVKeyType, KVValueType]],
Generic[KVKeyType, KVValueType]
):
def __init__(self, key_factory: RainbowFactory[KVKeyType], value_factory: RainbowFactory[KVValueType]):
assert isinstance(key_factory, RainbowFactory)
assert isinstance(value_factory, RainbowFactory)
self.key_factory = key_factory
self.value_factory = value_factory
def from_bytes(self, source: bytes) -> KeyValue[KVKeyType, KVValueType]:
assert isinstance(source, bytes)
return KeyValue(
HashPoint(self.key_factory, source[:HashPoint.HASH_LENGTH], Null()),
HashPoint(self.value_factory, source[HashPoint.HASH_LENGTH:], Null()),
)

View File

@ -0,0 +1,26 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.collection_interface.activecollectioninterface import ActiveCollectionInterface
from rainbowadn.data.collection.keyvalue import KeyValue
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('ActiveMapping',)
KVKeyType = TypeVar('KVKeyType')
KVValueType = TypeVar('KVValueType')
class ActiveMapping(Generic[KVKeyType, KVValueType]):
def __init__(
self,
collection: ActiveCollectionInterface[KeyValue[KVKeyType, KVValueType]]
):
assert isinstance(collection, ActiveCollectionInterface)
self.collection = collection
def add(self, key: HashPoint[KVKeyType], value: HashPoint[KVValueType]) -> 'ActiveMapping[KVKeyType, KVValueType]':
assert isinstance(key, HashPoint)
assert isinstance(value, HashPoint)
return ActiveMapping(
self.collection.add(HashPoint.of(KeyValue(key, value)))
)

View File

@ -0,0 +1,44 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.collection_interface.querycollectioninterface import QueryCollectionInterface
from rainbowadn.data.collection.keyvalue import KeyValue
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
__all__ = ('QueryMapping',)
KVKeyType = TypeVar('KVKeyType')
KVValueType = TypeVar('KVValueType')
class QueryMapping(Generic[KVKeyType, KVValueType]):
def __init__(
self,
collection: QueryCollectionInterface[KeyValue[KVKeyType, KVValueType]],
empty_value: HashPoint[KVValueType],
resoler: HashResolver
):
assert isinstance(collection, QueryCollectionInterface)
assert isinstance(empty_value, HashPoint)
assert isinstance(resoler, HashResolver)
self.collection = collection
self.empty_value = empty_value
self.resolver = resoler
def query(self, key: HashPoint[KVKeyType]) -> Nullable[HashPoint[KVValueType]]:
assert isinstance(key, HashPoint)
query_result: Nullable[
HashPoint[KeyValue[KVKeyType, KVValueType]]
] = self.collection.query(HashPoint.of(KeyValue(key, self.empty_value)))
assert isinstance(query_result, Nullable)
if isinstance(query_result, Null):
return Null()
elif isinstance(query_result, NotNull):
key_value: KeyValue[KVKeyType, KVValueType] = self.resolver.resolve(query_result.value)
assert isinstance(key_value, KeyValue)
return NotNull(key_value.value)
else:
raise TypeError

View File

@ -0,0 +1,40 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.collection_interface.activecollectioninterface import ActiveCollectionInterface
from rainbowadn.data.collection.stack.stack import Stack
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('ActiveStack',)
ElementType = TypeVar('ElementType')
class ActiveStack(ActiveCollectionInterface[Stack[ElementType], ElementType], Generic[ElementType]):
def __init__(self, reference: NullableReference[Stack[ElementType]]):
assert isinstance(reference, NullableReference)
super().__init__(reference)
def add(self, element: HashPoint[ElementType]) -> 'ActiveStack[ElementType]':
assert isinstance(element, HashPoint)
return ActiveStack(NullableReference.off(Stack(self.reference, element)))
@classmethod
def empty(cls, factory: RainbowFactory[ElementType]) -> 'ActiveStack[ElementType]':
assert isinstance(factory, RainbowFactory)
return cls(NullableReference(Null(), Stack.factory(factory)))
def str(self, resolver: HashResolver):
assert isinstance(resolver, HashResolver)
if isinstance(self.reference.reference, Null):
return f'-'
elif isinstance(self.reference.reference, NotNull):
stack: Stack[ElementType] = resolver.resolve(self.reference.reference.value)
assert isinstance(stack, Stack)
return f'{ActiveStack(stack.previous).str(resolver)} {resolver.resolve(stack.element)}'
else:
raise TypeError

View File

@ -0,0 +1,82 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('Stack', 'StackFactory',)
ElementType = TypeVar('ElementType')
class Stack(RecursiveMentionable, Generic[ElementType]):
def __factory__(self) -> RainbowFactory['Stack[ElementType]']:
return self.factory(self.element.factory)
@classmethod
def factory(cls, factory: RainbowFactory[ElementType]) -> RainbowFactory['Stack[ElementType]']:
assert isinstance(factory, RainbowFactory)
return StackFactory(factory)
def __init__(self, previous: NullableReference['Stack[ElementType]'], element: HashPoint[ElementType]):
assert isinstance(previous, NullableReference)
assert isinstance(element, HashPoint)
self.previous = previous
self.element = element
def points(self) -> Iterable[HashPoint]:
return [*self.previous.points(), self.element]
def __bytes__(self):
return bytes(self.previous) + bytes(self.element)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{self.previous.str(resolver, tab)}' \
f'{tabulate(tab)}{hash_point_format(self.element, resolver, tab)}'
@classmethod
def of(
cls,
factory: RainbowFactory[ElementType],
elements: Iterable[HashPoint[ElementType]]
) -> NullableReference[
'Stack[ElementType]'
]:
assert isinstance(factory, RainbowFactory)
reference: NullableReference[Stack[ElementType]] = NullableReference(Null(), cls.factory(factory))
for element in elements:
assert isinstance(element, HashPoint)
reference = NullableReference.off(cls(reference, element))
return reference
@classmethod
def off(
cls,
factory: RainbowFactory[ElementType],
elements: Iterable[ElementType]
) -> NullableReference[
'Stack[ElementType]'
]:
return cls.of(factory, map(HashPoint.of, elements))
class StackFactory(RainbowFactory[Stack[ElementType]], Generic[ElementType]):
def __init__(self, factory: RainbowFactory[ElementType]):
assert isinstance(factory, RainbowFactory)
self.factory = factory
def from_bytes(self, source: bytes) -> Stack[ElementType]:
assert isinstance(source, bytes)
return Stack(
NullableReferenceFactory(self).from_bytes(source[:HashPoint.HASH_LENGTH]),
HashPoint(self.factory, source[HashPoint.HASH_LENGTH:], Null())
)
def loose(self) -> RainbowFactory[Stack[ElementType]]:
return self

View File

@ -0,0 +1,148 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.trees.binary.binarytree import BinaryTree
from rainbowadn.data.collection.trees.comparison.comparator import (Comparator, Comparison, Left, Replace, Right)
from rainbowadn.data.collection.trees.comparison.keyedcomparator import KeyedComparator
from rainbowadn.data.collection.trees.binary.querybinarytree import QueryBinaryTree
from rainbowadn.data.collection.collection_interface.activecollectioninterface import ActiveCollectionInterface
from rainbowadn.data.collection.collection_interface.querycollectioninterface import QueryCollectionInterface
from rainbowadn.data.collection.keymetadata import KeyMetadata
from rainbowadn.data.collection.keymetadataquerycollection import KeyMetadataQueryCollection
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('ActiveBinaryTree',)
ActiveKeyType = TypeVar('ActiveKeyType')
MetaDataType = TypeVar('MetaDataType')
class ActiveBinaryTree(
ActiveCollectionInterface[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]], ActiveKeyType],
Generic[ActiveKeyType, MetaDataType]
):
def __init__(
self,
comparator: Comparator[ActiveKeyType],
reference: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]]
):
assert isinstance(comparator, Comparator)
assert isinstance(reference, NullableReference)
super().__init__(reference)
self.comparator = comparator
self.resolver = comparator.resolver
assert isinstance(self.resolver, HashResolver)
def empty_metadata(self) -> HashPoint[MetaDataType]:
raise NotImplementedError
def metadata(
self,
treel: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
treer: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
key: HashPoint[ActiveKeyType]
) -> HashPoint[MetaDataType]:
raise NotImplementedError
def _binary_tree(
self,
treel: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
treer: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
key: HashPoint[ActiveKeyType]
) -> BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]:
assert isinstance(treel, NullableReference)
assert isinstance(treer, NullableReference)
assert isinstance(key, HashPoint)
return BinaryTree(
treel,
treer,
HashPoint.of(KeyMetadata(key, self.metadata(treel, treer, key)))
)
def create(
self,
reference: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]]
) -> 'ActiveBinaryTree[ActiveKeyType, MetaDataType]':
assert isinstance(reference, NullableReference)
return type(self)(
self.comparator,
reference
)
def active_tree(
self,
treel: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
treer: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
key: HashPoint[ActiveKeyType]
) -> 'ActiveBinaryTree[ActiveKeyType, MetaDataType]':
assert isinstance(treel, NullableReference)
assert isinstance(treer, NullableReference)
assert isinstance(key, HashPoint)
return self.create(
NullableReference.off(
self._binary_tree(
self.create(treel).balance().reference,
self.create(treer).balance().reference,
key
)
)
).balance()
def add(self, key: HashPoint[ActiveKeyType]) -> 'ActiveBinaryTree[ActiveKeyType, MetaDataType]':
assert isinstance(key, HashPoint)
reference: Nullable[
HashPoint[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]]
] = self.reference.reference
assert isinstance(reference, Nullable)
if isinstance(reference, Null):
return self.active_tree(self.reference, self.reference, key)
elif isinstance(reference, NotNull):
tree: BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]] = self.resolver.resolve(reference.value)
assert isinstance(tree, BinaryTree)
original: HashPoint[ActiveKeyType] = self.resolver.resolve(tree.key).key
assert isinstance(original, HashPoint)
comparison: Comparison = self.comparator.compare(original, key)
assert isinstance(comparison, Comparison)
if isinstance(comparison, Replace):
return self.active_tree(tree.treel, tree.treer, key)
elif isinstance(comparison, Left):
return self.active_tree(self.create(tree.treel).add(key).reference, tree.treer, original)
elif isinstance(comparison, Right):
return self.active_tree(tree.treel, self.create(tree.treer).add(key).reference, original)
else:
raise TypeError
else:
raise TypeError
def balance(self) -> 'ActiveBinaryTree[ActiveKeyType, MetaDataType]':
raise NotImplementedError
def __str__(self):
reference: Nullable[
HashPoint[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]]
] = self.reference.reference
assert isinstance(reference, Nullable)
if isinstance(reference, Null):
return '_'
elif isinstance(reference, NotNull):
tree: BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]] = self.resolver.resolve(reference.value)
assert isinstance(tree, BinaryTree)
key: KeyMetadata[ActiveKeyType, MetaDataType] = self.resolver.resolve(tree.key)
assert isinstance(key, KeyMetadata)
return f'( {self.create(tree.treel)}' \
f' {self.resolver.resolve(key.key)}' \
f' {self.resolver.resolve(key.metadata)}' \
f' {self.create(tree.treer)} )'
else:
raise TypeError
def query_tree(self) -> QueryCollectionInterface[ActiveKeyType]:
return KeyMetadataQueryCollection(
self.empty_metadata(),
QueryBinaryTree(KeyedComparator(self.resolver, self.comparator), self.reference),
self.resolver
)

View File

@ -0,0 +1,180 @@
from typing import Generic, TypeVar
from rainbowadn.data.atomic.integer import Integer
from rainbowadn.data.collection.trees.binary.activebinarytree import ActiveBinaryTree
from rainbowadn.data.collection.trees.binary.binarytree import BinaryTree, BinaryTreeFactory
from rainbowadn.data.collection.trees.comparison.comparator import Comparator
from rainbowadn.data.collection.keymetadata import KeyMetadata, KeyMetadataFactory
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('AVLABT',)
ActiveKeyType = TypeVar('ActiveKeyType')
class AVLABT(ActiveBinaryTree[ActiveKeyType, Integer], Generic[ActiveKeyType]):
def __init__(
self,
comparator: Comparator[ActiveKeyType],
reference: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, Integer]]]
):
assert isinstance(comparator, Comparator)
assert isinstance(reference, NullableReference)
super().__init__(comparator, reference)
def create(
self, reference: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, Integer]]]
) -> 'AVLABT[ActiveKeyType]':
assert isinstance(reference, NullableReference)
return type(self)(
self.comparator,
reference
)
def height(self) -> int:
reference: Nullable[
HashPoint[BinaryTree[KeyMetadata[ActiveKeyType, Integer]]]
] = self.reference.reference
assert isinstance(reference, Nullable)
if isinstance(reference, Null):
return 0
elif isinstance(reference, NotNull):
tree: BinaryTree[KeyMetadata[ActiveKeyType, Integer]] = self.resolver.resolve(reference.value)
assert isinstance(tree, BinaryTree)
key: KeyMetadata[ActiveKeyType, Integer] = self.resolver.resolve(tree.key)
assert isinstance(key, KeyMetadata)
return self.resolver.resolve(key.metadata).integer
@classmethod
def empty(
cls,
comparator: Comparator[ActiveKeyType],
factory: RainbowFactory[ActiveKeyType]
):
assert isinstance(comparator, Comparator)
assert isinstance(factory, RainbowFactory)
return cls(
comparator,
NullableReference(
Null(),
BinaryTreeFactory(KeyMetadataFactory(factory, Integer.factory()))
)
)
def empty_metadata(self) -> HashPoint[Integer]:
return HashPoint.of(Integer(0))
def metadata(
self,
treel: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, Integer]]],
treer: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, Integer]]],
key: HashPoint[ActiveKeyType]
) -> HashPoint[Integer]:
assert isinstance(treel, NullableReference)
assert isinstance(treer, NullableReference)
assert isinstance(key, HashPoint)
return HashPoint.of(Integer(1 + max(self.create(treel).height(), self.create(treer).height())))
def balance(self) -> ActiveBinaryTree[ActiveKeyType, Integer]:
reference: Nullable[
HashPoint[BinaryTree[KeyMetadata[ActiveKeyType, Integer]]]
] = self.reference.reference
assert isinstance(reference, Nullable)
if isinstance(reference, Null):
return self
elif isinstance(reference, NotNull):
tree: BinaryTree[KeyMetadata[ActiveKeyType, Integer]] = self.resolver.resolve(reference.value)
assert isinstance(tree, BinaryTree)
atl = self.create(tree.treel)
assert isinstance(atl, ActiveBinaryTree)
atr = self.create(tree.treer)
assert isinstance(atr, ActiveBinaryTree)
delta = atl.height() - atr.height()
assert isinstance(delta, int)
if delta < -1:
assert isinstance(atr.reference.reference, NotNull)
treer: BinaryTree[
KeyMetadata[ActiveKeyType, Integer]
] = self.resolver.resolve(atr.reference.reference.value)
assert isinstance(treer, BinaryTree)
atrl = self.create(treer.treel)
assert isinstance(atrl, ActiveBinaryTree)
atrr = self.create(treer.treer)
assert isinstance(atrr, ActiveBinaryTree)
if atrl.height() > atrr.height():
assert isinstance(atrl.reference.reference, NotNull)
treerl: BinaryTree[
KeyMetadata[ActiveKeyType, Integer]
] = self.resolver.resolve(atrl.reference.reference.value)
assert isinstance(treerl, BinaryTree)
return self.active_tree(
self.active_tree(
atl.reference,
treerl.treel,
self.resolver.resolve(tree.key).key
).reference,
self.active_tree(
treerl.treer,
atrr.reference,
self.resolver.resolve(treer.key).key
).reference,
self.resolver.resolve(treerl.key).key
)
else:
return self.active_tree(
self.active_tree(
atl.reference,
atrl.reference,
self.resolver.resolve(tree.key).key
).reference,
atrr.reference,
self.resolver.resolve(treer.key).key
)
elif delta > 1:
assert isinstance(atl.reference.reference, NotNull)
treel: BinaryTree[
KeyMetadata[ActiveKeyType, Integer]
] = self.resolver.resolve(atl.reference.reference.value)
assert isinstance(treel, BinaryTree)
atlr = self.create(treel.treer)
assert isinstance(atlr, ActiveBinaryTree)
atll = self.create(treel.treel)
assert isinstance(atll, ActiveBinaryTree)
if atlr.height() > atll.height():
assert isinstance(atlr.reference.reference, NotNull)
treelr: BinaryTree[
KeyMetadata[ActiveKeyType, Integer]
] = self.resolver.resolve(atlr.reference.reference.value)
assert isinstance(treelr, BinaryTree)
return self.active_tree(
self.active_tree(
atll.reference,
treelr.treel,
self.resolver.resolve(treel.key).key
).reference,
self.active_tree(
treelr.treer,
atr.reference,
self.resolver.resolve(tree.key).key
).reference,
self.resolver.resolve(treelr.key).key
)
else:
return self.active_tree(
atll.reference,
self.active_tree(
atlr.reference,
atr.reference,
self.resolver.resolve(tree.key).key
).reference,
self.resolver.resolve(treel.key).key
)
else:
return self
else:
raise TypeError

View File

@ -0,0 +1,65 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('BinaryTree', 'BinaryTreeFactory',)
TreeKeyType = TypeVar('TreeKeyType')
class BinaryTree(RecursiveMentionable, Generic[TreeKeyType]):
def __factory__(self) -> RainbowFactory['BinaryTree[TreeKeyType]']:
return self.factory(self.key.factory)
@classmethod
def factory(cls, factory: RainbowFactory[TreeKeyType]) -> RainbowFactory['BinaryTree[TreeKeyType]']:
assert isinstance(factory, RainbowFactory)
return BinaryTreeFactory(factory)
def __init__(
self,
treel: NullableReference['BinaryTree[TreeKeyType]'], treer: NullableReference['BinaryTree[TreeKeyType]'],
key: HashPoint[TreeKeyType]
):
assert isinstance(treel, NullableReference)
assert isinstance(treer, NullableReference)
assert isinstance(key, HashPoint)
self.treel = treel
self.treer = treer
self.key = key
def points(self) -> Iterable[HashPoint]:
return [*self.treel.points(), *self.treer.points(), self.key]
def __bytes__(self):
return bytes(self.treel) + bytes(self.treer) + bytes(self.key)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{self.treel.str(resolver, tab)}' \
f'{tabulate(tab)}{hash_point_format(self.key, resolver, tab)}' \
f'{tabulate(tab)}{self.treer.str(resolver, tab)}'
class BinaryTreeFactory(RainbowFactory[BinaryTree[TreeKeyType]], Generic[TreeKeyType]):
def __init__(self, factory: RainbowFactory[TreeKeyType]):
assert isinstance(factory, RainbowFactory)
self.factory = factory
def from_bytes(self, source: bytes) -> BinaryTree[TreeKeyType]:
assert isinstance(source, bytes)
return BinaryTree(
NullableReferenceFactory(self).from_bytes(source[:HashPoint.HASH_LENGTH]),
NullableReferenceFactory(self).from_bytes(source[HashPoint.HASH_LENGTH:HashPoint.HASH_LENGTH * 2]),
HashPoint(self.factory, source[HashPoint.HASH_LENGTH * 2:], Null())
)
def loose(self) -> RainbowFactory[BinaryTree[TreeKeyType]]:
return self

View File

@ -0,0 +1,51 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.trees.binary.binarytree import BinaryTree
from rainbowadn.data.collection.trees.comparison.comparator import Comparator, Comparison, Equal, Left, Right
from rainbowadn.data.collection.collection_interface.querycollectioninterface import QueryCollectionInterface
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('QueryBinaryTree',)
KeyType = TypeVar('KeyType')
class QueryBinaryTree(QueryCollectionInterface[KeyType], Generic[KeyType]):
def __init__(
self, comparator: Comparator[KeyType], reference: NullableReference[BinaryTree[KeyType]]
):
assert isinstance(comparator, Comparator)
assert isinstance(reference, NullableReference)
self.comparator = comparator
self.reference = reference
self.resolver = comparator.resolver
assert isinstance(self.resolver, HashResolver)
def query(self, key: HashPoint[KeyType]) -> Nullable[HashPoint[KeyType]]:
assert isinstance(key, HashPoint)
reference: Nullable[HashPoint[BinaryTree[KeyType]]] = self.reference.reference
assert isinstance(reference, Nullable)
if isinstance(reference, Null):
return Null()
elif isinstance(reference, NotNull):
hash_point: HashPoint[BinaryTree[KeyType]] = reference.value
assert isinstance(hash_point, HashPoint)
tree: BinaryTree[KeyType] = self.resolver.resolve(hash_point)
assert isinstance(tree, BinaryTree)
comparison: Comparison = self.comparator.compare(tree.key, key)
assert isinstance(comparison, Comparison)
if isinstance(comparison, Equal):
return NotNull(tree.key)
elif isinstance(comparison, Left):
return QueryBinaryTree(self.comparator, tree.treel).query(key)
elif isinstance(comparison, Right):
return QueryBinaryTree(self.comparator, tree.treer).query(key)
else:
raise TypeError
else:
raise TypeError

View File

@ -0,0 +1,75 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.trees.binary.activebinarytree import ActiveBinaryTree
from rainbowadn.data.collection.trees.binary.binarytree import BinaryTree, BinaryTreeFactory
from rainbowadn.data.collection.trees.comparison.comparator import Comparator
from rainbowadn.data.collection.keymetadata import KeyMetadata, KeyMetadataFactory
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('UnbalancedABT',)
ActiveKeyType = TypeVar('ActiveKeyType')
MetaDataType = TypeVar('MetaDataType')
class UnbalancedABT(ActiveBinaryTree[ActiveKeyType, MetaDataType], Generic[ActiveKeyType, MetaDataType]):
def __init__(
self,
comparator: Comparator[ActiveKeyType],
reference: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
metadata: HashPoint[MetaDataType]
):
assert isinstance(comparator, Comparator)
assert isinstance(reference, NullableReference)
assert isinstance(metadata, HashPoint)
super().__init__(comparator, reference)
self._metadata = metadata
def create(
self, reference: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]]
) -> 'UnbalancedABT[ActiveKeyType, MetaDataType]':
assert isinstance(reference, NullableReference)
return type(self)(
self.comparator,
reference,
self._metadata
)
@classmethod
def empty(
cls,
comparator: Comparator[ActiveKeyType],
metadata: HashPoint[MetaDataType],
factory: RainbowFactory[ActiveKeyType]
):
assert isinstance(comparator, Comparator)
assert isinstance(metadata, HashPoint)
assert isinstance(factory, RainbowFactory)
return cls(
comparator,
NullableReference(
Null(),
BinaryTreeFactory(KeyMetadataFactory(factory, metadata.factory))
),
metadata
)
def empty_metadata(self) -> HashPoint[MetaDataType]:
return self._metadata
def metadata(
self,
treel: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
treer: NullableReference[BinaryTree[KeyMetadata[ActiveKeyType, MetaDataType]]],
key: HashPoint[ActiveKeyType]
) -> HashPoint[MetaDataType]:
assert isinstance(treel, NullableReference)
assert isinstance(treer, NullableReference)
assert isinstance(key, HashPoint)
return self._metadata
def balance(self) -> 'UnbalancedABT[ActiveKeyType, MetaDataType]':
return self

View File

@ -0,0 +1,56 @@
import abc
from typing import Generic, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
__all__ = (
'Comparison',
'Left',
'Right',
'Equal',
'Replace',
'Fail',
'Duplicate',
'Comparator',
)
class Comparison(abc.ABC):
pass
class Left(Comparison):
pass
class Right(Comparison):
pass
class Equal(Comparison):
pass
class Replace(Equal):
pass
class Fail(Equal):
pass
class Duplicate(Equal):
pass
KeyType = TypeVar('KeyType')
class Comparator(Generic[KeyType]):
def __init__(self, resolver: HashResolver):
assert isinstance(resolver, HashResolver)
self.resolver = resolver
def compare(self, original: HashPoint[KeyType], key: HashPoint[KeyType]) -> Comparison:
raise NotImplementedError

View File

@ -0,0 +1,21 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.trees.comparison.comparator import Comparison, Left, Right
from rainbowadn.data.collection.trees.comparison.protocolcomparator import ProtocolComparator
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('HashComparator',)
KeyType = TypeVar('KeyType')
class HashComparator(ProtocolComparator[KeyType], Generic[KeyType]):
def compare(self, original: HashPoint[KeyType], key: HashPoint[KeyType]) -> Comparison:
assert isinstance(original, HashPoint)
assert isinstance(key, HashPoint)
if key.point < original.point:
return Left()
elif key.point > original.point:
return Right()
else:
return self.equal

View File

@ -0,0 +1,32 @@
from typing import Generic, TypeVar
from rainbowadn.data.collection.trees.comparison.comparator import Comparator, Comparison
from rainbowadn.data.collection.keyed import Keyed
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
__all__ = ('KeyedComparator',)
ComparatorKeyType = TypeVar('ComparatorKeyType')
class KeyedComparator(
Comparator[Keyed[ComparatorKeyType]], Generic[ComparatorKeyType]
):
def __init__(self, resolver: HashResolver, comparator: Comparator[ComparatorKeyType]):
assert isinstance(resolver, HashResolver)
assert isinstance(comparator, Comparator)
self.comparator = comparator
super().__init__(resolver)
def compare(
self,
original: HashPoint[Keyed[ComparatorKeyType]],
key: HashPoint[Keyed[ComparatorKeyType]]
) -> Comparison:
assert isinstance(original, HashPoint)
assert isinstance(key, HashPoint)
return self.comparator.compare(
self.resolver.resolve(original).key,
self.resolver.resolve(key).key,
)

View File

@ -0,0 +1,20 @@
from rainbowadn.data.atomic.plain import Plain
from rainbowadn.data.collection.trees.comparison.comparator import Comparison, Left, Right
from rainbowadn.data.collection.trees.comparison.protocolcomparator import ProtocolComparator
from rainbowadn.hashing.hashpoint import HashPoint
__all__ = ('PlainComparator',)
class PlainComparator(ProtocolComparator[Plain]):
def compare(self, original: HashPoint[Plain], key: HashPoint[Plain]) -> Comparison:
assert isinstance(original, HashPoint)
assert isinstance(key, HashPoint)
original_value = self.resolver.resolve(original)
key_value = self.resolver.resolve(key)
if key_value.source < original_value.source:
return Left()
elif key_value.source > original_value.source:
return Right()
else:
return self.equal

View File

@ -0,0 +1,17 @@
import abc
from typing import Generic, TypeVar
from rainbowadn.data.collection.trees.comparison.comparator import Comparator, Equal
from rainbowadn.hashing.hashresolver import HashResolver
__all__ = ('ProtocolComparator',)
KeyType = TypeVar('KeyType')
class ProtocolComparator(Comparator[KeyType], abc.ABC, Generic[KeyType]):
def __init__(self, resolver: HashResolver, equal: Equal):
assert isinstance(resolver, HashResolver)
assert isinstance(equal, Equal)
super().__init__(resolver)
self.equal = equal

View File

View File

@ -0,0 +1,27 @@
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('hash_point_format', 'tabulate',)
def hash_point_format(hash_point: HashPoint, resolver: HashResolver, tab: int) -> str:
assert isinstance(hash_point, HashPoint)
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
value = resolver.resolve(hash_point)
if isinstance(value, RecursiveMentionable):
return value.str(resolver, tab)
else:
return str(value)
newline = False
def tabulate(tab: int) -> str:
assert isinstance(tab, int)
if newline:
return '\n' + ' ' * tab
else:
return ' '

View File

@ -0,0 +1,14 @@
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('HashMentionable',)
class HashMentionable:
def __bytes__(self):
raise NotImplementedError
def __factory__(self) -> RainbowFactory['HashMentionable']:
raise NotImplementedError
def __topology_hash__(self) -> bytes:
raise NotImplementedError

View File

@ -0,0 +1,58 @@
import hashlib
from typing import Generic, TypeVar
from rainbowadn.hashing.hashmentionable import HashMentionable
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('HashPoint',)
HashMentioned = TypeVar('HashMentioned')
class HashPoint(Generic[HashMentioned]):
def __init__(self, factory: RainbowFactory[HashMentioned], point: bytes, value: Nullable[HashMentioned]):
assert isinstance(factory, RainbowFactory)
assert isinstance(point, bytes)
assert isinstance(value, Nullable)
assert len(point) == self.HASH_LENGTH
self.factory = factory
self.point = point
self.value = value
def __bytes__(self):
return self.point
HASH_LENGTH = 32
NULL_HASH = b'\0' * HASH_LENGTH
@classmethod
def hash(cls, source: bytes) -> bytes:
assert isinstance(source, bytes)
return hashlib.sha256(source).digest()
@classmethod
def bytes_of_mentioned(cls, mentioned: HashMentionable):
assert isinstance(mentioned, HashMentionable)
topology_hash: bytes = mentioned.__topology_hash__()
assert isinstance(topology_hash, bytes)
assert len(topology_hash) == cls.HASH_LENGTH
return topology_hash + bytes(mentioned)
@classmethod
def of(cls, mentioned: HashMentioned) -> 'HashPoint[HashMentioned]':
assert isinstance(mentioned, HashMentionable)
return cls(
mentioned.__factory__(), cls.hash(cls.bytes_of_mentioned(mentioned)), NotNull(mentioned)
)
def loose(self) -> 'HashPoint[HashMentioned]':
return HashPoint(self.factory, self.point, Null())
def __eq__(self, other):
if isinstance(other, HashPoint):
return self.point == other.point
else:
return NotImplemented

View File

@ -0,0 +1,29 @@
from typing import TypeVar
from rainbowadn.hashing.hashmentionable import HashMentionable
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
__all__ = ('HashResolver',)
RHashMentioned = TypeVar('RHashMentioned')
class HashResolver:
def _resolve(self, point: bytes) -> bytes:
raise NotImplementedError
def resolve(self, hashpoint: HashPoint[RHashMentioned]) -> RHashMentioned:
assert isinstance(hashpoint, HashPoint)
if isinstance(hashpoint.value, NotNull):
return hashpoint.value.value
elif isinstance(hashpoint.value, Null):
resolved: bytes = self._resolve(bytes(hashpoint))
assert isinstance(resolved, bytes)
mentioned: RHashMentioned = hashpoint.factory.from_bytes(resolved[HashPoint.HASH_LENGTH:])
assert isinstance(mentioned, HashMentionable)
assert mentioned.__topology_hash__() == resolved[:HashPoint.HASH_LENGTH]
return mentioned
else:
raise TypeError

View File

@ -0,0 +1,18 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.nullability.nullable import Nullable
__all__ = ('NotNull',)
NullableType = TypeVar('NullableType')
class NotNull(Nullable[NullableType], Generic[NullableType]):
def __init__(self, value: NullableType):
self.value = value
def __eq__(self, other):
if isinstance(other, NotNull):
return self.value == other.value
else:
return NotImplemented

View File

@ -0,0 +1,15 @@
from typing import Generic, TypeVar
from rainbowadn.hashing.nullability.nullable import Nullable
__all__ = ('Null',)
NullableType = TypeVar('NullableType')
class Null(Nullable[NullableType], Generic[NullableType]):
def __eq__(self, other):
if isinstance(other, Null):
return True
else:
return NotImplemented

View File

@ -0,0 +1,10 @@
import abc
from typing import Generic, TypeVar
__all__ = ('Nullable',)
NullableType = TypeVar('NullableType')
class Nullable(Generic[NullableType], abc.ABC):
pass

View File

@ -0,0 +1,87 @@
from typing import Generic, Iterable, TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hash_point_format import hash_point_format
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
__all__ = ('NullableReference', 'NullableReferenceFactory',)
ReferencedType = TypeVar('ReferencedType')
class NullableReference(RecursiveMentionable, Generic[ReferencedType]):
def __factory__(self) -> RainbowFactory['NullableReference[ReferencedType]']:
return NullableReferenceFactory(self.factory)
def points(self) -> Iterable[HashPoint]:
if isinstance(self.reference, NotNull):
return [self.reference.value]
elif isinstance(self.reference, Null):
return []
else:
raise TypeError
def __bytes__(self):
if isinstance(self.reference, NotNull):
hash_point: HashPoint[ReferencedType] = self.reference.value
assert isinstance(hash_point, HashPoint)
return hash_point.point
elif isinstance(self.reference, Null):
return HashPoint.NULL_HASH
else:
raise TypeError
def __init__(
self, reference: Nullable[HashPoint[ReferencedType]], factory: RainbowFactory[ReferencedType]
):
assert isinstance(reference, Nullable)
assert isinstance(factory, RainbowFactory)
self.reference = reference
self.factory = factory
@classmethod
def of(cls, hash_point: HashPoint[ReferencedType]) -> 'NullableReference[ReferencedType]':
assert isinstance(hash_point, HashPoint)
return cls(NotNull(hash_point), hash_point.factory)
@classmethod
def off(cls, value: ReferencedType) -> 'NullableReference[ReferencedType]':
return cls.of(HashPoint.of(value))
def str(self, resolver: HashResolver, tab: int) -> str:
if isinstance(self.reference, Null):
return f'-'
elif isinstance(self.reference, NotNull):
return f'{hash_point_format(self.reference.value, resolver, tab)}'
else:
raise TypeError
def __eq__(self, other):
if isinstance(other, NullableReference):
return self.reference == other.reference
else:
return NotImplemented
FReferencedType = TypeVar('FReferencedType')
class NullableReferenceFactory(RainbowFactory[NullableReference[FReferencedType]], Generic[FReferencedType]):
def __init__(self, factory: RainbowFactory[FReferencedType]):
assert isinstance(factory, RainbowFactory)
self.factory = factory
def from_bytes(self, source: bytes) -> NullableReference[FReferencedType]:
assert isinstance(source, bytes)
if source == HashPoint.NULL_HASH:
return NullableReference(Null(), self.factory)
else:
return NullableReference.of(HashPoint(self.factory, source, Null()))
def loose(self) -> RainbowFactory[NullableReference[FReferencedType]]:
return self

View File

@ -0,0 +1,12 @@
from typing import Generic, TypeVar
__all__ = ('RainbowFactory',)
FHashMentioned = TypeVar('FHashMentioned')
class RainbowFactory(Generic[FHashMentioned]):
"""вперёд, уроды, вас ждут заводы"""
def from_bytes(self, source: bytes) -> FHashMentioned:
raise NotImplementedError

View File

@ -0,0 +1,21 @@
import abc
from typing import Iterable
from rainbowadn.hashing.hashmentionable import HashMentionable
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
__all__ = ('RecursiveMentionable',)
class RecursiveMentionable(HashMentionable, abc.ABC):
def points(self) -> Iterable[HashPoint]:
raise NotImplementedError
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'(recursive {self.__class__.__name__})'
def __topology_hash__(self) -> bytes:
return HashPoint.hash(b''.join(hash_point.point for hash_point in self.points()))

View File

@ -0,0 +1,25 @@
from typing import TypeVar
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullablereference import NullableReference
__all__ = ('reduce_nullable_reference',)
ReReferencedType = TypeVar('ReReferencedType')
def reduce_nullable_reference(
reference: NullableReference[NullableReference[ReReferencedType]],
resolver: HashResolver
) -> NullableReference[ReReferencedType]:
assert isinstance(reference, NullableReference)
assert isinstance(resolver, HashResolver)
if isinstance(reference.reference, Null):
return reference.factory.from_bytes(HashPoint.NULL_HASH)
elif isinstance(reference.reference, NotNull):
return resolver.resolve(reference.reference.value)
else:
raise TypeError

View File

@ -0,0 +1,34 @@
import abc
from typing import Generic, Type, TypeVar
from rainbowadn.hashing.hashmentionable import HashMentionable
from rainbowadn.hashing.rainbow_factory import RainbowFactory
__all__ = ('StaticMentionable', 'StaticFactory',)
StaticMentioned = TypeVar('StaticMentioned')
class StaticMentionable(HashMentionable, abc.ABC, Generic[StaticMentioned]):
@classmethod
def from_bytes(cls, source: bytes) -> StaticMentioned:
raise NotImplementedError
def __factory__(self) -> RainbowFactory[StaticMentioned]:
return self.factory()
@classmethod
def factory(cls) -> RainbowFactory[StaticMentioned]:
return StaticFactory(cls)
class StaticFactory(RainbowFactory[StaticMentioned], Generic[StaticMentioned]):
def __init__(self, cls: Type[StaticMentioned]):
self.cls = cls
def from_bytes(self, source: bytes) -> StaticMentioned:
assert isinstance(source, bytes)
if issubclass(self.cls, StaticMentionable):
return self.cls.from_bytes(source)
else:
raise TypeError

View File

View File

4
rainbowadn/v13/algo.py Normal file
View File

@ -0,0 +1,4 @@
__all__ = ('MINT_CONST',)
MINT_ORDER = 20
MINT_CONST = 1 << MINT_ORDER

View File

@ -0,0 +1,90 @@
from typing import Generic, TypeVar
from rainbowadn.chain.abstractreductionchainmetafactory import AbstractReductionChainMetaFactory
from rainbowadn.chain.chaincollectioninterface import ChainCollectionInterface
from rainbowadn.data.collection.stack.stack import Stack, StackFactory
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.v13.bankprotocol import BankProtocol
from rainbowadn.v13.bankstate import BankState
from rainbowadn.v13.transaction import Transaction
__all__ = ('BankChain',)
BlockType = TypeVar('BlockType')
class BankChain(Generic[BlockType]):
def __init__(
self,
chain: ChainCollectionInterface[
BlockType,
NullableReference[Stack[Transaction]],
BankState,
]
):
assert isinstance(chain, ChainCollectionInterface)
self.chain = chain
self.reference = self.chain.reference
assert isinstance(self.reference, NullableReference)
self.resolver = self.chain.resolver
assert isinstance(self.resolver, HashResolver)
@classmethod
def empty(
cls,
factory: AbstractReductionChainMetaFactory[BlockType, NullableReference[Stack[Transaction]], BankState],
resolver: HashResolver
) -> 'BankChain[BlockType]':
assert isinstance(factory, AbstractReductionChainMetaFactory)
assert isinstance(resolver, HashResolver)
return cls(
factory.factory(
NullableReferenceFactory(StackFactory(Transaction.factory()).loose()).loose(),
BankState.factory(),
BankProtocol(resolver),
).empty()
)
@classmethod
def from_reference(
cls,
factory: AbstractReductionChainMetaFactory[BlockType, NullableReference[Stack[Transaction]], BankState],
reference: NullableReference[
BlockType
],
resolver: HashResolver
) -> 'BankChain[BlockType]':
assert isinstance(factory, AbstractReductionChainMetaFactory)
assert isinstance(reference, NullableReference)
assert isinstance(resolver, HashResolver)
return cls(
factory.factory(
NullableReferenceFactory(StackFactory(Transaction.factory()).loose()).loose(),
BankState.factory(),
BankProtocol(resolver),
).from_reference(
reference
)
)
def add(self, stack: NullableReference[Stack[Transaction]]) -> 'BankChain[BlockType]':
assert isinstance(stack, NullableReference)
return BankChain(
self.chain.add(HashPoint.of(stack))
)
def adds(self, transactions: list[Transaction]) -> 'BankChain[BlockType]':
return self.add(
Stack.off(
Transaction.factory(),
reversed(transactions)
)
)
def verify(self) -> bool:
return self.chain.verify()
def __str__(self):
return self.reference.str(self.resolver, 0)

View File

@ -0,0 +1,64 @@
from rainbowadn.chain.reduction.reduced import Reduced
from rainbowadn.chain.reduction.reduction import Reduction
from rainbowadn.chain.reduction.reductionprotocol import ReductionProtocol
from rainbowadn.chain.reduction.reductionresult import ReductionResult
from rainbowadn.data.atomic.integer import Integer
from rainbowadn.hashing.static import StaticFactory
from rainbowadn.data.collection.trees.binary.binarytree import BinaryTreeFactory
from rainbowadn.data.collection.keymetadata import KeyMetadataFactory
from rainbowadn.data.collection.stack.stack import Stack
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.nullability.nullablereference import NullableReference
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.v13.bankstate import BankState
from rainbowadn.v13.subject import Subject
from rainbowadn.v13.transaction import Coin, Transaction
__all__ = ('BankProtocol',)
class BankProtocol(ReductionProtocol[NullableReference[Stack[Transaction]], BankState]):
def reduce(
self,
reduction: Reduction[NullableReference[Stack[Transaction]], BankState]
) -> ReductionResult[NullableReference[Stack[Transaction]], BankState]:
assert isinstance(reduction, Reduction)
bank_state: BankState = self.resolver.resolve(reduction.accumulator)
assert isinstance(bank_state, BankState)
reference: Nullable[HashPoint[Stack[Transaction]]] = self.resolver.resolve(reduction.reductor).reference
assert isinstance(reference, Nullable)
if isinstance(reference, Null):
return Reduced(HashPoint.of(bank_state.loose()))
elif isinstance(reference, NotNull):
stack: Stack[Transaction] = self.resolver.resolve(reference.value)
assert isinstance(stack, Stack)
return Reduction(
HashPoint.of(stack.previous),
HashPoint.of(bank_state.push(stack.element, self.resolver))
)
else:
raise TypeError
def initial(self, factory: RainbowFactory[BankState]) -> HashPoint[BankState]:
assert isinstance(factory, RainbowFactory)
return HashPoint.of(
BankState(
NullableReference(
Null(),
BinaryTreeFactory(KeyMetadataFactory(StaticFactory(Coin), StaticFactory(Integer)).loose()).loose()
),
NullableReference(
Null(),
BinaryTreeFactory(KeyMetadataFactory(StaticFactory(Coin), StaticFactory(Integer)).loose()).loose()
),
NullableReference(Null(), Subject.factory()),
HashPoint.of(Integer(0))
)
)
def header_filter(self, state: HashPoint[BankState]) -> HashPoint[BankState]:
assert isinstance(state, HashPoint)
return HashPoint.of(self.resolver.resolve(state).loose().advance(self.resolver))

151
rainbowadn/v13/bankstate.py Normal file
View File

@ -0,0 +1,151 @@
from typing import Iterable
from rainbowadn.data.atomic.integer import Integer
from rainbowadn.data.collection.keymetadata import KeyMetadata, KeyMetadataFactory
from rainbowadn.data.collection.trees.binary.activebinarytree import ActiveBinaryTree
from rainbowadn.data.collection.trees.binary.avl import AVLABT
from rainbowadn.data.collection.trees.binary.binarytree import BinaryTree, BinaryTreeFactory
from rainbowadn.data.collection.trees.comparison.comparator import Fail
from rainbowadn.data.collection.trees.comparison.hashcomparator import HashComparator
from rainbowadn.hashing.hash_point_format import tabulate
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
from rainbowadn.hashing.static import StaticFactory, StaticMentionable
from rainbowadn.v13.algo import MINT_CONST
from rainbowadn.v13.subject import Subject
from rainbowadn.v13.transaction import Coin, Transaction, TransactionData
__all__ = ('BankState',)
class BankState(RecursiveMentionable, StaticMentionable['BankState']):
def __init__(
self,
minted: NullableReference[BinaryTree[KeyMetadata[Coin, Integer]]],
used: NullableReference[BinaryTree[KeyMetadata[Coin, Integer]]],
miner: NullableReference[Subject],
length: HashPoint[Integer]
):
assert isinstance(miner, NullableReference)
assert isinstance(used, NullableReference)
assert isinstance(miner, NullableReference)
assert isinstance(length, HashPoint)
self.minted = minted
self.used = used
self.miner = miner
self.length = length
def points(self) -> Iterable[HashPoint]:
return [*self.minted.points(), *self.used.points(), *self.miner.points(), self.length]
def __bytes__(self):
return bytes(self.minted) + bytes(self.used) + bytes(self.miner) + bytes(self.length)
@classmethod
def from_bytes(cls, source: bytes) -> 'BankState':
assert isinstance(source, bytes)
reference_factory: RainbowFactory[
NullableReference[BinaryTree[KeyMetadata[Coin, Integer]]]
] = NullableReferenceFactory(
BinaryTreeFactory(
KeyMetadataFactory(
StaticFactory(Coin),
StaticFactory(Integer),
).loose()
).loose()
).loose()
assert isinstance(reference_factory, RainbowFactory)
return cls(
reference_factory.from_bytes(source[:HashPoint.HASH_LENGTH]),
reference_factory.from_bytes(source[HashPoint.HASH_LENGTH:2 * HashPoint.HASH_LENGTH]),
NullableReferenceFactory(
StaticFactory(Subject)
).from_bytes(source[2 * HashPoint.HASH_LENGTH:3 * HashPoint.HASH_LENGTH]),
HashPoint(Integer.factory(), source[3 * HashPoint.HASH_LENGTH:], Null())
)
def loose(self) -> 'BankState':
return BankState(
self.minted,
self.used,
NullableReference(Null(), self.miner.factory),
self.length
)
def advance(self, resolver: HashResolver) -> 'BankState':
assert isinstance(resolver, HashResolver)
return BankState(
self.minted,
self.used,
NullableReference(Null(), self.miner.factory),
HashPoint.of(Integer(resolver.resolve(self.length).integer + 1))
)
def push(self, transaction: HashPoint[Transaction], resolver: HashResolver) -> 'BankState':
assert isinstance(transaction, HashPoint)
assert isinstance(resolver, HashResolver)
transaction_resolved = resolver.resolve(transaction)
assert isinstance(transaction_resolved, Transaction)
miner: Nullable[HashPoint[Subject]] = self.miner.reference
assert isinstance(miner, Nullable)
if isinstance(miner, Null):
mint = MINT_CONST
elif isinstance(miner, NotNull):
mint = 0
else:
raise TypeError
assert transaction_resolved.verify(resolver, mint)
transaction_data: TransactionData = resolver.resolve(transaction_resolved.data)
assert isinstance(transaction_data, TransactionData)
minted: ActiveBinaryTree[Coin, Integer] = AVLABT(HashComparator(resolver, Fail()), self.minted)
assert isinstance(minted, ActiveBinaryTree)
used: ActiveBinaryTree[Coin, Integer] = AVLABT(HashComparator(resolver, Fail()), self.used)
assert isinstance(used, ActiveBinaryTree)
in_coin: HashPoint[Coin]
for in_coin in transaction_data.iter_in_coins(resolver):
assert isinstance(in_coin, HashPoint)
assert isinstance(minted.query_tree().query(in_coin), NotNull)
assert isinstance(used.query_tree().query(in_coin), Null)
used = used.add(in_coin)
assert isinstance(used, ActiveBinaryTree)
coin: Coin
for coin, miner in transaction_resolved.iter_coins(resolver, mint, miner):
assert isinstance(coin, Coin)
assert isinstance(miner, Nullable)
assert isinstance(minted.query_tree().query(HashPoint.of(coin)), Null)
minted = minted.add(HashPoint.of(coin))
assert isinstance(minted, ActiveBinaryTree)
return BankState(
minted.reference,
used.reference,
NullableReference(miner, self.miner.factory),
self.length
)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'(' \
f'{tabulate(tab + 1)}bank' \
f'{tabulate(tab + 1)}(miner)' \
f'{tabulate(tab + 1)}{self.miner.str(resolver, tab + 1)}' \
f'{tabulate(tab + 1)}(minted)' \
f'{tabulate(tab + 1)}{self.minted.str(resolver, tab + 1)}' \
f'{tabulate(tab + 1)}(used)' \
f'{tabulate(tab + 1)}{self.used.str(resolver, tab + 1)}' \
f'{tabulate(tab + 1)}(length)' \
f'{tabulate(tab + 1)}{resolver.resolve(self.length)}' \
f'{tabulate(tab)})'

View File

@ -0,0 +1,48 @@
import nacl.bindings
import nacl.exceptions
import nacl.signing
from rainbowadn.data.atomic.atomic import Atomic
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.v13.subject import Subject
__all__ = ('BadSignature', 'Signature',)
class BadSignature(nacl.exceptions.BadSignatureError):
pass
class Signature(Atomic['Signature']):
def __init__(self, source: bytes):
assert isinstance(source, bytes)
assert len(source) == nacl.bindings.crypto_sign_BYTES
self.source = source
@classmethod
def sign(cls, key: nacl.signing.SigningKey, hash_point: HashPoint) -> 'Signature':
assert isinstance(key, nacl.signing.SigningKey)
assert isinstance(hash_point, HashPoint)
return cls(
key.sign(hash_point.point).signature
)
@classmethod
def from_bytes(cls, source: bytes) -> 'Signature':
assert isinstance(source, bytes)
return cls(source)
def __bytes__(self):
return self.source
def verify(self, subject: Subject, hash_point: HashPoint) -> bool:
assert isinstance(subject, Subject)
assert isinstance(hash_point, HashPoint)
try:
subject.verify_key.verify(hash_point.point, self.source)
return True
except nacl.exceptions.BadSignatureError:
raise BadSignature
def __str__(self):
return f'(signature)'

25
rainbowadn/v13/subject.py Normal file
View File

@ -0,0 +1,25 @@
from nacl.public import PublicKey
from nacl.signing import VerifyKey
from rainbowadn.data.atomic.atomic import Atomic
__all__ = ('Subject',)
class Subject(Atomic['Subject']):
def __init__(self, verify_key: VerifyKey):
assert isinstance(verify_key, VerifyKey)
self.verify_key: VerifyKey = verify_key
self.public_key: PublicKey = verify_key.to_curve25519_public_key()
assert isinstance(self.public_key, PublicKey)
@classmethod
def from_bytes(cls, source: bytes) -> 'Subject':
assert isinstance(source, bytes)
return cls(VerifyKey(source))
def __bytes__(self):
return bytes(self.verify_key)
def __str__(self):
return f'(subject)'

View File

@ -0,0 +1,364 @@
from typing import Iterable
import nacl.signing
from rainbowadn.data.atomic.integer import Integer
from rainbowadn.data.collection.stack.stack import Stack, StackFactory
from rainbowadn.hashing.hash_point_format import hash_point_format, tabulate
from rainbowadn.hashing.hashpoint import HashPoint
from rainbowadn.hashing.hashresolver import HashResolver
from rainbowadn.hashing.nullability.notnull import NotNull
from rainbowadn.hashing.nullability.null import Null
from rainbowadn.hashing.nullability.nullable import Nullable
from rainbowadn.hashing.nullability.nullablereference import NullableReference, NullableReferenceFactory
from rainbowadn.hashing.rainbow_factory import RainbowFactory
from rainbowadn.hashing.recursivementionable import RecursiveMentionable
from rainbowadn.hashing.static import StaticMentionable
from rainbowadn.v13.signature import Signature
from rainbowadn.v13.subject import Subject
__all__ = ('CoinData', 'Coin', 'TransactionData', 'Transaction',)
class CoinData(RecursiveMentionable, StaticMentionable['CoinData']):
def __init__(
self,
owner: HashPoint[Subject],
value: HashPoint[Integer]
):
assert isinstance(owner, HashPoint)
assert isinstance(value, HashPoint)
self.owner = owner
self.value = value
@classmethod
def of(cls, owner: Subject, value: int) -> 'CoinData':
return cls(HashPoint.of(owner), HashPoint.of(Integer(value)))
def points(self) -> Iterable[HashPoint]:
return [self.owner, self.value]
def __bytes__(self):
return bytes(self.owner) + bytes(self.value)
@classmethod
def from_bytes(cls, source: bytes) -> 'CoinData':
assert isinstance(source, bytes)
return cls(
HashPoint(Subject.factory(), source[:HashPoint.HASH_LENGTH], Null()),
HashPoint(Integer.factory(), source[HashPoint.HASH_LENGTH:], Null()),
)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'{resolver.resolve(self.owner)}' \
f'{tabulate(tab)}{resolver.resolve(self.value)}'
class Coin(RecursiveMentionable, StaticMentionable['Coin']):
def __init__(
self,
data: HashPoint[CoinData],
origin: HashPoint['Transaction'],
index: HashPoint[Integer]
):
assert isinstance(data, HashPoint)
assert isinstance(origin, HashPoint)
assert isinstance(index, HashPoint)
self.data = data
self.origin = origin
self.index = index
def points(self) -> Iterable[HashPoint]:
return [self.data, self.origin, self.index]
def __bytes__(self):
return bytes(self.data) + bytes(self.origin) + bytes(self.index)
@classmethod
def from_bytes(cls, source: bytes) -> 'Coin':
assert isinstance(source, bytes)
return cls(
HashPoint(CoinData.factory(), source[:HashPoint.HASH_LENGTH], Null()),
HashPoint(Transaction.factory(), source[HashPoint.HASH_LENGTH:2 * HashPoint.HASH_LENGTH], Null()),
HashPoint(Integer.factory(), source[2 * HashPoint.HASH_LENGTH:], Null()),
)
def __str__(self):
return f'(coin)'
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'(' \
f'{tabulate(tab + 1)}coin' \
f'{tabulate(tab + 1)}{hash_point_format(self.data, resolver, tab + 1)}' \
f'{tabulate(tab + 1)}(origin)' \
f'{tabulate(tab + 1)}{resolver.resolve(self.index)}' \
f'{tabulate(tab)})'
class TransactionData(RecursiveMentionable, StaticMentionable['TransactionData']):
def __init__(
self,
in_coins: NullableReference[Stack[Coin]],
out_coins: NullableReference[Stack[CoinData]],
):
assert isinstance(in_coins, NullableReference)
assert isinstance(out_coins, NullableReference)
self.in_coins = in_coins
self.out_coins = out_coins
self.hash_point = HashPoint.of(self)
assert isinstance(self.hash_point, HashPoint)
def points(self) -> Iterable[HashPoint]:
return [*self.in_coins.points(), *self.out_coins.points()]
def __bytes__(self):
return bytes(self.in_coins) + bytes(self.out_coins)
@classmethod
def from_bytes(cls, source: bytes) -> 'TransactionData':
assert isinstance(source, bytes)
return cls(
NullableReferenceFactory(
StackFactory(Coin.factory()).loose()
).from_bytes(source[:HashPoint.HASH_LENGTH]),
NullableReferenceFactory(
StackFactory(CoinData.factory()).loose()
).from_bytes(source[HashPoint.HASH_LENGTH:]),
)
def _verify_signatures(
self,
resolver: HashResolver,
in_coins: NullableReference[Stack[Coin]],
signatures: NullableReference[Stack[Signature]],
) -> bool:
assert isinstance(resolver, HashResolver)
assert isinstance(in_coins, NullableReference)
assert isinstance(signatures, NullableReference)
if isinstance(in_coins.reference, Null):
assert isinstance(signatures.reference, Null)
return True
elif isinstance(in_coins.reference, NotNull):
assert isinstance(signatures.reference, NotNull)
in_coins_stack: Stack[Coin] = resolver.resolve(in_coins.reference.value)
assert isinstance(in_coins_stack, Stack)
signatures_stack: Stack[Signature] = resolver.resolve(signatures.reference.value)
assert isinstance(signatures_stack, Stack)
coin: Coin = resolver.resolve(in_coins_stack.element)
assert isinstance(coin, Coin)
coin_data: CoinData = resolver.resolve(coin.data)
assert isinstance(coin_data, CoinData)
owner: Subject = resolver.resolve(coin_data.owner)
assert isinstance(owner, Subject)
signature: Signature = resolver.resolve(signatures_stack.element)
assert isinstance(signature, Signature)
return signature.verify(
owner,
self.hash_point
)
else:
raise TypeError
def iter_in_coins(self, resolver: HashResolver) -> Iterable[HashPoint[Coin]]:
assert isinstance(resolver, HashResolver)
in_coins: NullableReference[Stack[Coin]] = self.in_coins
assert isinstance(in_coins, NullableReference)
while isinstance(in_coins.reference, NotNull):
in_coins_stack: Stack[Coin] = resolver.resolve(in_coins.reference.value)
assert isinstance(in_coins_stack, Stack)
coin: HashPoint[Coin] = in_coins_stack.element
assert isinstance(coin, HashPoint)
yield coin
in_coins = in_coins_stack.previous
assert isinstance(in_coins, NullableReference)
assert isinstance(in_coins.reference, Null)
def _total_in(self, resolver: HashResolver) -> int:
assert isinstance(resolver, HashResolver)
total_in = 0
coin: HashPoint[Coin]
for coin in self.iter_in_coins(resolver):
assert isinstance(coin, HashPoint)
total_in += resolver.resolve(
resolver.resolve(
resolver.resolve(
coin
).data
).value
).integer
return total_in
def iter_out_coins(self, resolver: HashResolver) -> Iterable[HashPoint[CoinData]]:
assert isinstance(resolver, HashResolver)
out_coins: NullableReference[Stack[CoinData]] = self.out_coins
assert isinstance(out_coins, NullableReference)
while isinstance(out_coins.reference, NotNull):
out_coins_stack: Stack[CoinData] = resolver.resolve(out_coins.reference.value)
assert isinstance(out_coins_stack, Stack)
coin: HashPoint[CoinData] = out_coins_stack.element
assert isinstance(coin, HashPoint)
yield coin
out_coins = out_coins_stack.previous
assert isinstance(out_coins, NullableReference)
assert isinstance(out_coins.reference, Null)
def _total_out(self, resolver: HashResolver) -> int:
assert isinstance(resolver, HashResolver)
total_out = 0
coin: HashPoint[CoinData]
for coin in self.iter_out_coins(resolver):
assert isinstance(coin, HashPoint)
total_out += resolver.resolve(
resolver.resolve(
coin
).value
).integer
return total_out
def _verify_values(self, resolver: HashResolver, mint: int) -> bool:
assert isinstance(resolver, HashResolver)
assert isinstance(mint, int)
assert self._total_out(resolver) <= self._total_in(resolver) + mint
return True
def extra(self, resolver: HashResolver, mint: int) -> int:
assert isinstance(resolver, HashResolver)
assert isinstance(mint, int)
return self._total_in(resolver) + mint - self._total_out(resolver)
def verify(
self,
resolver: HashResolver,
signatures: NullableReference[Stack[Signature]],
mint: int
) -> bool:
assert isinstance(resolver, HashResolver)
assert isinstance(signatures, NullableReference)
assert isinstance(mint, int)
return self._verify_signatures(resolver, self.in_coins, signatures) and self._verify_values(resolver, mint)
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'(in)' \
f'{tabulate(tab)}{self.in_coins.str(resolver, tab)}' \
f'{tabulate(tab)}(out)' \
f'{tabulate(tab)}{self.out_coins.str(resolver, tab)}'
class Transaction(RecursiveMentionable, StaticMentionable['Transaction']):
def __init__(
self,
data: HashPoint[TransactionData],
signatures: NullableReference[Stack[Signature]]
):
assert isinstance(data, HashPoint)
assert isinstance(signatures, NullableReference)
self.data = data
self.signatures = signatures
self.hash_point = HashPoint.of(self)
assert isinstance(self.hash_point, HashPoint)
def points(self) -> Iterable[HashPoint]:
return [self.data, *self.signatures.points()]
def __bytes__(self):
return bytes(self.data) + bytes(self.signatures)
@classmethod
def from_bytes(cls, source: bytes) -> 'Transaction':
assert isinstance(source, bytes)
signature_factory: RainbowFactory[Signature] = Signature.factory()
assert isinstance(signature_factory, RainbowFactory)
stack_factory: RainbowFactory[Stack[Signature]] = StackFactory(signature_factory).loose()
assert isinstance(stack_factory, RainbowFactory)
return cls(
HashPoint(TransactionData.factory(), source[:HashPoint.HASH_LENGTH], Null()),
NullableReferenceFactory(stack_factory).from_bytes(source[HashPoint.HASH_LENGTH:]),
)
def iter_coins(
self,
resolver: HashResolver,
mint: int,
miner: Nullable[HashPoint[Subject]]
) -> Iterable[tuple[Coin, Nullable[HashPoint[Subject]]]]:
transaction_data = resolver.resolve(self.data)
assert isinstance(transaction_data, TransactionData)
index = 0
out_coin: HashPoint[CoinData]
for out_coin in transaction_data.iter_out_coins(resolver):
assert isinstance(out_coin, HashPoint)
if isinstance(miner, Null):
miner = NotNull(
resolver.resolve(out_coin).owner
)
assert isinstance(miner, Nullable)
coin: Coin = Coin(out_coin, self.hash_point, HashPoint.of(Integer(index)))
assert isinstance(coin, Coin)
yield coin, miner
index += 1
if isinstance(miner, NotNull):
coin: Coin = Coin(
HashPoint.of(
CoinData(
miner.value,
HashPoint.of(Integer(transaction_data.extra(resolver, mint)))
)
),
self.hash_point,
HashPoint.of(Integer(index))
)
assert isinstance(coin, Coin)
yield coin, miner
def coins(
self,
resolver: HashResolver,
mint: int,
miner: Nullable[HashPoint[Subject]]
) -> list[Coin]:
return [coin for coin, _ in self.iter_coins(resolver, mint, miner)]
def verify(self, resolver: HashResolver, mint: int):
assert isinstance(resolver, HashResolver)
assert isinstance(mint, int)
data: TransactionData = resolver.resolve(self.data)
assert isinstance(data, TransactionData)
return data.verify(resolver, self.signatures, mint)
def __str__(self):
return f'(transaction)'
def str(self, resolver: HashResolver, tab: int) -> str:
assert isinstance(resolver, HashResolver)
assert isinstance(tab, int)
return f'(' \
f'{tabulate(tab + 1)}transaction' \
f'{tabulate(tab + 1)}{hash_point_format(self.data, resolver, tab + 1)}' \
f'{tabulate(tab + 1)}{self.signatures.str(resolver, tab + 1)}' \
f'{tabulate(tab)})'
@classmethod
def make(
cls,
in_coins: list[Coin],
out_coins: list[CoinData],
keys: list[nacl.signing.SigningKey],
) -> 'Transaction':
transaction_data = TransactionData(
Stack.off(Coin.factory(), reversed(in_coins)),
Stack.off(CoinData.factory(), reversed(out_coins)),
)
assert isinstance(transaction_data, TransactionData)
return Transaction(
HashPoint.of(transaction_data),
Stack.off(
Signature.factory(),
(Signature.sign(key, HashPoint.of(transaction_data)) for key in reversed(keys))
)
)