import asyncio import random from contextlib import ExitStack from typing import Any, Callable, Coroutine from nacl.signing import SigningKey, VerifyKey from plot import * from trace_common import * from rainbowadn.collection.trees.binary import * from rainbowadn.core import * from rainbowadn.flow13 import * from rainbowadn.flow13 import FlowCoin from rainbowadn.instrument import * from rainbowadn.testing.resolvers import * from rainbowadn.v13 import * def get_instrumentations() -> list[Instrumentation]: sleep_cc = Concurrency(DelayedResolver, "sleep") return [ sleep_cc, Concurrency(ActiveBinaryTree, "add"), Concurrency(ActiveBinaryTree, "contains"), Concurrency(FlowStandard, "verify_subset"), ] minted: set[HashPoint[FlowCoin]] = set() reverse: dict[VerifyKey, SigningKey] = {} def _generate_subject() -> Subject: signing_key = SigningKey.generate() verify_key = signing_key.verify_key reverse[verify_key] = signing_key return Subject(verify_key) async def _generate_transaction( subjects_min: int, subjects_max: int, ): in_coins: list[FlowCoin] = [] keys: list[SigningKey] = [] for _ in range(random.randint(subjects_min, subjects_max)): if not minted: break coin = await minted.pop().resolve() in_coins.append(coin) keys.append(reverse[(await coin.owner()).verify_key]) transaction = await FlowTransaction.make( in_coins, [FlowCoinData.of(_generate_subject(), 0) for _ in range(random.randint(subjects_min, subjects_max))], keys, ) for coinhp in await (await transaction.minted_reducer()).reduce(FlowIterate()): minted.add(coinhp) return transaction async def _generate( blocks: int, subjects_min: int, subjects_max: int, transactions_min: int, transactions_max: int, ) -> BankBlock: bank: BankBlock = BankBlock.empty() for _ in range(blocks): bank = await bank.add( await FlowCheque.make( [ await _generate_transaction(subjects_min, subjects_max) for _ in range(random.randint(transactions_min, transactions_max)) ] ) ) print("generated") return bank async def _migrate(bank: BankBlock, params) -> BankBlock: assert_true(await bank.verify()) bank = BankBlock(await get_dr(params["delay"], params["caching"]).migrate_resolved(bank.reference)) print("migrated") return bank async def _instrument(process: Callable[[], Coroutine[Any, Any, None]]) -> list[Instrumentation]: instrumentations: list[Instrumentation] = get_instrumentations() with ExitStack() as estack: for stacked in instrumentations: stacked.enter(estack) try: await process() except Terminated: pass print("deinstrumentation (should be empty):", Instrumentation.deinstrumentation) print("instrumented") return instrumentations async def _process(bank: BankBlock) -> None: with Measure(bank, "verify") as measurement: assert_true(await bank.verify()) print("measured", *(f"{t:.3f}" for t in measurement.log)) async def _report(bank: BankBlock): with open("trace/latest-report.txt", "w") as file: report = ClassReport() await report.walk(bank.reference) file.write(report.format()) print("reported") async def _preprocess(bank: BankBlock): with Resolution() as resolution: await _process(bank) with open("trace/latest-resolution.txt", "w") as file: file.write(resolution.format()) print("preprocessed") async def _trace(params): set_gather_linear() bank = await _generate( params["blocks"], *params["subjects"], *params["transactions"], ) if params["meta"]: await _report(bank) await _process(bank) await _preprocess(bank) bank = await _migrate(bank, params) set_gather_asyncio() with DeintrumentationSize(Instrumentation, "deinstrument"): with Counter(DeintrumentationSize, "instrument") as de_ctr: instrumentations = await _instrument(lambda: _process(bank)) print(jsonify(de_ctr)) print("traced") return instrumentations async def trace(params): instrumentations = await _trace(params) fn = get_fn() jsonified = jsonify_list(instrumentations) dump(fn, jsonified | {"params": params}) copy(fn) plot(fn) print("plotted") preset_default = dict(caching=True, delay=0.5, meta=False, subjects=(4, 8), transactions=(8, 16)) preset_extra = preset_default | dict(blocks=256) preset_long = preset_default | dict(blocks=64) preset_short = preset_default | dict(blocks=16) if __name__ == "__main__": random.seed(659918) try: asyncio.run(trace(preset_short)) except KeyboardInterrupt: print("interrupted")