rainbowadn/trace_flow.py

106 lines
2.8 KiB
Python

import asyncio
import random
from contextlib import ExitStack
from nacl.signing import SigningKey
from plot import *
from rainbowadn.collection.trees.binary import *
from rainbowadn.core import *
from rainbowadn.flow13 import *
from rainbowadn.instrument import *
from rainbowadn.testing.resolvers import *
from rainbowadn.v13 import *
from trace_common import *
def get_instrumentations() -> list[Instrumentation]:
    """Build a fresh list of ``Concurrency`` instrumentations.

    One ``Concurrency`` is created per (target, method name) pair below,
    in the listed order.

    :return: newly constructed instrumentations (not yet entered).
    """
    targets = (
        (DelayedResolver, 'sleep'),
        (ActiveBinaryTree, 'add'),
        (ActiveBinaryTree, 'contains'),
    )
    return [Concurrency(target, method) for target, method in targets]
async def _generate(
        blocks: int,
        subjects_min: int,
        subjects_max: int,
        transactions_min: int,
        transactions_max: int,
) -> BankBlock:
    """Grow an empty ``BankBlock`` by ``blocks`` randomly sized cheques.

    For every block one ``FlowCheque`` is made and added to the bank. The
    cheque holds a random number of transactions (inclusive range
    ``transactions_min..transactions_max``); each transaction is made with
    empty first and third arguments and, as its second argument, a random
    number (inclusive range ``subjects_min..subjects_max``) of
    ``FlowCoinData`` entries, each built from a newly generated signing
    key's verify key and the constant ``0``.

    :param blocks: how many cheques to add, one per iteration.
    :param subjects_min: lower bound for entries per transaction.
    :param subjects_max: upper bound for entries per transaction.
    :param transactions_min: lower bound for transactions per cheque.
    :param transactions_max: upper bound for transactions per cheque.
    :return: the bank obtained after the last ``add``.
    """
    bank: BankBlock = BankBlock.empty()
    for _block in range(blocks):
        transactions = []
        # The transaction count is drawn first, then one subject count per
        # transaction, so the order of ``random`` calls stays fixed.
        for _transaction in range(random.randint(transactions_min, transactions_max)):
            subject_count = random.randint(subjects_min, subjects_max)
            coin_data = [
                FlowCoinData.of(Subject(SigningKey.generate().verify_key), 0)
                for _subject in range(subject_count)
            ]
            transactions.append(await FlowTransaction.make([], coin_data, []))
        cheque = await FlowCheque.make(transactions)
        bank = await bank.add(cheque)
    print('generated')
    return bank
async def _migrate(bank: BankBlock) -> BankBlock:
    """Verify ``bank`` and rebuild it from a migrated reference.

    The bank's ``verify()`` result is checked with ``assert_true`` first;
    then ``bank.reference`` is passed through
    ``get_dr().migrate_resolved(...)`` and wrapped in a new ``BankBlock``.

    :param bank: the bank to verify and migrate.
    :return: a ``BankBlock`` around the migrated reference.
    """
    verified = await bank.verify()
    assert_true(verified)
    resolver = get_dr()
    migrated_reference = await resolver.migrate_resolved(bank.reference)
    migrated = BankBlock(migrated_reference)
    print('migrated')
    return migrated
async def _instrument(bank: BankBlock) -> list[Instrumentation]:
    """Verify ``bank`` while the instrumentations are entered.

    Every instrumentation from ``get_instrumentations()`` is entered into a
    shared ``ExitStack``; the bank is verified inside that stack and the
    result is checked with ``assert_true``. Once the stack has unwound,
    ``Instrumentation.deinstrumentation`` is printed (expected to be empty).

    :param bank: the bank whose verification is run under instrumentation.
    :return: the instrumentations that were active during verification.
    """
    instrumentations: list[Instrumentation] = get_instrumentations()
    with ExitStack() as estack:
        for instrumentation in instrumentations:
            instrumentation.enter(estack)
        verified = await bank.verify()
        assert_true(verified)
    # Printed after the stack has unwound, when nothing should remain.
    print('deinstrumentation (should be empty):', Instrumentation.deinstrumentation)
    print('instrumented')
    return instrumentations
# Positional arguments unpacked into `_generate(*params)`; the same tuple is
# stored next to the results under the 'params' key in `main`.
params = (
    16,  # blocks
    8,  # subjects_min
    16,  # subjects_max
    8,  # transactions_min
    16,  # transactions_max
)
async def _trace():
    """Generate and migrate a bank, then verify it under instrumentation.

    Generation and migration run after ``set_gather_linear()``; the
    instrumented verification runs after ``set_gather_asyncio()``, wrapped
    in a ``DeintrumentationSize`` on ``Instrumentation.deinstrument`` and a
    ``Counter`` on ``DeintrumentationSize.instrument``. The counter is
    printed via ``jsonify`` once both context managers have exited.

    :return: the instrumentations returned by ``_instrument``.
    """
    set_gather_linear()
    generated = await _generate(*params)
    migrated = await _migrate(generated)
    set_gather_asyncio()
    # One multi-manager `with` enters and exits in the same order as two
    # nested `with` statements would.
    with DeintrumentationSize(Instrumentation, 'deinstrument'), \
            Counter(DeintrumentationSize, 'instrument') as de_counter:
        instrumentations = await _instrument(migrated)
    print(jsonify(de_counter))
    print('traced')
    return instrumentations
async def main():
    """Run the trace and hand its results to ``dump``, ``copy`` and ``plot``.

    The instrumentations from ``_trace()`` are converted with
    ``jsonify_list`` and merged with ``{'params': params}``; the merged
    mapping is dumped under the name from ``get_fn()``, and that same name
    is then passed to ``copy`` and ``plot``.
    """
    instrumentations = await _trace()
    target = get_fn()
    payload = jsonify_list(instrumentations) | {'params': params}
    dump(target, payload)
    copy(target)
    plot(target)
    print('plotted')
if __name__ == '__main__':
    # Only start the event loop when executed as a script, not on import.
    asyncio.run(main())