v6d2ctx/v6d2ctx/pain.py
2022-12-24 16:37:21 +00:00

81 lines
2.4 KiB
Python

import asyncio
import inspect
import os
import time
from rainbowadn.instrument import Instrumentation
class ALog(Instrumentation):
start = time.time()
def time(self):
return time.time() - self.start
async def instrument(self, method, *args, **kwargs):
key = os.urandom(4).hex()
print(f'[{self.time(): 11.3f}] +{key} +A"{self.methodname}"')
try:
result = await method(*args, **kwargs)
except BaseException:
print(f'[{self.time(): 11.3f}] !{key} !A"{self.methodname}"')
raise
else:
print(f'[{self.time(): 11.3f}] -{key} -A"{self.methodname}"')
return result
class SLog(Instrumentation):
def time(self):
return time.time() - ALog.start
def instrument(self, method, *args, **kwargs):
key = os.urandom(4).hex()
print(f'[{self.time(): 11.3f}] +{key} +S"{self.methodname}"')
try:
result = method(*args, **kwargs)
except BaseException:
print(f'[{self.time(): 11.3f}] !{key} !S"{self.methodname}"')
raise
else:
print(f'[{self.time(): 11.3f}] -{key} -S"{self.methodname}"')
return result
class FrameTrace(Instrumentation):
async def instrument(self, method, *args, **kwargs):
frame = inspect.currentframe()
while frame is not None:
print(frame.f_code.co_filename, frame.f_lineno)
frame = frame.f_back
return await method(*args, **kwargs)
class ABlockMonitor:
__task: asyncio.Future
def __init__(self, *, threshold=0.0, delta=10.0, interval=0.0) -> None:
self.threshold = threshold
self.delta = delta
self.interval = interval
async def _monitor(self):
while True:
delta = self.delta
t = time.time()
await asyncio.sleep(delta)
spent = time.time() - t
delay = spent - delta
if delay > self.threshold:
self.threshold = delay
print(f'block monitor reached new peak delay {delay:.4f}')
interval = self.interval
if interval > 0:
await asyncio.sleep(interval)
async def __aenter__(self):
self.__task = asyncio.get_running_loop().create_task(self._monitor())
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.__task.cancel()