371 lines
13 KiB
Python
371 lines
13 KiB
Python
from typing import AsyncIterable, Iterable
|
|
|
|
import nacl.signing
|
|
|
|
from rainbowadn.atomic import *
|
|
from rainbowadn.collection.linear import *
|
|
from rainbowadn.core import *
|
|
from rainbowadn.nullability import *
|
|
from .signature import *
|
|
from .subject import *
|
|
|
|
# Names exported when this module is star-imported.
__all__ = ('CoinData', 'Coin', 'TransactionData', 'Transaction',)
|
|
|
|
|
|
class CoinData(RecursiveMentionable, StaticMentionable):
    """Owner/value pair of a coin; both parts are held as hash points."""

    def __init__(
            self,
            owner: HashPoint[Subject],
            value: HashPoint[Integer]
    ):
        # Both arguments must already be wrapped; `of` accepts raw objects.
        for candidate in (owner, value):
            assert isinstance(candidate, HashPoint)
        self.owner = owner
        self.value = value

    async def int_value(self) -> int:
        """Resolve the value hash point and return the plain integer."""
        resolved_value = await self.value.resolve()
        return resolved_value.integer

    @classmethod
    def of(cls, owner: Subject, value: int) -> 'CoinData':
        """Build an instance from an unwrapped subject and a plain int."""
        assert isinstance(owner, Subject)
        assert isinstance(value, int)
        owner_point = HashPoint.of(owner)
        value_point = HashPoint.of(Integer(value))
        return cls(owner_point, value_point)

    def points(self) -> Iterable[HashPoint]:
        """Return the hash points this object mentions: owner, then value."""
        mentioned = [self.owner, self.value]
        return mentioned

    def __bytes__(self):
        # Serialised form is the owner bytes followed by the value bytes.
        return b''.join((bytes(self.owner), bytes(self.value)))

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'CoinData':
        """Inverse of ``__bytes__``: split ``source`` at HASH_LENGTH.

        The first HASH_LENGTH bytes address the owner, the remainder the value.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        cut = HashPoint.HASH_LENGTH
        owner_point = ResolverOrigin(Subject.factory(), source[:cut], resolver).hash_point()
        value_point = ResolverOrigin(Integer.factory(), source[cut:], resolver).hash_point()
        return cls(owner_point, value_point)

    async def str(self, tab: int) -> str:
        """Format owner and value, separated by ``tabulate(tab)``."""
        assert isinstance(tab, int)
        owner_str, value_str = await gather(
            hash_point_format(self.owner, tab),
            hash_point_format(self.value, tab),
        )
        assert isinstance(owner_str, str)
        assert isinstance(value_str, str)
        return f'{owner_str}{tabulate(tab)}{value_str}'
|
|
|
|
|
|
class Coin(RecursiveMentionable, StaticMentionable):
    """A coin: its data, the transaction it refers to, and an index."""

    def __init__(
            self,
            data: HashPoint[CoinData],
            transaction: HashPoint['Transaction'],
            index: HashPoint[Integer]
    ):
        # All three parts are stored as hash points, never as raw objects.
        for candidate in (data, transaction, index):
            assert isinstance(candidate, HashPoint)
        self.data = data
        self.transaction = transaction
        self.index = index

    async def data_resolved(self) -> CoinData:
        """Resolve and return the coin's ``CoinData``."""
        coin_data = await self.data.resolve()
        return coin_data

    def points(self) -> Iterable[HashPoint]:
        """Return the mentioned hash points: data, transaction, index."""
        mentioned = [self.data, self.transaction, self.index]
        return mentioned

    def __bytes__(self):
        # Serialised form: data, transaction and index bytes, in that order.
        return b''.join((bytes(self.data), bytes(self.transaction), bytes(self.index)))

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'Coin':
        """Inverse of ``__bytes__``.

        ``source`` is cut at HASH_LENGTH and 2 * HASH_LENGTH into the data,
        transaction and index addresses respectively.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        first_cut = HashPoint.HASH_LENGTH
        second_cut = 2 * HashPoint.HASH_LENGTH
        data_point = ResolverOrigin(
            CoinData.factory(), source[:first_cut], resolver
        ).hash_point()
        transaction_point = ResolverOrigin(
            Transaction.factory(), source[first_cut:second_cut], resolver
        ).hash_point()
        index_point = ResolverOrigin(
            Integer.factory(), source[second_cut:], resolver
        ).hash_point()
        return cls(data_point, transaction_point, index_point)

    async def str(self, tab: int) -> str:
        """Format the coin as a parenthesised block.

        Only the data and the index are formatted; the transaction is not
        expanded and the literal ``(origin)`` is printed in its place.
        """
        assert isinstance(tab, int)
        data_str, index_str = await gather(
            hash_point_format(self.data, tab + 1),
            hash_point_format(self.index, tab + 1),
        )
        assert isinstance(data_str, str)
        assert isinstance(index_str, str)
        inner = tabulate(tab + 1)
        return f'(' \
               f'{inner}coin' \
               f'{inner}{data_str}' \
               f'{inner}(origin)' \
               f'{inner}{index_str}' \
               f'{tabulate(tab)})'

    async def int_value(self) -> int:
        """Return the integer value stored in the resolved coin data."""
        coin_data = await self.data_resolved()
        return await coin_data.int_value()

    async def owner_resolved(self) -> Subject:
        """Resolve the coin data, then resolve and return its owner."""
        coin_data = await self.data_resolved()
        return await coin_data.owner.resolve()
|
|
|
|
|
|
class TransactionData(RecursiveMentionable, StaticMentionable):
    """Inputs and outputs of a transaction, without its signatures.

    ``in_coins`` references a stack of input :class:`Coin` objects and
    ``out_coins`` a stack of output :class:`CoinData` objects. The hash point
    of this object is what each signature is verified against.
    """

    def __init__(
            self,
            in_coins: NullableReference[Stack[Coin]],
            out_coins: NullableReference[Stack[CoinData]],
    ):
        assert isinstance(in_coins, NullableReference)
        assert isinstance(out_coins, NullableReference)
        self.in_coins = in_coins
        self.out_coins = out_coins
        # Computed once, after both fields are set (HashPoint.of receives
        # `self`, so it presumably needs the fields in place).
        self.hash_point = HashPoint.of(self)
        assert isinstance(self.hash_point, HashPoint)

    def points(self) -> Iterable[HashPoint]:
        """Return the hash points of the input reference, then the output's."""
        return [*self.in_coins.points(), *self.out_coins.points()]

    def __bytes__(self):
        # Serialised form: input reference bytes followed by output reference bytes.
        return bytes(self.in_coins) + bytes(self.out_coins)

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'TransactionData':
        """Inverse of ``__bytes__``: split ``source`` at HASH_LENGTH.

        The first HASH_LENGTH bytes are parsed as the input-coin stack
        reference, the remainder as the output-coin stack reference.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        return cls(
            NullableReferenceFactory(
                StackFactory(Coin.factory()).loose()
            ).from_bytes(source[:HashPoint.HASH_LENGTH], resolver),
            NullableReferenceFactory(
                StackFactory(CoinData.factory()).loose()
            ).from_bytes(source[HashPoint.HASH_LENGTH:], resolver),
        )

    async def _signature_verify(self, coin: Coin, signature: Signature) -> bool:
        """Check ``signature`` against ``coin``'s owner and this data's hash point."""
        assert isinstance(coin, Coin)
        assert isinstance(signature, Signature)
        assert_true(
            signature.verify(
                await coin.owner_resolved(),
                self.hash_point
            )
        )
        return True

    async def _verify_signatures(
            self,
            signatures: NullableReference[Stack[Signature]]
    ) -> bool:
        """Verify one signature per input coin, pairing them positionally.

        ``zip(..., strict=True)`` raises ``ValueError`` when the number of
        signatures differs from the number of input coins.
        """
        assert isinstance(signatures, NullableReference)
        assert_trues(
            await gather(
                *[
                    self._signature_verify(coin, signature)
                    for
                    coin, signature
                    in
                    zip(
                        await self.in_coins_resolved(),
                        await Stack.list(signatures),
                        strict=True
                    )
                ]
            )
        )
        return True

    def iter_in_coins(self) -> AsyncIterable[HashPoint[Coin]]:
        """Iterate over the input coins via ``Stack.iter``."""
        return Stack.iter(self.in_coins)

    async def in_coins_resolved(self) -> list[Coin]:
        """Return the input coins as a list via ``Stack.list``."""
        return await Stack.list(self.in_coins)

    async def _total_in(self) -> int:
        """Sum the integer values of all input coins."""
        return await asum(
            coin.int_value() for coin in await self.in_coins_resolved()
        )

    def iter_out_coins(self) -> AsyncIterable[HashPoint[CoinData]]:
        """Iterate over the output coin data via ``Stack.iter``."""
        return Stack.iter(self.out_coins)

    async def out_coins_resolved(self) -> list[CoinData]:
        """Return the output coin data as a list via ``Stack.list``."""
        return await Stack.list(self.out_coins)

    async def _total_out(self) -> int:
        """Sum the integer values of all output coins."""
        return await asum(
            coin.int_value() for coin in await self.out_coins_resolved()
        )

    async def _verify_values(self, mint: int) -> bool:
        """Ensure the outputs do not exceed the inputs plus ``mint``.

        :raises AssertionError: if ``extra(mint)`` is negative.
        """
        assert isinstance(mint, int)
        if (await self.extra(mint)) < 0:
            # Raised explicitly rather than via `assert`: this is a balance
            # invariant, and `assert` statements are removed under `python -O`.
            raise AssertionError('transaction outputs exceed inputs plus mint')
        return True

    async def extra(self, mint: int) -> int:
        """Return ``total_in + mint - total_out``."""
        assert isinstance(mint, int)
        total_in, total_out = await gather(
            self._total_in(),
            self._total_out(),
        )
        assert isinstance(total_in, int)
        assert isinstance(total_out, int)
        return total_in + mint - total_out

    async def verify(
            self,
            signatures: NullableReference[Stack[Signature]],
            mint: int
    ) -> bool:
        """Run the signature check and the value check; return True on success.

        Failures surface as exceptions raised by the individual checks.
        """
        assert isinstance(signatures, NullableReference)
        assert isinstance(mint, int)
        assert_trues(
            await gather(
                self._verify_signatures(signatures),
                self._verify_values(mint),
            )
        )
        return True

    async def str(self, tab: int) -> str:
        """Format as ``(in)`` / inputs / ``(out)`` / outputs, joined by ``tabulate(tab)``."""
        assert isinstance(tab, int)
        in_str, out_str = await gather(
            self.in_coins.str(tab),
            self.out_coins.str(tab),
        )
        assert isinstance(in_str, str)
        assert isinstance(out_str, str)
        return f'(in)' \
               f'{tabulate(tab)}{in_str}' \
               f'{tabulate(tab)}(out)' \
               f'{tabulate(tab)}{out_str}'
|
|
|
|
|
|
class Transaction(RecursiveMentionable, StaticMentionable):
    """Transaction data together with the stack of signatures over it."""

    def __init__(
            self,
            data: HashPoint[TransactionData],
            signatures: NullableReference[Stack[Signature]]
    ):
        assert isinstance(data, HashPoint)
        assert isinstance(signatures, NullableReference)
        self.data = data
        self.signatures = signatures
        # Computed once, after both fields are set; reused as the
        # `transaction` field of every coin produced by `iter_coins`.
        self.hash_point = HashPoint.of(self)
        assert isinstance(self.hash_point, HashPoint)

    async def data_resolved(self) -> TransactionData:
        """Resolve and return the underlying ``TransactionData``."""
        return await self.data.resolve()

    def points(self) -> Iterable[HashPoint]:
        """Return the data hash point followed by the signature reference's points."""
        return [self.data, *self.signatures.points()]

    def __bytes__(self):
        # Serialised form: data bytes followed by signature reference bytes.
        return bytes(self.data) + bytes(self.signatures)

    @classmethod
    def from_bytes(cls, source: bytes, resolver: HashResolver) -> 'Transaction':
        """Inverse of ``__bytes__``: split ``source`` at HASH_LENGTH.

        The first HASH_LENGTH bytes address the transaction data; the
        remainder is parsed as the signature stack reference.
        """
        assert isinstance(source, bytes)
        assert isinstance(resolver, HashResolver)
        signature_factory: RainbowFactory[Signature] = Signature.factory()
        assert isinstance(signature_factory, RainbowFactory)
        stack_factory: RainbowFactory[Stack[Signature]] = StackFactory(signature_factory).loose()
        assert isinstance(stack_factory, RainbowFactory)
        return cls(
            ResolverOrigin(TransactionData.factory(), source[:HashPoint.HASH_LENGTH], resolver).hash_point(),
            NullableReferenceFactory(stack_factory).from_bytes(source[HashPoint.HASH_LENGTH:], resolver),
        )

    async def iter_coins(
            self,
            mint: int,
            miner: Nullable[HashPoint[Subject]]
    ) -> AsyncIterable[tuple[Coin, Nullable[HashPoint[Subject]]]]:
        """Yield ``(coin, miner)`` for every coin this transaction produces.

        One coin is yielded per output, indexed from 0. If ``miner`` is null
        when an output is visited, that output's owner becomes the miner.
        After the outputs, if a miner is known, one more coin is yielded: it
        is owned by the miner and carries ``extra(mint)`` as its value. When
        there are no outputs and ``miner`` is null, nothing is yielded.
        """
        assert isinstance(mint, int)
        assert isinstance(miner, Nullable)
        transaction_data: TransactionData = await self.data_resolved()
        assert isinstance(transaction_data, TransactionData)
        index = 0
        out_coin: HashPoint[CoinData]
        async for out_coin in transaction_data.iter_out_coins():
            assert isinstance(out_coin, HashPoint)
            if miner.null():
                # No miner supplied: fall back to the owner of this output.
                miner = NotNull((await out_coin.resolve()).owner)
                assert isinstance(miner, Nullable)
            coin: Coin = Coin(out_coin, self.hash_point, HashPoint.of(Integer(index)))
            assert isinstance(coin, Coin)
            yield coin, miner
            index += 1
        if not miner.null():
            # Leftover value (inputs + mint - outputs) goes to the miner; the
            # coin takes the next free index after the regular outputs.
            extra_coin: Coin = Coin(
                HashPoint.of(
                    CoinData(
                        miner.resolve(),
                        HashPoint.of(Integer(await transaction_data.extra(mint)))
                    )
                ),
                self.hash_point,
                HashPoint.of(Integer(index))
            )
            assert isinstance(extra_coin, Coin)
            yield extra_coin, miner

    async def coins(
            self,
            mint: int,
            miner: Nullable[HashPoint[Subject]]
    ) -> list[Coin]:
        """Collect the coins from ``iter_coins`` into a list, dropping the miner."""
        assert isinstance(mint, int)
        assert isinstance(miner, Nullable)
        return [coin async for coin, _ in self.iter_coins(mint, miner)]

    async def verify(self, mint: int) -> bool:
        """Verify the resolved data against this transaction's signatures.

        Returns True on success; failures surface as exceptions raised by
        ``TransactionData.verify``.
        """
        assert isinstance(mint, int)
        data: TransactionData = await self.data_resolved()
        assert isinstance(data, TransactionData)
        assert_true(await data.verify(self.signatures, mint))
        return True

    async def str(self, tab: int) -> str:
        """Format as a parenthesised block containing data and signatures."""
        assert isinstance(tab, int)
        data_str, signatures_str = await gather(
            hash_point_format(self.data, tab + 1),
            self.signatures.str(tab + 1),
        )
        assert isinstance(data_str, str)
        assert isinstance(signatures_str, str)
        return f'(' \
               f'{tabulate(tab + 1)}transaction' \
               f'{tabulate(tab + 1)}{data_str}' \
               f'{tabulate(tab + 1)}{signatures_str}' \
               f'{tabulate(tab)})'

    @classmethod
    def make(
            cls,
            in_coins: list[Coin],
            out_coins: list[CoinData],
            keys: list[nacl.signing.SigningKey],
    ) -> 'Transaction':
        """Build a transaction from plain lists and sign it with ``keys``.

        Each list is handed to ``Stack.off`` reversed (presumably so that the
        resulting stacks iterate in the original list order). One signature
        over the transaction data's hash point is produced per key. The
        number of keys is not checked here; a mismatch with ``in_coins`` is
        only detected later, when signatures are verified.
        """
        assert isinstance(in_coins, list)
        assert isinstance(out_coins, list)
        assert isinstance(keys, list)
        transaction_data = TransactionData(
            Stack.off(Coin.factory(), reversed(in_coins)),
            Stack.off(CoinData.factory(), reversed(out_coins)),
        )
        assert isinstance(transaction_data, TransactionData)
        # Wrap the data once and share the hash point between the data field
        # and every signature instead of re-wrapping it per key.
        data_point = HashPoint.of(transaction_data)
        # `cls`, not `Transaction`, so subclasses get instances of themselves.
        return cls(
            data_point,
            Stack.off(
                Signature.factory(),
                (Signature.sign(key, data_point) for key in reversed(keys))
            )
        )
|