87 lines
2.5 KiB
Python
87 lines
2.5 KiB
Python
import asyncio
|
|
import time
|
|
from io import StringIO
|
|
from typing import Awaitable, Callable, Optional, Union
|
|
|
|
import discord
|
|
|
|
# Union of the discord.py user-like types; used to annotate Context.author.
usertype = Union[
    discord.abc.User,
    discord.user.BaseUser,
    discord.Member,
    discord.User,
]
|
|
|
|
|
|
class Context:
    """Bundle an incoming message with the client and frequently used fields.

    The constructor copies the channel, author and content off the message so
    that handlers can read them directly from the context.
    """

    def __init__(self, message: discord.Message, client: discord.Client):
        """Build a context for *message* as received by *client*."""
        self.message: discord.Message = message
        self.client: discord.Client = client
        self.channel: discord.abc.Messageable = message.channel
        # Same object as ``channel``, only annotated with a narrower type.
        self.dm_or_text: Union[discord.DMChannel, discord.TextChannel] = message.channel  # type: ignore
        self.author: usertype = message.author
        self.content: str = message.content
        # The author as a Member when it is one, otherwise None.
        self.member: Optional[discord.Member] = message.author if isinstance(message.author, discord.Member) else None
        # The member's guild, or None when there is no member.
        self.guild: Optional[discord.Guild] = None if self.member is None else self.member.guild

    async def reply(self, content=None, **kwargs) -> discord.Message:
        """Reply to the wrapped message and return the sent message.

        ``mention_author`` defaults to ``False``; a caller may override it.
        All other keyword arguments are forwarded to ``message.reply``.
        """
        # setdefault instead of a hard-coded keyword: passing mention_author
        # explicitly must not raise "got multiple values for keyword argument".
        kwargs.setdefault('mention_author', False)
        return await self.message.reply(content, **kwargs)

    async def long(self, s: str, limit: int = 2000):
        """Send *s* as one or more replies of at most *limit* characters each.

        The text is split on line boundaries where possible. A single line
        longer than *limit* is cut into *limit*-sized pieces. Empty replies
        are never sent.

        Args:
            s: The text to send.
            limit: Maximum characters per reply. The default matches Discord's
                2000-character message limit.

        Raises:
            ValueError: If *limit* is smaller than 1.
        """
        if limit < 1:
            raise ValueError('limit must be at least 1')
        chunk = ''
        for line in StringIO(s):
            # A line that cannot fit in any single reply is hard-split.
            while len(line) > limit:
                if chunk:
                    await self.reply(chunk)
                    chunk = ''
                await self.reply(line[:limit])
                line = line[limit:]
            if len(chunk) + len(line) <= limit:
                chunk += line
            else:
                # line fits on its own, so chunk is non-empty here.
                await self.reply(chunk)
                chunk = line
        if chunk:
            await self.reply(chunk)
|
|
|
|
|
|
# Characters that escape() prefixes with a backslash.
ESCAPED = '`_*\'"\\'


def escape(s: str):
    """Return *s* with each character found in ESCAPED preceded by a backslash."""
    return ''.join('\\' + ch if ch in ESCAPED else ch for ch in s)
|
|
|
|
|
|
# Signature of a command handler: an awaitable callable that receives a
# Context and a list of strings (presumably the command's arguments; confirm
# against the dispatcher) and returns nothing.
command_type = Callable[
    [Context, list[str]],
    Awaitable[None],
]
|
|
|
|
|
|
class Explicit(Exception):
    """Exception that carries a message in its ``msg`` attribute.

    NOTE(review): presumably ``msg`` is shown to the user by whatever catches
    this; confirm against the handler.
    """

    def __init__(self, msg: str):
        """Store *msg* on the instance and in the exception's ``args``."""
        # Forward to Exception so args and str() are populated even when msg
        # is passed by keyword; without this they stay empty in that case.
        super().__init__(msg)
        self.msg = msg
|
|
|
|
|
|
class Implicit(Exception):
    """Plain Exception subclass that adds no state or behavior of its own."""
|
|
|
|
|
|
# Accumulated timings keyed by benchmark name. For each name, 'integral' is
# the summed duration in seconds of all Benchmark blocks with that name and
# 'max' is the longest single block.
benchmarks: dict[str, dict[str, float]] = {}
# perf_counter reading taken when this module is loaded; monitor() measures
# elapsed time from it.
_t = time.perf_counter()


class Benchmark:
    """Context manager that times its body and records it in ``benchmarks``.

    Usage::

        with Benchmark('db'):
            ...

    The duration is recorded whether or not the body raises. Exceptions are
    not suppressed.
    """

    def __init__(self, benchmark: str):
        """Remember *benchmark*, the key under which timings are accumulated."""
        self.benchmark = benchmark

    def __enter__(self):
        """Start timing and return this instance so ``as`` binds something useful."""
        self.t = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Add the elapsed time to the running total and update the maximum."""
        elapsed = time.perf_counter() - self.t
        metrics = benchmarks.setdefault(self.benchmark, {'integral': 0.0, 'max': 0.0})
        metrics['integral'] += elapsed
        metrics['max'] = max(metrics['max'], elapsed)
|
|
|
|
|
|
async def monitor():
    """Print a benchmark report every 10 seconds, forever.

    For each entry in ``benchmarks`` the report shows the accumulated time
    divided by the seconds elapsed since ``_t`` was taken, followed by the
    longest single measurement.
    """
    while True:
        await asyncio.sleep(10)
        # Clamp the elapsed time so the division below can never hit zero.
        elapsed = max(time.perf_counter() - _t, .00001)
        print('Benchmarks:')
        for name in benchmarks:
            stats = benchmarks[name]
            print(name, '=', stats['integral'] / elapsed, ':', stats['max'])
|