"""Generate a bank of flow blocks, verify it under instrumentation, and plot the result.

Running this file as a script traces ``preset_long``.
"""

import asyncio
import random
from contextlib import ExitStack
from typing import Any, Callable, Coroutine

from nacl.signing import SigningKey

# NOTE: the order of the star imports is preserved on purpose; later modules
# may shadow names exported by earlier ones.
from plot import *
from rainbowadn.collection.trees.binary import *
from rainbowadn.core import *
from rainbowadn.flow13 import *
from rainbowadn.instrument import *
from rainbowadn.testing.resolvers import *
from rainbowadn.v13 import *
from trace_common import *


def get_instrumentations() -> list[Instrumentation]:
    """Return freshly constructed ``Concurrency`` instrumentations.

    One instance is created per (class, method name) pair, in the order
    listed below.
    """
    targets = (
        (DelayedResolver, 'sleep'),
        (ActiveBinaryTree, 'add'),
        (ActiveBinaryTree, 'contains'),
        (FlowStandard, 'verify_subset'),
    )
    return [Concurrency(owner, method) for owner, method in targets]


async def _random_transaction(subjects_min: int, subjects_max: int):
    """Make one transaction carrying a random number of coin entries.

    The number of entries is drawn with ``random.randint`` (both bounds
    inclusive). Every entry is built from the verify key of a newly generated
    ``SigningKey`` and the literal ``0``.
    """
    minted = [
        FlowCoinData.of(Subject(SigningKey.generate().verify_key), 0)
        for _ in range(random.randint(subjects_min, subjects_max))
    ]
    # The first and third arguments are always empty lists here.
    return await FlowTransaction.make([], minted, [])


async def _generate(
        blocks: int,
        subjects_min: int,
        subjects_max: int,
        transactions_min: int,
        transactions_max: int,
) -> BankBlock:
    """Build a bank by adding ``blocks`` random cheques to an empty ``BankBlock``.

    Args:
        blocks: how many cheques to add, one per iteration.
        subjects_min: lower inclusive bound on coin entries per transaction.
        subjects_max: upper inclusive bound on coin entries per transaction.
        transactions_min: lower inclusive bound on transactions per cheque.
        transactions_max: upper inclusive bound on transactions per cheque.

    Returns:
        The bank obtained after the last ``add``.
    """
    bank: BankBlock = BankBlock.empty()
    for _ in range(blocks):
        cheque = await FlowCheque.make(
            [
                await _random_transaction(subjects_min, subjects_max)
                for _ in range(random.randint(transactions_min, transactions_max))
            ]
        )
        bank = await bank.add(cheque)
    print('generated')
    return bank


async def _migrate(bank: BankBlock, params) -> BankBlock:
    """Verify ``bank`` and rebuild it from a reference migrated through ``get_dr``.

    ``params`` must provide the ``'delay'`` and ``'caching'`` keys, which are
    passed to ``get_dr`` in that order.
    """
    assert_true(await bank.verify())
    resolver = get_dr(params['delay'], params['caching'])
    migrated = BankBlock(await resolver.migrate_resolved(bank.reference))
    print('migrated')
    return migrated


async def _instrument(process: Callable[[], Coroutine[Any, Any, None]]) -> list[Instrumentation]:
    """Await ``process()`` while every instrumentation is entered on one ``ExitStack``.

    A ``Terminated`` raised by ``process`` is swallowed, so the instrumentations
    are still returned in that case.

    Returns:
        The instrumentations that were active during the run.
    """
    instrumentations: list[Instrumentation] = get_instrumentations()
    with ExitStack() as estack:
        for instrumentation in instrumentations:
            instrumentation.enter(estack)
        try:
            await process()
        except Terminated:
            pass
    # NOTE(review): printed after the stack has unwound, which is what the
    # "should be empty" wording suggests -- confirm against the upstream file.
    print('deinstrumentation (should be empty):', Instrumentation.deinstrumentation)
    print('instrumented')
    return instrumentations


async def _process(bank: BankBlock) -> None:
    """Verify ``bank`` inside a ``Measure`` context and print the measurement log."""
    with Measure(bank, 'verify') as measurement:
        assert_true(await bank.verify())
    # NOTE(review): the log is read after the Measure context has exited -- confirm.
    timings = (f'{t:.3f}' for t in measurement.log)
    print('measured', *timings)


async def _trace(params):
    """Generate, verify and migrate a bank, then verify it again under instrumentation.

    ``params`` must provide ``'blocks'``, ``'subjects'`` and ``'transactions'``
    (the latter two are unpacked positionally into ``_generate``), plus the keys
    read by ``_migrate``.

    Returns:
        The list of instrumentations produced by ``_instrument``.
    """
    # presumably switches the library to sequential gathering for the
    # un-instrumented warm-up -- TODO confirm
    set_gather_linear()
    bank = await _generate(
        params['blocks'],
        *params['subjects'],
        *params['transactions'],
    )
    await _process(bank)
    bank = await _migrate(bank, params)
    set_gather_asyncio()
    with DeintrumentationSize(Instrumentation, 'deinstrument'), \
            Counter(DeintrumentationSize, 'instrument') as de_ctr:
        instrumentations = await _instrument(lambda: _process(bank))
    # NOTE(review): the counter is reported once both contexts are closed -- confirm.
    print(jsonify(de_ctr))
    print('traced')
    return instrumentations


async def trace(params):
    """Run ``_trace`` and then dump, copy and plot its output.

    The dumped payload is the result of ``jsonify_list`` merged with
    ``{'params': params}``.
    """
    instrumentations = await _trace(params)
    target = get_fn()  # presumably the output file name -- TODO confirm
    payload = jsonify_list(instrumentations) | {'params': params}
    dump(target, payload)
    copy(target)
    plot(target)
    print('plotted')


# Parameter presets accepted by ``trace``.
preset_long = {
    'blocks': 64,
    'subjects': (8, 16),
    'transactions': (8, 16),
    'caching': True,
    'delay': 0.35,
}
preset_short = {
    'blocks': 16,
    'subjects': (8, 16),
    'transactions': (8, 16),
    'caching': True,
    'delay': 0.35,
}
preset_old = {
    'blocks': 16,
    'subjects': (8, 15),
    'transactions': (8, 15),
    'caching': False,
    'delay': 0.35,
}

if __name__ == '__main__':
    asyncio.run(trace(preset_long))