dynamic instrumentation

This commit is contained in:
AF 2023-08-25 03:28:47 +00:00
parent 1b2102db3f
commit f1692effcf
2 changed files with 105 additions and 0 deletions

View File

@ -0,0 +1,67 @@
# rainbowadn.instrument.instrumentation
import functools
from contextlib import ExitStack
from typing import Callable, Self
__all__ = ('Instrumentation',)
class Instrumentation:
deinstrumentation = {}
_method: Callable
_wrap: Callable
def __init__(self, target, methodname: str):
if not isinstance(methodname, str):
raise TypeError('methodname must be str')
if not callable(getattr(target, methodname)):
raise TypeError('target.methodname must be callable')
self.target = target
self.methodname = methodname
def instrument(self, method, *args, **kwargs):
return method(*args, **kwargs)
def __enter__(self) -> Self:
if hasattr(self, '_method') or hasattr(self, '_wrap'):
raise RuntimeError
method = getattr(self.target, self.methodname)
if not callable(method):
raise TypeError
self._method = method
@functools.wraps(method)
def wrap(*args, **kwargs):
nonlocal method
while method in self.deinstrumentation:
self._method = method = self.deinstrumentation.pop(method)
return self.instrument(method, *args, **kwargs)
self._wrap = wrap
setattr(self.target, self.methodname, wrap)
return self
def schedule_deinstrumentation(self):
self.deinstrumentation[self._wrap] = self._method
del self._wrap
del self._method
def deinstrument(self):
while (method := getattr(self.target, self.methodname)) in self.deinstrumentation:
setattr(self.target, self.methodname, self.deinstrumentation.pop(method))
def __exit__(self, exc_type, exc_val, exc_tb):
self.schedule_deinstrumentation()
self.deinstrument()
def enter(self, stack: ExitStack) -> Self:
return stack.enter_context(self)
def enter_conditional(self, stack: ExitStack) -> Self | None:
if hasattr(self.target, self.methodname):
return self.enter(stack)
else:
return None

View File

@ -2,11 +2,14 @@ from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from time import time
from typing import AsyncContextManager, Callable, Generic, Hashable, Type, TypeVar
import discord
from discord.ext import commands
from starbot.instrumentation import Instrumentation
from .bot import Reservations, StarBot, StarState
from .db import AbstractConnection
@ -250,6 +253,41 @@ class Stars(commands.Cog):
async def on_raw_reaction_add(self, event: discord.RawReactionActionEvent):
await ReactionCtx(self.__bot, event).on()
@commands.hybrid_command()
@commands.is_owner()
async def timeit(self, ctx: commands.Context, duration: float):
with Timeit(ReactionCtx, "on") as timeit:
await asyncio.sleep(duration)
await ctx.reply(f"```\n{timeit.times}\n```")
@commands.hybrid_command()
@commands.is_owner()
async def throttle(self, ctx: commands.Context, duration: float, delay: float):
with Cooldown(StarEventCtx, "_on", delay):
await asyncio.sleep(duration)
await ctx.reply(f"done")
class Cooldown(Instrumentation):
def __init__(self, target, methodname: str, delay: float):
super().__init__(target, methodname)
self.delay = delay
async def instrument(self, method, *args, **kwargs):
await method(*args, **kwargs)
await asyncio.sleep(self.delay)
class Timeit(Instrumentation):
def __init__(self, target, methodname: str):
super().__init__(target, methodname)
self.times = []
async def instrument(self, method, *args, **kwargs):
t = time()
await method(*args, **kwargs)
self.times.append((t, time()))
async def setup(bot: StarBot):
global cog