import asyncio import json import os import random import shutil import time from contextlib import ExitStack from nacl.signing import SigningKey from plot import plot from rainbowadn.chain import * from rainbowadn.collection.linear import * from rainbowadn.collection.trees.binary import * from rainbowadn.core import * from rainbowadn.nullability import * from rainbowadn.testing.instrument import * from rainbowadn.testing.resolvers import * from rainbowadn.v13 import * def get_dr() -> ExtendableResolver: dr = DictResolver() dr = DelayedResolver(dr, lambda: random.uniform(0.200, 0.500)) # dr = CachingResolver(dr) return dr def target_str(target) -> str: match target: case type(__name__=name): return name case object(__class__=type(__name__=name)): return name def jsonify(dumped: Instrumentation) -> dict: prefix = f'{target_str(dumped.target)}:{dumped.methodname}' match dumped: case Counter(counter=ctr): return {f'{prefix}:counter': ctr} case Concurrency(log=log): return {f'{prefix}:concurrency': log} case EntryExit(entry_log=entry_log, exit_log=exit_log): return { f'{prefix}:entry': entry_log, f'{prefix}:exit': exit_log, } case _: return {} async def mock(bank: BankChain) -> BankChain: key_0 = SigningKey.generate() transaction_0 = Transaction.make( [], [CoinData.of(Subject(key_0.verify_key), 100_000)], [] ) coin_0, coin_1 = await transaction_0.coins(MINT_CONST, NotNull(HashPoint.of(Subject(key_0.verify_key)))) bank = await bank.adds( [ transaction_0, Transaction.make( [coin_1], [CoinData.of(Subject(SigningKey.generate().verify_key), 10_000)], [key_0] ), ] ) return bank def get_instrumentations() -> list[Instrumentation]: sleep_cc = Concurrency(DelayedResolver, 'sleep') return [ sleep_cc, EntryExit(ActiveBinaryTree, 'add', sleep_cc.point), Concurrency(ActiveBinaryTree, 'add'), Concurrency(ActiveBinaryTree, 'contains'), Concurrency(Stack, 'list'), ] async def _generate( blocks: int, subjects_min: int, subjects_max: int, transactions_min: int, transactions_max: int, ) -> BankChain: bank: BankChain = BankChain.empty(ReductionChainMetaFactory().loose()) # bank = await mock(bank) for _ in range(blocks): bank = await bank.adds( [ Transaction.make( [], [CoinData.of(Subject(SigningKey.generate().verify_key), 0)] * random.randint( subjects_min, subjects_max ), [] ) for _ in range( random.randint( transactions_min, transactions_max ) ) ] ) print('generated') return bank async def _migrate(bank: BankChain) -> BankChain: assert_true(await bank.verify()) bank = BankChain.from_reference( ReductionChainMetaFactory(), await get_dr().migrate_resolved(bank.reference) ) print('migrated') return bank async def _instrument(bank: BankChain) -> list[Instrumentation]: with ExitStack() as estack: instrumentations: list[Instrumentation] = get_instrumentations() for stacked in instrumentations: stacked.enter(estack) assert_true(await bank.verify()) print('deinstrumentation (should be empty):', Instrumentation.deinstrumentation) print('instrumented') return instrumentations class DeintrumentationSize(Instrumentation): def instrument(self, method, *args, **kwargs): print('deinstrumentation size', len(self.deinstrumentation)) return method(*args, **kwargs) async def _trace(): set_gather_linear() bank = await _generate( 16, 8, 15, 8, 15, ) bank = await _migrate(bank) set_gather_asyncio() instrumentations = await _instrument(bank) print('traced') return instrumentations def _fn() -> str: return f'trace/{int(time.time())}-{os.urandom(2).hex()}.json' def _jsonify(instrumentations: list[Instrumentation]) -> dict: jsonified = {} for dumped in instrumentations: jsonified |= jsonify(dumped) return jsonified def _dump(fn: str, jsonified: dict) -> None: with open(fn, 'w') as file: json.dump( jsonified, file ) print('dumped') def _copy(fn: str) -> None: shutil.copy(fn, f'trace/latest.json') print('copied') async def main(): instrumentations = await _trace() fn = _fn() jsonified = _jsonify(instrumentations) _dump(fn, jsonified) _copy(fn) plot(fn) print('plotted') if __name__ == '__main__': asyncio.run(main())