initial commit

This commit is contained in:
AF 2021-11-29 00:14:06 +03:00
commit 13dc8d3e49
14 changed files with 1006 additions and 0 deletions

215
.gitignore vendored Normal file
View File

@ -0,0 +1,215 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
/data/

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

View File

@ -0,0 +1,51 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="false" level="WEAK WARNING" enabled_by_default="false" />
<inspection_tool class="HtmlUnknownTag" enabled="true" level="WARNING" enabled_by_default="true">
<option name="myValues">
<value>
<list size="11">
<item index="0" class="java.lang.String" itemvalue="nobr" />
<item index="1" class="java.lang.String" itemvalue="noembed" />
<item index="2" class="java.lang.String" itemvalue="comment" />
<item index="3" class="java.lang.String" itemvalue="noscript" />
<item index="4" class="java.lang.String" itemvalue="embed" />
<item index="5" class="java.lang.String" itemvalue="script" />
<item index="6" class="java.lang.String" itemvalue="markdown" />
<item index="7" class="java.lang.String" itemvalue="sv3i" />
<item index="8" class="java.lang.String" itemvalue="sv3o" />
<item index="9" class="java.lang.String" itemvalue="sv3a" />
<item index="10" class="java.lang.String" itemvalue="sv3c" />
</list>
</value>
</option>
<option name="myCustomValuesEnabled" value="true" />
</inspection_tool>
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="1">
<item index="0" class="java.lang.String" itemvalue="nacl" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="PySide2.QtWidgets.clicked.connect" />
<option value="PySide2.QtWidgets.valueChanged.connect" />
<option value="PySide2.QtWidgets.textChanged.connect" />
<option value="PySide2.QtCore.Signal.emit" />
<option value="PySide2.QtCore.Signal.connect" />
</list>
</option>
</inspection_tool>
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
<option name="processCode" value="true" />
<option name="processLiterals" value="true" />
<option name="processComments" value="true" />
</inspection_tool>
</profile>
</component>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (v6d3music)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/v6d3music.iml" filepath="$PROJECT_DIR$/.idea/v6d3music.iml" />
</modules>
</component>
</project>

10
.idea/v6d3music.iml Normal file
View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (v6d3music)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

10
Dockerfile Normal file
View File

@ -0,0 +1,10 @@
# syntax=docker/dockerfile:1
FROM python:3.10
WORKDIR /v6
ENV v6root=/v6data
RUN apt-get update
RUN apt-get install -y libopus0 opus-tools ffmpeg
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY v6d3music v6d3music
CMD ["python3", "-m", "v6d3music.run-bot"]

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
aiohttp~=3.7.4.post0
discord.py[voice]~=1.7.3
git+https://gitea.ongoteam.net/PTV/v6d0auth.git
git+https://gitea.ongoteam.net/PTV/v6d1tokens.git
youtube_dl

0
v6d3music/__init__.py Normal file
View File

44
v6d3music/app.py Normal file
View File

@ -0,0 +1,44 @@
import time
# noinspection PyPackageRequirements
import discord
from aiohttp import web
from nacl.exceptions import BadSignatureError
from v6d0auth import certs
def define_routes(routes: web.RouteTableDef, client: discord.Client):
@routes.get('/')
async def home(_request: web.Request):
return web.Response(body='v6d3losyash\n')
@routes.post('/stop')
async def stop(request: web.Request):
try:
assert abs(float(certs.verify(await request.read())) - time.time()) < 1
except ValueError:
raise web.HTTPBadRequest
except BadSignatureError:
raise web.HTTPUnauthorized
except AssertionError:
raise web.HTTPRequestTimeout
else:
await client.change_presence(status=discord.Status.offline)
await client.close()
raise web.HTTPOk
def app_routes(client: discord.Client) -> web.RouteTableDef:
routes = web.RouteTableDef()
define_routes(routes, client)
return routes
def app_with_routes(routes: web.RouteTableDef):
app = web.Application()
app.add_routes(routes)
return app
def get_app(client: discord.Client) -> web.Application:
return app_with_routes(app_routes(client))

3
v6d3music/config.py Normal file
View File

@ -0,0 +1,3 @@
import os
prefix = os.getenv('v6prefix', '?/')

624
v6d3music/run-bot.py Normal file
View File

@ -0,0 +1,624 @@
import asyncio
import concurrent.futures
import random
import shlex
import string
import subprocess
import time
from collections import deque
from io import StringIO
from typing import Callable, Awaitable, Union, Optional, AsyncIterable, Any
# noinspection PyPackageRequirements
import discord
import youtube_dl as youtube_dl
from ptvp35 import Db, KVJson
from v6d0auth.config import root
from v6d0auth.run_app import start_app
from v6d1tokens.client import request_token
from v6d3music.app import get_app
from v6d3music.config import prefix
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
token = loop.run_until_complete(request_token('music'))
client = discord.Client(
intents=discord.Intents(
members=True,
guilds=True,
bans=True,
emojis=True,
invites=True,
voice_states=True,
guild_messages=True,
reactions=True
),
)
myroot = root / 'v6d3music'
myroot.mkdir(exist_ok=True)
volume_db = Db(myroot / 'volume.db', kvrequest_type=KVJson)
queue_db = Db(myroot / 'queue.db', kvrequest_type=KVJson)
ESCAPED = '`_*\'"\\'
def escape(s: str):
res = StringIO()
for c in s:
if c in ESCAPED:
c = '\\' + c
res.write(c)
return res.getvalue()
usertype = Union[discord.abc.User, discord.user.BaseUser, discord.Member, discord.User]
class Context:
def __init__(self, message: discord.Message):
self.message: discord.Message = message
self.channel: discord.abc.Messageable = message.channel
self.dm_or_text: Union[discord.DMChannel, discord.TextChannel] = message.channel
self.author: usertype = message.author
self.content: str = message.content
self.member: Optional[discord.Member] = message.author if isinstance(message.author, discord.Member) else None
self.guild: Optional[discord.Guild] = None if self.member is None else self.member.guild
async def reply(self, content=None, **kwargs) -> discord.Message:
return await self.message.reply(content, mention_author=False, **kwargs)
async def long(self, s: str):
resio = StringIO(s)
res = ''
for line in resio:
if len(res) + len(line) < 2000:
res += line
else:
await self.reply(res)
res = line
if res:
await self.reply(res)
@client.event
async def on_ready():
print("ready")
await client.change_presence(activity=discord.Game(
name='феноменально',
))
buckets: dict[str, dict[str, Callable[[Context, list[str]], Awaitable[None]]]] = {}
def at(bucket: str, name: str):
def wrap(f: Callable[[Context, list[str]], Awaitable[None]]):
buckets.setdefault(bucket, {})[name] = f
return f
return wrap
class Implicit(Exception):
pass
def of(bucket: str, name: str) -> Callable[[Context, list[str]], Awaitable[None]]:
try:
return buckets[bucket][name]
except KeyError:
raise Implicit
async def handle_command(ctx: Context, name: str, args: list[str]) -> None:
await of('commands', name)(ctx, args)
@at('commands', 'help')
async def help_(ctx: Context, args: list[str]) -> None:
match args:
case []:
await ctx.reply('music bot')
case [name]:
await ctx.reply(f'help for {name}:')
locks: dict[discord.Guild, asyncio.Lock] = {}
class Explicit(Exception):
def __init__(self, msg: str):
self.msg = msg
def lock_for(ctx: Context) -> asyncio.Lock:
# noinspection PyTypeChecker
guild: discord.Guild = ctx.guild
if guild is None:
raise Explicit('not in a guild')
if guild in locks:
return locks[guild]
else:
return locks.setdefault(guild, asyncio.Lock())
class YTAudio(discord.AudioSource):
source: discord.FFmpegPCMAudio
def __init__(
self,
url: str,
origin: str,
description: str,
options: Optional[str],
rby: discord.Member,
already_read: int,
):
self.url = url
self.origin = origin
self.description = description
self.options = options
self.rby = rby
self.already_read = already_read
self.loaded = False
self.regenerating = False
self.set_source()
def set_source(self):
self.source = discord.FFmpegPCMAudio(
self.url,
options=self.options,
before_options=self.before_options()
)
def before_options(self):
before_options = '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 10 -copy_unknown'
if self.already_read:
before_options += f' -ss {self.already_read * discord.opus.Encoder.FRAME_LENGTH / 1000}'
return before_options
def read(self) -> bytes:
if self.regenerating:
return FILL
self.already_read += 1
ret: bytes = self.source.read()
if ret:
self.loaded = True
elif not self.loaded:
if random.random() > .1:
self.regenerating = True
loop.create_task(self.regenerate())
return FILL
else:
print(f'dropped {self.origin}')
return ret
def cleanup(self):
self.source.cleanup()
def can_be_skipped_by(self, member: discord.Member) -> bool:
permissions: discord.Permissions = member.guild_permissions
if permissions.administrator:
return True
elif permissions.manage_permissions:
return True
elif permissions.manage_guild:
return True
elif permissions.manage_channels:
return True
elif permissions.manage_messages:
return True
else:
return self.rby == member
def hybernate(self):
return {
'url': self.url,
'origin': self.origin,
'description': self.description,
'options': self.options,
'rby': self.rby.id,
'already_read': self.already_read,
}
@classmethod
def respawn(cls, guild: discord.Guild, respawn) -> 'YTAudio':
return YTAudio(
respawn['url'],
respawn['origin'],
respawn['description'],
respawn['options'],
guild.get_member(respawn['rby']),
respawn['already_read']
)
async def regenerate(self):
try:
print(f'regenerating {self.origin}')
self.url = await real_url(self.origin)
self.source.cleanup()
self.set_source()
print(f'regenerated {self.origin}')
finally:
self.regenerating = False
FILL = b'\x00' * discord.opus.Encoder.FRAME_SIZE
class QueueAudio(discord.AudioSource):
def __init__(self, guild: discord.Guild):
self.queue: deque[YTAudio] = deque()
self.guild = guild
for audio_respawn in queue_db.get(self.guild.id, []):
try:
self.queue.append(YTAudio.respawn(self.guild, audio_respawn))
except Exception as e:
print('respawn failed', e)
def save(self):
queue_db.set_nowait(self.guild.id, [audio.hybernate() for audio in self.queue])
def append(self, audio: YTAudio):
self.queue.append(audio)
def read(self) -> bytes:
if not self.queue:
return FILL
audio = self.queue[0]
frame = audio.read()
if len(frame) != discord.opus.Encoder.FRAME_SIZE:
self.queue.popleft().cleanup()
frame = FILL
return frame
def skip_at(self, pos: int, member: discord.Member) -> bool:
if pos < len(self.queue):
audio = self.queue[pos]
if audio.can_be_skipped_by(member):
self.queue.remove(audio)
audio.cleanup()
return True
return False
def skip_audio(self, audio: YTAudio, member: discord.Member) -> bool:
if audio in self.queue:
if audio.can_be_skipped_by(member):
self.queue.remove(audio)
audio.cleanup()
return True
return False
def clear(self, member: discord.Member) -> None:
permissions: discord.Permissions = member.guild_permissions
if permissions.administrator:
self.cleanup()
else:
raise Explicit('not an administrator')
def format(self) -> str:
stream = StringIO()
for i, audio in enumerate(self.queue):
stream.write(f'`[{i}]` {audio.description}\n')
return stream.getvalue()
def cleanup(self):
for audio in self.queue:
try:
audio.cleanup()
except ValueError:
pass
self.queue.clear()
class MainAudio(discord.PCMVolumeTransformer):
def __init__(self, guild: discord.Guild, volume: float):
self.queue = QueueAudio(guild)
super().__init__(self.queue, volume=volume)
async def set(self, volume: float, member: discord.Member):
if volume < 0.01:
raise Explicit('volume too small')
if volume > 1:
raise Explicit('volume too big')
permissions: discord.Permissions = member.guild_permissions
if not permissions.administrator:
raise Explicit('not an administrator')
self.volume = volume
await volume_db.set(member.guild.id, volume)
def extract(params: dict, url: str, kwargs: dict):
extracted = youtube_dl.YoutubeDL(params=params).extract_info(url, **kwargs)
if 'entries' in extracted:
extracted['entries'] = list(extracted['entries'])
return extracted
async def aextract(params: dict, url: str, **kwargs):
with Benchmark('AEX'):
with concurrent.futures.ProcessPoolExecutor() as pool:
return await loop.run_in_executor(
pool,
extract,
params,
url,
kwargs
)
async def real_url(url: str):
p = subprocess.Popen(
['youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url],
stdout=subprocess.PIPE
)
with Benchmark('URL'):
code = await loop.run_in_executor(None, p.wait)
if code:
raise RuntimeError(code)
return p.stdout.readline().decode()[:-1]
async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[str]) -> YTAudio:
if effects:
permissions: discord.Permissions = ctx.member.guild_permissions
if not permissions.administrator:
raise Explicit("not an administrator")
if not set(effects) <= set(string.ascii_letters + string.digits + '*,=+-/()|.^:_'):
raise Explicit('malformed effects')
options = f'-af {shlex.quote(effects)}'
else:
options = None
return YTAudio(
await real_url(info['url']),
info['url'],
f'{escape(info.get("title"))} `Rby` {ctx.member}',
options,
ctx.member,
0
)
async def entries_for_url(url: str) -> AsyncIterable[
dict[str, Any]
]:
info = await aextract(
{
"playlistend": 128,
"logtostderr": True
},
url,
download=False,
process=False
)
if 'entries' in info:
for entry in info['entries']:
yield entry
else:
yield info | {'url': url}
async def create_ytaudios(ctx: Context, infos: list[tuple[dict[str, Any], str]]) -> AsyncIterable[YTAudio]:
for audio in await asyncio.gather(
*[
create_ytaudio(ctx, info, effects)
for
info, effects
in
infos
]
):
yield audio
presets: dict[str, str] = {
'cursed': 'aeval=val(0)*2*sin(440*t)+val(1)*2*cos(622*t)|val(1)*2*sin(622*t)+val(0)*2*cos(440*t)',
'bassboost': 'bass=g=10',
'bassbooboost': 'bass=g=30',
'difference': 'aeval=val(0)-val(1)|val(1)-val(0)',
'mono': 'aeval=.5*val(0)+.5*val(1)|.5*val(1)+.5*val(0)',
}
async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[str, Any], str]]:
while args:
match args:
case [url, '-', effects, *args]:
pass
case [url, '+', preset, *args]:
effects = presets[preset]
case [url, *args]:
effects = None
case _:
raise RuntimeError
async for info in entries_for_url(url):
yield info, effects
async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
tuples: list[tuple[dict[str, Any], str]] = []
async for info, effects in entries_effects_for_args(args):
tuples.append((info, effects))
if len(tuples) >= 5:
async for audio in create_ytaudios(ctx, tuples):
yield audio
tuples.clear()
async for audio in create_ytaudios(ctx, tuples):
yield audio
mainasrcs: dict[discord.Guild, MainAudio] = {}
@at('commands', 'play')
async def play(ctx: Context, args: list[str]) -> None:
match args:
case [] | ['help']:
await ctx.reply('''
`play ...args`
`play url [- effects] ...args`
'''.strip())
case _:
async with lock_for(ctx):
queue = await queue_for(ctx)
async for audio in yt_audios(ctx, args):
queue.append(audio)
await ctx.reply('done')
async def vc_main_for(ctx: Context) -> tuple[discord.VoiceClient, MainAudio]:
if ctx.guild is None:
raise Explicit("not in a guild")
vc: discord.VoiceProtocol = ctx.guild.voice_client
if vc is None:
vs: discord.VoiceState = ctx.member.voice
if vs is None:
raise Explicit("not connected")
vch: discord.VoiceChannel = vs.channel
if vch is None:
raise Explicit("not connected")
vc: discord.VoiceProtocol = await vch.connect()
assert isinstance(vc, discord.VoiceClient)
if ctx.guild in mainasrcs:
source = mainasrcs[ctx.guild]
else:
await ctx.reply('respawning queue')
source = mainasrcs.setdefault(ctx.guild, MainAudio(ctx.guild, volume=volume_db.get(ctx.guild.id, 0.2)))
if vc.source != source:
vc.play(source)
return vc, source
async def vc_for(ctx: Context) -> discord.VoiceClient:
vc, source = await vc_main_for(ctx)
return vc
async def main_for(ctx: Context) -> MainAudio:
vc, source = await vc_main_for(ctx)
return source
async def queue_for(ctx: Context) -> QueueAudio:
return (await main_for(ctx)).queue
@at('commands', 'skip')
async def skip(ctx: Context, args: list[str]) -> None:
match args:
case ['help']:
await ctx.reply('''
`skip [first] [last]`
'''.strip())
case []:
queue = await queue_for(ctx)
queue.skip_at(0, ctx.member)
case [pos]:
pos = int(pos)
queue = await queue_for(ctx)
queue.skip_at(pos, ctx.member)
case [pos0, pos1]:
pos0, pos1 = int(pos0), int(pos1)
queue = await queue_for(ctx)
for i in range(pos0, pos1 + 1):
if not queue.skip_at(pos0, ctx.member):
pos0 += 1
@at('commands', 'queue')
async def queue_(ctx: Context, args: list[str]) -> None:
match args:
case ['help']:
await ctx.reply('current queue')
case []:
await ctx.long((await queue_for(ctx)).format().strip() or 'no queue')
case ['clear']:
(await queue_for(ctx)).clear(ctx.member)
@at('commands', 'volume')
async def volume_(ctx: Context, args: list[str]) -> None:
match args:
case ['help']:
await ctx.reply('`volume 0.2`')
case [volume]:
volume = float(volume)
await (await main_for(ctx)).set(volume, ctx.member)
@at('commands', 'pause')
async def pause(ctx: Context, _args: list[str]) -> None:
(await vc_for(ctx)).pause()
@at('commands', 'resume')
async def resume(ctx: Context, _args: list[str]) -> None:
(await vc_for(ctx)).resume()
@client.event
async def on_message(message: discord.Message) -> None:
if message.author.bot:
return
content: str = message.content
if not content.startswith(prefix):
return
content = content.removeprefix(prefix)
args = shlex.split(content)
match args:
case []:
return
case [command_name, *command_args]:
ctx = Context(message)
try:
await handle_command(ctx, command_name, command_args)
except Implicit:
pass
except Explicit as e:
await ctx.reply(e.msg)
async def save_queues():
while True:
await asyncio.sleep(1)
for mainasrc in list(mainasrcs.values()):
await asyncio.sleep(1)
mainasrc.queue.save()
benchmarks: dict[str, float] = {}
_t = time.perf_counter()
class Benchmark:
def __init__(self, benchmark: str):
self.benchmark = benchmark
def __enter__(self):
self.t = time.perf_counter()
def __exit__(self, exc_type, exc_val, exc_tb):
benchmarks.setdefault(self.benchmark, 0.0)
benchmarks[self.benchmark] += time.perf_counter() - self.t
async def monitor():
while True:
await asyncio.sleep(60)
dt = time.perf_counter() - _t
for benchmark, count in benchmarks.items():
print(benchmark, '=', count / max(dt, .00001))
async def main():
async with volume_db, queue_db:
await start_app(get_app(client))
await client.login(token)
asyncio.get_event_loop().create_task(save_queues())
asyncio.get_event_loop().create_task(monitor())
await client.connect()
if __name__ == '__main__':
loop.run_until_complete(main())

18
v6d3music/stop-bot.py Normal file
View File

@ -0,0 +1,18 @@
import asyncio
import time
import aiohttp
from v6d0auth import certs
from v6d0auth.config import host, port
async def main():
request = certs.sign(str(time.time()).encode())
async with aiohttp.ClientSession() as session:
# noinspection HttpUrlsUsage
async with session.post(f'http://{host}:{port}/stop', data=request) as response:
print(response.status)
if __name__ == '__main__':
asyncio.run(main())