diff --git a/main.py b/main.py index 37f793b..0f0b17d 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import os +import string import time from collections import OrderedDict from typing import Any, MutableMapping @@ -14,6 +15,7 @@ from rainbowadn.data.collection.trees.binary.activebinarytree import ActiveBinar from rainbowadn.data.collection.trees.binary.avl import AVLBTBP from rainbowadn.data.collection.trees.comparison.comparator import Replace from rainbowadn.data.collection.trees.comparison.plaincomparator import PlainComparator +from rainbowadn.encryption.encrypted import Encrypted from rainbowadn.hashing.hashmentionable import HashMentionable from rainbowadn.hashing.hashpoint import HashPoint from rainbowadn.hashing.hashresolver import HashResolver @@ -33,9 +35,9 @@ class DumbResolver(HashResolver): def __init__(self): self.table: MutableMapping[bytes, bytes] = OrderedDict() - def resolve_bytes(self, point: bytes) -> bytes: + def resolve(self, point: bytes) -> tuple[bytes, 'HashResolver']: assert isinstance(point, bytes) - return self.table[point] + return self.table[point], self def save(self, hash_point: HashPoint) -> None: assert isinstance(hash_point, HashPoint) @@ -132,7 +134,7 @@ def main2(): print(chain.actual_state().reference.value.resolve().height) -def main(): +def main3(): tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty( AVLBTBP(PlainComparator(Replace())), Plain.factory() ) @@ -141,5 +143,30 @@ def main(): print(tree.reference.str(0)) +def main(): + key = b'a' * 32 + dr = DumbResolver() + tree: ActiveBinaryTree[Plain, Integer] = ActiveBinaryTree.empty( + AVLBTBP(PlainComparator(Replace())), Plain.factory() + ) + for char in string.ascii_uppercase: + tree = tree.add(HashPoint.of(Plain(char.encode()))) + print(tree.reference.str(0)) + target = tree.reference + target = Encrypted.encrypt(target, key).decrypted + print(Encrypted.ecc) + tree = tree.create(target) + print(tree.reference.str(0)) + tree = tree.add(HashPoint.of(Plain(b'NEWKEY'))) + print(tree.reference.str(0)) + target = tree.reference + eeed = Encrypted.encrypt(target, key) + print(Encrypted.ecc) + dr.save(HashPoint.of(eeed)) + # for key, value in dr.table.items(): + # print(key.hex(), value.hex()) + print(ResolverMetaOrigin(dr).migrate(HashPoint.of(eeed)).resolve().decrypted.str(0)) + + if __name__ == '__main__': - main0() + main() diff --git a/rainbowadn/encryption/encrypted.py b/rainbowadn/encryption/encrypted.py new file mode 100644 index 0000000..6a2f202 --- /dev/null +++ b/rainbowadn/encryption/encrypted.py @@ -0,0 +1,185 @@ +from typing import Generic, Iterable, TypeVar + +from nacl.bindings import crypto_hash_sha256 +from nacl.secret import SecretBox + +from rainbowadn.hashing.hashmentionable import HashMentionable +from rainbowadn.hashing.hashpoint import HashPoint +from rainbowadn.hashing.hashresolver import HashResolver +from rainbowadn.hashing.origin import Origin +from rainbowadn.hashing.rainbow_factory import RainbowFactory +from rainbowadn.hashing.recursivementionable import RecursiveMentionable +from rainbowadn.hashing.resolverorigin import ResolverOrigin + +__all__ = ('Encrypted', 'EncryptedFactory') + +EncryptedType = TypeVar('EncryptedType') + + +class Encrypted(RecursiveMentionable, Generic[EncryptedType]): + def __init__( + self, + key: bytes, + resolution: tuple[HashPoint['Encrypted'], ...], + decrypted: EncryptedType + ): + assert isinstance(key, bytes) + assert isinstance(resolution, tuple) + assert isinstance(decrypted, HashMentionable) + + self.factory: RainbowFactory[EncryptedType] = decrypted.__factory__() + self.key = key + + self.resolution = resolution + self.decrypted: EncryptedType = decrypted + + self.hashpoints = tuple(decrypted.points()) if isinstance(decrypted, RecursiveMentionable) else () + assert len(self.hashpoints) == len(self.resolution) + self.mapping: dict[bytes, HashPoint[Encrypted]] = { + hashpoint.point: encrypted for hashpoint, encrypted in zip(self.hashpoints, resolution) + } + + def points(self) -> Iterable[HashPoint]: + return self.resolution + + ecc = 0 + + @classmethod + def encrypt(cls, decrypted: EncryptedType, key: bytes) -> 'Encrypted[EncryptedType]': + cls.ecc += 1 + assert isinstance(key, bytes) + hashpoints = tuple(decrypted.points()) if isinstance(decrypted, RecursiveMentionable) else () + encrypted: Encrypted[EncryptedType] = object.__new__(cls) + encrypted.__init__( + key, + tuple( + cls.encrypt_hashpoint(hashpoint, key) + for + hashpoint + in + hashpoints + ), + decrypted.__factory__().from_bytes( + bytes(decrypted), + EncryptedResolver(encrypted) + ) + ) + return encrypted + + @classmethod + def encrypt_hashpoint( + cls, hashpoint: HashPoint[EncryptedType], key: bytes + ) -> HashPoint['Encrypted[EncryptedType]']: + assert isinstance(hashpoint, HashPoint) + assert isinstance(key, bytes) + if isinstance(hashpoint.origin, ResolverOrigin): + resolver: HashResolver = hashpoint.origin.resolver + assert isinstance(resolver, HashResolver) + if isinstance(resolver, EncryptedResolver): + return ShortcutOrigin( + hashpoint.factory, + resolver.encrypted.mapping[hashpoint.point], + key + ).hash_point() + return HashPoint.of(cls.encrypt(hashpoint.resolve(), key)) + + def __bytes__(self): + source: bytes = len(self.resolution).to_bytes(8, 'little') + b''.join( + encrypted.point + for + encrypted + in self.resolution + ) + bytes(self.decrypted) + nonce: bytes = crypto_hash_sha256(self.key + source)[:24] + return SecretBox(self.key).encrypt(source, nonce=nonce) + + def __factory__(self) -> RainbowFactory['Encrypted[EncryptedType]']: + return EncryptedFactory(self.factory, self.key) + + +class EncryptedFactory(RainbowFactory[Encrypted[EncryptedType]], Generic[EncryptedType]): + def __init__(self, factory: RainbowFactory[EncryptedType], key: bytes): + assert isinstance(factory, RainbowFactory) + assert isinstance(key, bytes) + self.factory = factory + self.key = key + + def from_bytes(self, source: bytes, resolver: HashResolver) -> Encrypted[EncryptedType]: + assert isinstance(source, bytes) + assert isinstance(resolver, HashResolver) + plain: bytes = SecretBox(self.key).decrypt(source) + resolution_size: int = int.from_bytes(plain[:8], 'little') + encrypted: Encrypted[EncryptedType] = object.__new__(Encrypted) + decrypted: EncryptedType = self.factory.from_bytes( + plain[8 + resolution_size * HashPoint.HASH_LENGTH:], + EncryptedResolver(encrypted) + ) + hashpoints = tuple(decrypted.points()) if isinstance(decrypted, RecursiveMentionable) else () + assert len(hashpoints) == resolution_size + encrypted.__init__( + self.key, + tuple( + ResolverOrigin( + EncryptedFactory( + hashpoint.factory, + self.key + ), + plain[8 + i * HashPoint.HASH_LENGTH: 8 + (i + 1) * HashPoint.HASH_LENGTH], + resolver + ).hash_point() for i, hashpoint in enumerate(hashpoints) + ), + decrypted + ) + return encrypted + + def loose(self) -> RainbowFactory[Encrypted[EncryptedType]]: + return self + + +class EncryptedResolver(HashResolver): + def __init__(self, encrypted: Encrypted): + assert isinstance(encrypted, Encrypted) + self.encrypted = encrypted + + def resolve(self, point: bytes) -> tuple[bytes, 'HashResolver']: + assert isinstance(point, bytes) + encrypted = self.encrypted.mapping[point].resolve() + return HashPoint.bytes_of_mentioned(encrypted.decrypted), EncryptedResolver(encrypted) + + +class ShortcutOrigin(Origin[Encrypted[EncryptedType]], Generic[EncryptedType]): + def __init__(self, factory: RainbowFactory[EncryptedType], hashpoint: HashPoint[Encrypted], key: bytes): + assert isinstance(factory, RainbowFactory) + assert isinstance(hashpoint, HashPoint) + assert isinstance(key, bytes) + self.factory: RainbowFactory[Encrypted[EncryptedType]] = EncryptedFactory(factory, key) + assert isinstance(self.factory, RainbowFactory) + self.hashpoint = hashpoint + + def resolve(self) -> Encrypted[EncryptedType]: + encrypted = self.hashpoint.resolve() + encrypted = self.factory.from_bytes(bytes(encrypted), ShortcutResolver(encrypted)) + assert HashPoint.of(encrypted) == self.hashpoint + return encrypted + + def hash_point(self) -> HashPoint[Encrypted[EncryptedType]]: + return HashPoint( + self.factory, + self.hashpoint.point, + self + ) + + +class ShortcutResolver(HashResolver): + def __init__(self, encrypted: Encrypted): + assert isinstance(encrypted, Encrypted) + self.mapping: dict[bytes, HashPoint[Encrypted]] = { + hashpoint.point: hashpoint for hashpoint in encrypted.resolution + } + + def resolve(self, point: bytes) -> tuple[bytes, 'HashResolver']: + assert isinstance(point, bytes) + return ( + HashPoint.bytes_of_mentioned(self.mapping[point].resolve()), + ShortcutResolver(self.mapping[point].resolve()) + ) diff --git a/rainbowadn/hashing/hashpoint.py b/rainbowadn/hashing/hashpoint.py index 9db8cfb..524e433 100644 --- a/rainbowadn/hashing/hashpoint.py +++ b/rainbowadn/hashing/hashpoint.py @@ -9,6 +9,7 @@ from rainbowadn.hashing.rainbow_factory import RainbowFactory __all__ = ('HashPoint',) HashMentioned = TypeVar('HashMentioned') +ReHashMentioned = TypeVar('ReHashMentioned') def _hash(source: bytes) -> bytes: @@ -60,6 +61,7 @@ class HashPoint(Generic[HashMentioned]): def resolve(self) -> HashMentioned: resolved = self.origin.resolve() assert isinstance(resolved, HashMentionable) + assert self.point == self.hash(self.bytes_of_mentioned(resolved)) return resolved def __eq__(self, other): diff --git a/rainbowadn/hashing/hashresolver.py b/rainbowadn/hashing/hashresolver.py index 33a7418..8a77dfd 100644 --- a/rainbowadn/hashing/hashresolver.py +++ b/rainbowadn/hashing/hashresolver.py @@ -6,5 +6,5 @@ RHashMentioned = TypeVar('RHashMentioned') class HashResolver: - def resolve_bytes(self, point: bytes) -> bytes: + def resolve(self, point: bytes) -> tuple[bytes, 'HashResolver']: raise NotImplementedError diff --git a/rainbowadn/hashing/resolverorigin.py b/rainbowadn/hashing/resolverorigin.py index 36c67bd..77809ac 100644 --- a/rainbowadn/hashing/resolverorigin.py +++ b/rainbowadn/hashing/resolverorigin.py @@ -26,11 +26,13 @@ class ResolverOrigin(Origin[OriginType], Generic[OriginType]): self.resolver = resolver def resolve(self) -> OriginType: - resolved: bytes = self.resolver.resolve_bytes(self.point) + resolved, resolver = self.resolver.resolve(self.point) assert isinstance(resolved, bytes) - mentioned: OriginType = self.factory.from_bytes(resolved[HashPoint.HASH_LENGTH:], self.resolver) + assert isinstance(resolver, HashResolver) + mentioned: OriginType = self.factory.from_bytes(resolved[HashPoint.HASH_LENGTH:], resolver) assert isinstance(mentioned, HashMentionable) assert mentioned.__topology_hash__() == resolved[:HashPoint.HASH_LENGTH] + assert self.point == HashPoint.hash(HashPoint.bytes_of_mentioned(mentioned)) return mentioned def hash_point(self) -> HashPoint[OriginType]: diff --git a/rainbowadn/testing/failresolver.py b/rainbowadn/testing/failresolver.py index 1ed8c75..6a6e957 100644 --- a/rainbowadn/testing/failresolver.py +++ b/rainbowadn/testing/failresolver.py @@ -2,5 +2,5 @@ from rainbowadn.hashing.hashresolver import HashResolver class FailResolver(HashResolver): - def resolve_bytes(self, point: bytes) -> bytes: + def resolve(self, point: bytes) -> tuple[bytes, 'HashResolver']: raise TypeError('fail-resolver always fails')