import json import os import random import shutil import time from io import StringIO from pathlib import Path from rainbowadn.core import * from rainbowadn.instrument import * from rainbowadn.testing.resolvers import * __all__ = ( "get_dr", "target_str", "jsonify", "get_fn", "jsonify_list", "dump", "copy", "DeintrumentationSize", "Resolution", ) def get_dr(mean_delay: float, caching: bool) -> ExtendableResolver: dr = DictResolver() dr = DelayedResolver(dr, lambda: mean_delay * random.gammavariate(10.0, 0.1)) if caching: dr = CachingResolver(dr) return dr def target_str(target) -> str: match target: case type(__name__=name): return name case object(__class__=type(__name__=name)): return name case _: raise TypeError def jsonify(dumped: Instrumentation) -> dict: prefix = f"{target_str(dumped.target)}:{dumped.methodname}" match dumped: case Counter(counter=ctr): return {f"{prefix}:counter": ctr} case Concurrency(log=log): return {f"{prefix}:concurrency": log} case EntryExit(entry_log=entry_log, exit_log=exit_log): return { f"{prefix}:entry": entry_log, f"{prefix}:exit": exit_log, } case _: return {} def get_fn() -> str: Path("trace").mkdir(exist_ok=True) return f"trace/{int(time.time())}-{os.urandom(2).hex()}.json" def jsonify_list(instrumentations: list[Instrumentation]) -> dict: jsonified = {} for dumped in instrumentations: jsonified |= jsonify(dumped) return jsonified def dump(fn: str, jsonified: dict) -> None: with open(fn, "w") as file: json.dump(jsonified, file) print("dumped") def copy(fn: str) -> None: shutil.copy(fn, f"trace/latest.json") print("copied") class DeintrumentationSize(Instrumentation): def instrument(self, method, *args, **kwargs): print(f"deinstrumentation size @ {target_str(self.target)}:{self.methodname}", len(self.deinstrumentation)) return method(*args, **kwargs) class Resolution(Instrumentation): def __init__(self): self.report: dict[tuple[str, bytes, bytes], int] = {} super().__init__(HashPoint, "resolve") def increment(self, key: tuple[str, bytes, bytes]): self.report[key] = self.report.get(key, 0) + 1 async def instrument(self, method, *args, **kwargs): result: Mentionable = await method(*args, **kwargs) self.increment(ClassReport.key(result)) return result def format(self) -> str: s = StringIO() for key, count in sorted(self.report.items(), key=lambda t: (-t[1], t[0])): s.write(f"{count: 6d}:{ClassReport.format_key(key)}\n") return s.getvalue()