# rainbowadn/trace.py
#
# 203 lines
# 5.3 KiB
# Python

import asyncio
import json
import os
import random
import shutil
import time
from contextlib import ExitStack
from nacl.signing import SigningKey
from plot import *
from rainbowadn.chain import *
from rainbowadn.collection.linear import *
from rainbowadn.collection.trees.binary import *
from rainbowadn.core import *
from rainbowadn.instrument import *
from rainbowadn.nullability import *
from rainbowadn.testing.resolvers import *
from rainbowadn.v13 import *
def get_dr() -> ExtendableResolver:
    """Build the resolver used for the traced run.

    A fresh ``DictResolver`` is wrapped in a ``DelayedResolver`` whose second
    argument yields a random value in [0.2, 0.5] on every call (presumably
    the artificial delay in seconds -- confirm against ``DelayedResolver``).
    """
    # A CachingResolver layer could be wrapped around the result here:
    # CachingResolver(...)
    return DelayedResolver(
        DictResolver(),
        lambda: random.uniform(0.200, 0.500),
    )
def target_str(target) -> str:
    """Return a short display name for an instrumentation target.

    A class yields its own ``__name__``; any other object yields the
    ``__name__`` of its class.
    """
    named = target if isinstance(target, type) else target.__class__
    return named.__name__
def jsonify(dumped: Instrumentation) -> dict:
prefix = f'{target_str(dumped.target)}:{dumped.methodname}'
match dumped:
case Counter(counter=ctr):
return {f'{prefix}:counter': ctr}
case Concurrency(log=log):
return {f'{prefix}:concurrency': log}
case EntryExit(entry_log=entry_log, exit_log=exit_log):
return {
f'{prefix}:entry': entry_log,
f'{prefix}:exit': exit_log,
}
case _:
return {}
async def mock(bank: BankChain) -> BankChain:
    """Append two sample transactions to *bank* and return the new chain.

    The first transaction has no inputs and a single 100_000 output owned by
    a freshly generated key. The second spends one of the coins derived from
    the first, signed by that key, into a 10_000 output for another fresh key.
    """
    owner_key = SigningKey.generate()
    funding = Transaction.make(
        [],
        [CoinData.of(Subject(owner_key.verify_key), 100_000)],
        [],
    )
    miner = NotNull(HashPoint.of(Subject(owner_key.verify_key)))
    # ``coins`` must produce exactly two items here; only the second is spent.
    _, spendable = await funding.coins(MINT_CONST, miner)
    transfer = Transaction.make(
        [spendable],
        [CoinData.of(Subject(SigningKey.generate().verify_key), 10_000)],
        [owner_key],
    )
    return await bank.adds([funding, transfer])
def get_instrumentations() -> list[Instrumentation]:
    """Create the instrumentations applied during the traced verification.

    The list contains, in order: concurrency of ``DelayedResolver.sleep``,
    an entry/exit log of ``ActiveBinaryTree.add`` (constructed with the
    sleep instrumentation's ``point``), and concurrency of
    ``ActiveBinaryTree.add``, ``ActiveBinaryTree.contains`` and ``Stack.list``.
    """
    sleep_concurrency = Concurrency(DelayedResolver, 'sleep')
    tree_add_timeline = EntryExit(ActiveBinaryTree, 'add', sleep_concurrency.point)
    tree_concurrency = [
        Concurrency(ActiveBinaryTree, methodname)
        for methodname in ('add', 'contains')
    ]
    return [
        sleep_concurrency,
        tree_add_timeline,
        *tree_concurrency,
        Concurrency(Stack, 'list'),
    ]
async def _generate(
        blocks: int,
        subjects_min: int,
        subjects_max: int,
        transactions_min: int,
        transactions_max: int,
) -> BankChain:
    """Build a chain of *blocks* batches of randomly sized transactions.

    Each batch holds between *transactions_min* and *transactions_max*
    transactions (inclusive). Each transaction has no inputs and no signing
    keys; its outputs are one zero-valued ``CoinData`` for a freshly generated
    key, repeated between *subjects_min* and *subjects_max* times (inclusive).
    """
    bank: BankChain = BankChain.empty(ReductionChainMetaFactory().loose())
    # bank = await mock(bank)
    for _ in range(blocks):
        transaction_count = random.randint(transactions_min, transactions_max)
        batch = []
        for _ in range(transaction_count):
            coin = CoinData.of(Subject(SigningKey.generate().verify_key), 0)
            # The same CoinData object is repeated; no new key per output.
            repeats = random.randint(subjects_min, subjects_max)
            batch.append(Transaction.make([], [coin] * repeats, []))
        bank = await bank.adds(batch)
    print('generated')
    return bank
async def _migrate(bank: BankChain) -> BankChain:
    """Verify *bank*, then rebuild it from a reference migrated via ``get_dr()``.

    The returned chain is built with a plain ``ReductionChainMetaFactory()``
    around the reference produced by the resolver's ``migrate_resolved``.
    """
    verified = await bank.verify()
    assert_true(verified)
    factory = ReductionChainMetaFactory()
    migrated_reference = await get_dr().migrate_resolved(bank.reference)
    migrated = BankChain.from_reference(factory, migrated_reference)
    print('migrated')
    return migrated
async def _instrument(bank: BankChain) -> list[Instrumentation]:
    """Verify *bank* while all instrumentations are active and return them.

    Each instrumentation from ``get_instrumentations()`` is entered on a
    shared ``ExitStack``, so all of them are released together once the
    verification has finished.
    """
    with ExitStack() as stack:
        active: list[Instrumentation] = get_instrumentations()
        for instrumentation in active:
            instrumentation.enter(stack)
        verified = await bank.verify()
        assert_true(verified)
    # Reported after the stack has unwound, when it is expected to be empty.
    print('deinstrumentation (should be empty):', Instrumentation.deinstrumentation)
    print('instrumented')
    return active
class DeintrumentationSize(Instrumentation):
    """Instrumentation that prints ``len(self.deinstrumentation)`` per call.

    The size is printed before the wrapped method is invoked; the call is
    then forwarded unchanged and its result returned.
    """

    def instrument(self, method, *args, **kwargs):
        """Print the current deinstrumentation size, then call *method*."""
        label = f'deinstrumentation size @ {target_str(self.target)}:{self.methodname}'
        size = len(self.deinstrumentation)
        print(label, size)
        return method(*args, **kwargs)
async def _trace():
    """Generate, migrate and verify a chain under instrumentation.

    Generation and migration run after ``set_gather_linear()``; the
    instrumented verification runs after ``set_gather_asyncio()``
    (presumably these select how awaitables are gathered -- confirm in
    ``rainbowadn.core``). Returns the instrumentations from ``_instrument``.
    """
    set_gather_linear()
    generated = await _generate(
        16,
        8, 15,
        8, 15,
    )
    migrated = await _migrate(generated)
    set_gather_asyncio()
    # Count how many times DeintrumentationSize.instrument runs while the
    # deinstrument method itself is being watched.
    with (
        DeintrumentationSize(Instrumentation, 'deinstrument'),
        Counter(DeintrumentationSize, 'instrument') as de_ctr,
    ):
        instrumentations = await _instrument(migrated)
    print(jsonify(de_ctr))
    print('traced')
    return instrumentations
def _fn() -> str:
    """Return a fresh trace file path: ``trace/<unix seconds>-<4 hex digits>.json``."""
    timestamp = int(time.time())
    # Two random bytes keep names distinct for runs within the same second.
    suffix = os.urandom(2).hex()
    return f'trace/{timestamp}-{suffix}.json'
def _jsonify(instrumentations: list[Instrumentation]) -> dict:
jsonified = {}
for dumped in instrumentations:
jsonified |= jsonify(dumped)
return jsonified
def _dump(fn: str, jsonified: dict) -> None:
    """Write *jsonified* to *fn* as JSON and print ``dumped``.

    Missing parent directories of *fn* are created first, so a completed
    trace is not lost to ``FileNotFoundError`` when the output directory
    (e.g. ``trace/``) does not exist yet.

    Args:
        fn: Destination file path; an existing file is overwritten.
        jsonified: JSON-serializable mapping to store.
    """
    parent = os.path.dirname(fn)
    if parent:
        # A bare file name has no parent component to create.
        os.makedirs(parent, exist_ok=True)
    with open(fn, 'w') as file:
        json.dump(jsonified, file)
    print('dumped')
def _copy(fn: str) -> None:
    """Copy the trace file *fn* to ``trace/latest.json`` and print ``copied``."""
    latest = 'trace/latest.json'
    shutil.copy(fn, latest)
    print('copied')
async def main():
    """Run the full pipeline: trace, dump to JSON, copy to latest, plot."""
    instrumentations = await _trace()
    trace_path = _fn()
    _dump(trace_path, _jsonify(instrumentations))
    _copy(trace_path)
    plot(trace_path)
    print('plotted')
# Script entry point: run the whole trace pipeline on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(main())