v6d3music/v6d3music/main.py
2023-02-28 12:32:19 +00:00

184 lines
5.2 KiB
Python

import asyncio
import contextlib
import os
import sys
import time
from traceback import print_exc
import discord
from ptvp35 import *
from rainbowadn.instrument import Instrumentation
from v6d1tokens.client import *
from v6d2ctx.handle_content import *
from v6d2ctx.integration.event import *
from v6d2ctx.integration.targets import *
from v6d2ctx.pain import *
from v6d2ctx.serve import *
from v6d3music.api import *
from v6d3music.app import *
from v6d3music.commands import *
from v6d3music.config import prefix
from v6d3music.core.caching import *
from v6d3music.core.default_effects import *
from v6d3music.core.mainservice import *
from v6d3music.core.set_config import set_config
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class MusicClient(discord.Client):
pass
_client = MusicClient(
intents=discord.Intents(
members=True,
guilds=True,
bans=True,
emojis=True,
invites=True,
voice_states=True,
guild_messages=True,
reactions=True,
message_content=True,
),
loop=loop,
)
banned_guilds = set(map(int, filter(bool, map(str.strip, os.getenv("banned_guilds", "").split(":")))))
def guild_allowed(guild: discord.Guild | None) -> bool:
return guild is not None and guild.id not in banned_guilds
def message_allowed(message: discord.Message) -> bool:
return guild_allowed(message.guild)
def register_handlers(client: discord.Client, mainservice: MainService):
of = get_of(mainservice)
@client.event
async def on_message(message: discord.Message) -> None:
if message_allowed(message):
try:
await handle_content(of, message, message.content, prefix, client)
except:
print_exc()
@client.event
async def on_ready():
print("ready")
if client.user is None:
raise RuntimeError
await set_config("user-id", client.user.id)
await set_config("ready", True)
await client.change_presence(
activity=discord.Game(
name="феноменально",
)
)
await mainservice.restore()
print("ready (startup finished)")
class UpgradeABMInit(Instrumentation):
def __init__(self):
super().__init__(ABlockMonitor, "__init__")
def instrument(self, method, abm, *, threshold=0.0, delta=10.0, interval=0.0):
print("created upgraded")
method(abm, threshold=threshold, delta=delta, interval=interval)
abm.threshold = threshold
class UpgradeABMTask(Instrumentation):
def __init__(self):
super().__init__(ABlockMonitor, "_monitor")
async def instrument(self, _, abm):
print("started upgraded")
while True:
delta = abm.delta
t = time.time()
await asyncio.sleep(delta)
spent = time.time() - t
delay = spent - delta
if delay > abm.threshold:
abm.threshold = delay
print(f"upgraded block monitor reached new peak delay {delay:.4f}")
interval = abm.interval
if interval > 0:
await asyncio.sleep(interval)
def _upgrade_abm() -> contextlib.ExitStack:
with contextlib.ExitStack() as es:
es.enter_context(UpgradeABMInit())
es.enter_context(UpgradeABMTask())
return es.pop_all()
raise RuntimeError
class PathPrint(Instrumentation):
def __init__(self, methodname: str, pref: str):
super().__init__(DbConnection, methodname)
self.pref = pref
async def instrument(self, method, db: DbConnection, *args, **kwargs):
result = await method(db, *args, **kwargs)
try:
print(self.pref, db._DbConnection__path) # type: ignore
except Exception:
from traceback import print_exc
print_exc()
return result
def _db_ee() -> contextlib.ExitStack:
with contextlib.ExitStack() as es:
es.enter_context(PathPrint("_initialize", "open :"))
es.enter_context(PathPrint("aclose", "close:"))
return es.pop_all()
raise RuntimeError
async def amain(client: discord.Client):
roles = {key: value for key, value in os.environ.items() if key.startswith("roles")}
async with (
client,
DefaultEffects() as defaulteffects,
MainService(Targets(), defaulteffects, client, Events()) as mainservice,
AppContext(Api(mainservice, roles)),
ABlockMonitor(delta=0.5),
):
register_handlers(client, mainservice)
if "guerilla" in sys.argv:
from pathlib import Path
tokenpath = Path(".token.txt")
if tokenpath.exists():
token = tokenpath.read_text()
else:
token = input("token:")
tokenpath.write_text(token)
elif token_ := os.getenv("trial_token"):
token = token_
else:
token = await request_token("music", "token")
await client.login(token)
if os.getenv("v6tor", None) is None:
print("no tor")
await client.connect()
print("exited")
def main() -> None:
with _upgrade_abm(), _db_ee():
serve(amain(_client), _client, loop)