rainbowadn/rainbowadn/flow13/_flowtransaction.py

390 lines
13 KiB
Python

from typing import Any, Iterable
from nacl.signing import SigningKey
from rainbowadn.atomic import *
from rainbowadn.collection.comparison import *
from rainbowadn.collection.keyvalue import *
from rainbowadn.core import *
from rainbowadn.flow.core import *
from rainbowadn.flow.verification.core import *
from rainbowadn.v13 import *
from ._flowstandard import *
from ._resolvemapper import *
# Names exported by `from ... import *`. The mapper helpers defined in this
# module (CVMapper, SPVMapper) are not listed here.
__all__ = ('FlowCoinData', 'FlowCoin', 'FlowTransactionData', 'FlowTransaction',)
class FlowCoinData(RecursiveMentionable, StaticMentionable):
    """Coin payload: a hash point to the owner and a hash point to the value.

    Serialised form is ``bytes(owner) + bytes(value)``; `from_bytes` reads the
    first ``HashPoint.HASH_LENGTH`` bytes as the owner reference and everything
    after that as the value reference.
    """

    def __init__(
            self,
            owner: HashPoint[Subject],
            value: HashPoint[Integer]
    ):
        """Store the two hash points after checking both are `HashPoint`s."""
        assert isinstance(owner, HashPoint)
        assert isinstance(value, HashPoint)
        self.owner = owner
        self.value = value

    async def int_value(self) -> int:
        """Resolve the value hash point and return its ``integer`` attribute."""
        resolved_value = await self.value.resolve()
        return resolved_value.integer

    @classmethod
    def of(cls, owner: Subject, value: int) -> 'FlowCoinData':
        """Build coin data from a plain `Subject` and a plain ``int``.

        The owner is wrapped with ``HashPoint.of`` and the value is wrapped
        as ``HashPoint.of(Integer(value))``.
        """
        assert isinstance(owner, Subject)
        assert isinstance(value, int)
        owner_point = HashPoint.of(owner)
        value_point = HashPoint.of(Integer(value))
        return cls(owner_point, value_point)

    def points(self) -> Iterable[HashPoint]:
        """Return the hash points this object refers to: owner, then value."""
        owner_point, value_point = self.owner, self.value
        return [owner_point, value_point]

    def __bytes__(self):
        """Serialise as the owner bytes followed by the value bytes."""
        owner_bytes = bytes(self.owner)
        value_bytes = bytes(self.value)
        return owner_bytes + value_bytes

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'FlowCoinData':
        """Rebuild coin data from its serialised form.

        :param source: serialised bytes; split at ``HashPoint.HASH_LENGTH``.
        :param resolver: resolver handed to both `ResolverOrigin`s.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        # Owner reference first, then the value reference (same order as __bytes__).
        owner_point = ResolverOrigin(
            Subject.factory(), source[:HashPoint.HASH_LENGTH], resolver
        ).hash_point()
        value_point = ResolverOrigin(
            Integer.factory(), source[HashPoint.HASH_LENGTH:], resolver
        ).hash_point()
        return cls(owner_point, value_point)

    async def str(self, tab: int) -> str:
        """Format owner and value with `hash_point_format`, separated by ``tabulate(tab)``."""
        assert isinstance(tab, int)
        formatted = await gather(
            hash_point_format(self.owner, tab),
            hash_point_format(self.value, tab),
        )
        owner_str, value_str = formatted
        assert isinstance(owner_str, str)
        assert isinstance(value_str, str)
        return f'{owner_str}{tabulate(tab)}{value_str}'
class FlowCoin(RecursiveMentionable, StaticMentionable):
    """A coin: a reference to its `FlowCoinData` plus a reference to a `FlowTransaction`.

    Serialised form is ``bytes(data) + bytes(transaction)``, each read back as
    one ``HashPoint.HASH_LENGTH``-sized slice.
    """

    def __init__(
            self,
            data: HashPoint[FlowCoinData],
            transaction: HashPoint['FlowTransaction']
    ):
        """Store both hash points after checking both are `HashPoint`s."""
        assert isinstance(data, HashPoint)
        assert isinstance(transaction, HashPoint)
        self.data = data
        self.transaction = transaction

    async def data_resolved(self) -> FlowCoinData:
        """Resolve and return the referenced coin data."""
        resolved = await self.data.resolve()
        return resolved

    def points(self) -> Iterable[HashPoint]:
        """Return the hash points this object refers to: data, then transaction."""
        data_point, transaction_point = self.data, self.transaction
        return [data_point, transaction_point]

    def __bytes__(self):
        """Serialise as the data bytes followed by the transaction bytes."""
        data_bytes = bytes(self.data)
        transaction_bytes = bytes(self.transaction)
        return data_bytes + transaction_bytes

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'FlowCoin':
        """Rebuild a coin from its serialised form.

        :param source: serialised bytes; the first ``HASH_LENGTH`` bytes
            reference the data, the next ``HASH_LENGTH`` bytes the transaction.
        :param resolver: resolver handed to both `ResolverOrigin`s.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        data_point = ResolverOrigin(
            FlowCoinData.factory(), source[:HashPoint.HASH_LENGTH], resolver
        ).hash_point()
        transaction_point = ResolverOrigin(
            FlowTransaction.factory(),
            source[HashPoint.HASH_LENGTH:2 * HashPoint.HASH_LENGTH],
            resolver,
        ).hash_point()
        return cls(data_point, transaction_point)

    async def str(self, tab: int) -> str:
        """Render the coin as a parenthesised block.

        Only the data is formatted; the transaction is shown as the literal
        ``(origin)`` marker rather than being resolved.
        """
        assert isinstance(tab, int)
        inner = tab + 1
        return (
            f'('
            f'{tabulate(inner)}coin'
            f'{tabulate(inner)}{await hash_point_format(self.data, inner)}'
            f'{tabulate(inner)}(origin)'
            f'{tabulate(tab)})'
        )

    async def int_value(self) -> int:
        """Resolve the coin data and return its integer value."""
        coin_data = await self.data_resolved()
        return await coin_data.int_value()

    async def owner_resolved(self) -> Subject:
        """Resolve the coin data, then resolve and return its owner."""
        coin_data = await self.data_resolved()
        return await coin_data.owner.resolve()
class FlowTransactionData(RecursiveMentionable, StaticMentionable):
    """The signed part of a transaction: its input coins and output coin data.

    ``hash_point`` is computed from the instance itself at construction time
    and is the value that signatures are verified against.
    """

    def __init__(
            self,
            in_coins: FlowStandard[FlowCoin],
            out_coins: FlowStandard[FlowCoinData],
    ):
        """Store both `FlowStandard`s and derive ``hash_point`` from ``self``."""
        assert isinstance(in_coins, FlowStandard)
        assert isinstance(out_coins, FlowStandard)
        self.in_coins = in_coins
        self.out_coins = out_coins
        # Must come after both attributes are set: hashing serialises self.
        self.hash_point = HashPoint.of(self)
        assert isinstance(self.hash_point, HashPoint)

    def points(self) -> Iterable[HashPoint]:
        """Return the points of ``in_coins`` followed by those of ``out_coins``."""
        collected = list(self.in_coins.points())
        collected.extend(self.out_coins.points())
        return collected

    def __bytes__(self):
        """Serialise as ``bytes(in_coins) + bytes(out_coins)``."""
        in_bytes = bytes(self.in_coins)
        out_bytes = bytes(self.out_coins)
        return in_bytes + out_bytes

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'FlowTransactionData':
        """Rebuild transaction data from its serialised form.

        :param source: serialised bytes; the first ``HashPoint.HASH_LENGTH``
            bytes describe ``in_coins``, the remainder ``out_coins``.
        :param resolver: resolver handed to both factories.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        in_coins = FlowStandardFactory.of(
            FlowCoin.factory(), HashComparator(Fail())
        ).from_bytes(source[:HashPoint.HASH_LENGTH], resolver)
        out_coins = FlowStandardFactory.of(
            FlowCoinData.factory(), HashComparator(Fail())
        ).from_bytes(source[HashPoint.HASH_LENGTH:], resolver)
        return cls(in_coins, out_coins)

    async def _signature_verify(self, coin: FlowCoin, signature: Signature) -> bool:
        """Check ``signature`` against the coin's resolved owner and ``self.hash_point``.

        NOTE(review): not called anywhere in the visible part of this file.
        """
        assert isinstance(coin, FlowCoin)
        assert isinstance(signature, Signature)
        check = signature.verify
        owner = await coin.owner_resolved()
        assert_true(check(owner, self.hash_point))
        return True

    @classmethod
    def _coin_verification_mapper(
            cls,
            signatures: FlowStandard[KeyValue[Subject, Signature]],
    ) -> Mapper[
        HashPoint[FlowCoin],
        bool
    ]:
        """Wrap a `CVMapper` over ``signatures`` with ``ResolveMapper.wrap_mapper``."""
        assert isinstance(signatures, FlowStandard)
        coin_mapper = CVMapper(signatures).loose()
        return ResolveMapper.wrap_mapper(coin_mapper)

    def _signature_pair_verification_mapper(self) -> Mapper[
        HashPoint[KeyValue[Subject, Signature]],
        bool
    ]:
        """Wrap an `SPVMapper` over ``self.hash_point`` with ``ResolveMapper.wrap_mapper``."""
        pair_mapper = SPVMapper(self.hash_point).loose()
        return ResolveMapper.wrap_mapper(pair_mapper)

    async def _verify_coin_signatures(
            self,
            signatures: FlowStandard[KeyValue[Subject, Signature]],
    ) -> bool:
        """Run the coin verification mapper over the reducer of ``in_coins``."""
        assert isinstance(signatures, FlowStandard)
        verification = ReduceVerification(
            MapperVerification(self._coin_verification_mapper(signatures))
        ).loose()
        in_reducer = await self.in_coins.reducer()
        assert_true(await verification.verify(in_reducer))
        return True

    async def _verify_signature_pairs(
            self,
            signatures: FlowStandard[KeyValue[Subject, Signature]],
    ):
        """Run the signature-pair verification mapper over the reducer of ``signatures``."""
        assert isinstance(signatures, FlowStandard)
        verification = ReduceVerification(
            MapperVerification(self._signature_pair_verification_mapper())
        ).loose()
        signature_reducer = await signatures.reducer()
        assert_true(await verification.verify(signature_reducer))
        return True

    async def _verify_signatures(
            self,
            signatures: FlowStandard[KeyValue[Subject, Signature]],
    ) -> bool:
        """Run the coin check and the signature-pair check through ``gather``."""
        assert isinstance(signatures, FlowStandard)
        outcomes = await gather(
            self._verify_coin_signatures(signatures),
            self._verify_signature_pairs(signatures),
        )
        assert_trues(outcomes)
        return True

    async def verify(
            self,
            signatures: FlowStandard[KeyValue[Subject, Signature]]
    ) -> bool:
        """Verify ``signatures`` against this transaction data.

        :param signatures: subject/signature pairs supplied with the transaction.
        :return: ``True`` once the result has passed ``assert_true``.
        """
        assert isinstance(signatures, FlowStandard)
        verified = await self._verify_signatures(signatures)
        assert_true(verified)
        return True

    async def str(self, tab: int) -> str:
        """Render the ``(in)`` and ``(out)`` sections, each followed by its coins."""
        assert isinstance(tab, int)
        rendered = await gather(
            self.in_coins.str(tab),
            self.out_coins.str(tab),
        )
        in_str, out_str = rendered
        assert isinstance(in_str, str)
        assert isinstance(out_str, str)
        return (
            f'(in)'
            f'{tabulate(tab)}{in_str}'
            f'{tabulate(tab)}(out)'
            f'{tabulate(tab)}{out_str}'
        )

    @classmethod
    def empty(cls) -> 'FlowTransactionData':
        """Return transaction data whose input and output `FlowStandard`s are both empty."""
        no_inputs = FlowStandardFactory.empty(FlowCoin.factory(), HashComparator(Fail()))
        no_outputs = FlowStandardFactory.empty(FlowCoinData.factory(), HashComparator(Fail()))
        return cls(no_inputs, no_outputs)
class CVMapper(Mapper[FlowCoin, bool]):
    """Mapper that checks a coin's owner has an entry in a signatures `FlowStandard`."""

    def __init__(self, signatures: FlowStandard[KeyValue[Subject, Signature]]):
        """Keep the signatures `FlowStandard` that lookups are made against."""
        assert isinstance(signatures, FlowStandard)
        self.signatures = signatures

    async def map(self, element: FlowCoin) -> bool:
        """Assert that ``self.signatures`` contains an entry for the coin's owner.

        The probe is a key/value pair of the owner and an empty `Signature`.
        NOTE(review): presumably the lookup compares by key only (elsewhere in
        this file the signatures are built with `KeyedComparator`) — confirm.
        """
        assert isinstance(element, FlowCoin)
        owner_point = (await element.data.resolve()).owner
        blank_signature = HashPoint.of(Signature.empty())
        probe = await KeyValue.off(owner_point, blank_signature)
        present = await self.signatures.contains(HashPoint.of(probe))
        assert_true(present)
        return True

    def loose(self) -> Mapper[FlowCoin, bool]:
        """Return ``self`` under the wider `Mapper` annotation."""
        return self
class SPVMapper(Mapper[KeyValue[Subject, Signature], bool]):
    """Mapper that verifies a subject/signature pair against a fixed hash point."""

    def __init__(self, hashpoint: HashPoint):
        """Keep the hash point every signature is verified against."""
        assert isinstance(hashpoint, HashPoint)
        self.hashpoint = hashpoint

    async def map(self, element: KeyValue[Subject, Signature]) -> bool:
        """Resolve the pair's key and value, then assert the signature verifies.

        The key is expected to resolve to a `Subject` and the value to a
        `Signature`; both are type-checked before ``signature.verify`` is called.
        """
        assert isinstance(element, KeyValue)
        subject: Subject
        signature: Signature
        resolved_pair = await gather(
            element.key.resolve(),
            element.value.resolve(),
        )
        subject, signature = resolved_pair
        assert isinstance(subject, Subject)
        assert isinstance(signature, Signature)
        is_valid = signature.verify(subject, self.hashpoint)
        assert_true(is_valid)
        return True

    def loose(self) -> Mapper[KeyValue[Subject, Signature], bool]:
        """Return ``self`` under the wider `Mapper` annotation."""
        return self
class FlowTransaction(RecursiveMentionable, StaticMentionable):
    """A transaction: a reference to its `FlowTransactionData` plus its signatures.

    ``hash_point`` is computed from the instance at construction time and is
    the transaction reference stored in coins created by `_coin`.
    """

    def __init__(
            self,
            data: HashPoint[FlowTransactionData],
            signatures: FlowStandard[KeyValue[Subject, Signature]]
    ):
        """Store the data reference and signatures, then derive ``hash_point``."""
        assert isinstance(data, HashPoint)
        assert isinstance(signatures, FlowStandard)
        self.data = data
        self.signatures = signatures
        # Must come after both attributes are set: hashing serialises self.
        self.hash_point = HashPoint.of(self)
        assert isinstance(self.hash_point, HashPoint)

    async def data_resolved(self) -> FlowTransactionData:
        """Resolve and return the referenced transaction data."""
        resolved = await self.data.resolve()
        return resolved

    def points(self) -> Iterable[HashPoint]:
        """Return the data hash point followed by the points of ``signatures``."""
        collected = [self.data]
        collected.extend(self.signatures.points())
        return collected

    def __bytes__(self):
        """Serialise as ``bytes(data) + bytes(signatures)``."""
        data_bytes = bytes(self.data)
        signature_bytes = bytes(self.signatures)
        return data_bytes + signature_bytes

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'FlowTransaction':
        """Rebuild a transaction from its serialised form.

        :param source: serialised bytes; the first ``HashPoint.HASH_LENGTH``
            bytes reference the data, the remainder describes the signatures.
        :param resolver: resolver handed to the origin and the factory.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        data_point = ResolverOrigin(
            FlowTransactionData.factory(), source[:HashPoint.HASH_LENGTH], resolver
        ).hash_point()
        signatures = FlowStandardFactory.of(
            KeyValue.f(Subject.factory(), Signature.factory()),
            KeyedComparator(HashComparator(Fail()))
        ).from_bytes(
            source[HashPoint.HASH_LENGTH:], resolver
        )
        return cls(data_point, signatures)

    def _coin(self, data: HashPoint[FlowCoinData]) -> HashPoint[FlowCoin]:
        """Wrap coin data into a `FlowCoin` that references this transaction."""
        assert isinstance(data, HashPoint)
        coin = FlowCoin(data, self.hash_point)
        return HashPoint.of(coin)

    async def used_reducer(self) -> Reducer[HashPoint[FlowCoin], Any]:
        """Return the reducer over the input coins of the resolved data."""
        resolved_data: FlowTransactionData = await self.data_resolved()
        assert isinstance(resolved_data, FlowTransactionData)
        return await resolved_data.in_coins.reducer()

    async def minted_reducer(self) -> Reducer[HashPoint[FlowCoin], Any]:
        """Return a reducer over the output coin data, each mapped through `_coin`."""
        resolved_data: FlowTransactionData = await self.data_resolved()
        assert isinstance(resolved_data, FlowTransactionData)
        to_coin = CallableMapper(self._coin)
        out_reducer = await resolved_data.out_coins.reducer()
        return MapReducer(to_coin, out_reducer)

    async def verify(self):
        """Resolve the data and verify it against ``self.signatures``; return ``True``."""
        resolved_data: FlowTransactionData = await self.data_resolved()
        assert isinstance(resolved_data, FlowTransactionData)
        verified = await resolved_data.verify(self.signatures)
        assert_true(verified)
        return True

    async def str(self, tab: int) -> str:
        """Render the transaction as a parenthesised block.

        Only the data is formatted; the signatures are shown as the literal
        ``(signatures)`` marker.
        """
        assert isinstance(tab, int)
        inner = tab + 1
        return (
            f'('
            f'{tabulate(inner)}transaction'
            f'{tabulate(inner)}{await hash_point_format(self.data, inner)}'
            f'{tabulate(inner)}(signatures)'
            f'{tabulate(tab)})'
        )

    @classmethod
    def empty(cls):
        """Return a transaction with empty data and an empty signatures `FlowStandard`."""
        empty_data = HashPoint.of(FlowTransactionData.empty())
        no_signatures = FlowStandardFactory.empty(
            KeyValue.f(Subject.factory(), Signature.factory()), KeyedComparator(HashComparator(Fail()))
        )
        return cls(empty_data, no_signatures)

    @classmethod
    async def make(
            cls,
            used: Iterable[FlowCoin],
            minted: Iterable[FlowCoinData],
            keys: Iterable[SigningKey],
    ) -> 'FlowTransaction':
        """Build a transaction and sign its data with every given key.

        :param used: coins consumed by the transaction.
        :param minted: coin data produced by the transaction.
        :param keys: signing keys; each yields one (subject, signature) pair,
            where the signature is made over the transaction data's hash point.
        """
        used_std: FlowStandard[FlowCoin]
        minted_std: FlowStandard[FlowCoinData]
        built = await gather(
            FlowStandardFactory.off(FlowCoin.factory(), HashComparator(Fail()), used),
            FlowStandardFactory.off(FlowCoinData.factory(), HashComparator(Fail()), minted),
        )
        used_std, minted_std = built
        assert isinstance(used_std, FlowStandard)
        assert isinstance(minted_std, FlowStandard)
        transaction_data: FlowTransactionData = FlowTransactionData(used_std, minted_std)
        # One pair per key: the key's verify key as subject, plus its signature.
        signatures: list[KeyValue[Subject, Signature]] = []
        for signing_key in keys:
            signatures.append(
                KeyValue.of(
                    Subject(signing_key.verify_key),
                    Signature.sign(signing_key, transaction_data.hash_point),
                )
            )
        data_point = HashPoint.of(transaction_data)
        signatures_std = await FlowStandardFactory.off(
            KeyValue.f(Subject.factory(), Signature.factory()), KeyedComparator(HashComparator(Fail())), signatures
        )
        return cls(data_point, signatures_std)