From 83dfb7f6807bb5a47e65e7fcd56d005abf71703c Mon Sep 17 00:00:00 2001 From: timotheyca Date: Sun, 19 Dec 2021 20:22:24 +0300 Subject: [PATCH] v6d2ctx --- requirements.txt | 6 +- v6d3music/context.py | 97 ----------------------- v6d3music/run-bot.py | 184 ++++++++++++++++++++++++++++++++++++------- 3 files changed, 160 insertions(+), 127 deletions(-) delete mode 100644 v6d3music/context.py diff --git a/requirements.txt b/requirements.txt index c245abc..8367210 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ aiohttp~=3.7.4.post0 discord.py[voice]~=1.7.3 -git+https://gitea.ongoteam.net/PTV/v6d0auth.git -git+https://gitea.ongoteam.net/PTV/v6d1tokens.git -youtube_dl +git+https://gitea.ongoteam.net/PTV/v6d1tokens.git@2dca5338ecec2042f731ff2855225417f66e1372 +git+https://gitea.ongoteam.net/PTV/v6d2ctx.git@087aa39918a147ad9df7de35e6484ccb3efdc6c9 +youtube_dl~=2021.12.17 diff --git a/v6d3music/context.py b/v6d3music/context.py deleted file mode 100644 index 24120f1..0000000 --- a/v6d3music/context.py +++ /dev/null @@ -1,97 +0,0 @@ -import asyncio -import time -from io import StringIO -from typing import Union, Optional, Callable, Awaitable - -# noinspection PyPackageRequirements -import discord - -usertype = Union[discord.abc.User, discord.user.BaseUser, discord.Member, discord.User] - - -class Context: - def __init__(self, message: discord.Message): - self.message: discord.Message = message - self.channel: discord.abc.Messageable = message.channel - self.dm_or_text: Union[discord.DMChannel, discord.TextChannel] = message.channel - self.author: usertype = message.author - self.content: str = message.content - self.member: Optional[discord.Member] = message.author if isinstance(message.author, discord.Member) else None - self.guild: Optional[discord.Guild] = None if self.member is None else self.member.guild - - async def reply(self, content=None, **kwargs) -> discord.Message: - return await self.message.reply(content, mention_author=False, **kwargs) - - async def long(self, s: str): - resio = StringIO(s) - res = '' - for line in resio: - if len(res) + len(line) < 2000: - res += line - else: - await self.reply(res) - res = line - if res: - await self.reply(res) - - -ESCAPED = '`_*\'"\\' - - -def escape(s: str): - res = StringIO() - for c in s: - if c in ESCAPED: - c = '\\' + c - res.write(c) - return res.getvalue() - - -buckets: dict[str, dict[str, Callable[[Context, list[str]], Awaitable[None]]]] = {} - - -def at(bucket: str, name: str): - def wrap(f: Callable[[Context, list[str]], Awaitable[None]]): - buckets.setdefault(bucket, {})[name] = f - - return f - - return wrap - - -class Implicit(Exception): - pass - - -def of(bucket: str, name: str) -> Callable[[Context, list[str]], Awaitable[None]]: - try: - return buckets[bucket][name] - except KeyError: - raise Implicit - - -benchmarks: dict[str, dict[str, float]] = {} -_t = time.perf_counter() - - -class Benchmark: - def __init__(self, benchmark: str): - self.benchmark = benchmark - - def __enter__(self): - self.t = time.perf_counter() - - def __exit__(self, exc_type, exc_val, exc_tb): - d = (time.perf_counter() - self.t) - benchmarks.setdefault(self.benchmark, {'integral': 0.0, 'max': 0.0}) - benchmarks[self.benchmark]['integral'] += d - benchmarks[self.benchmark]['max'] = max(benchmarks[self.benchmark]['max'], d) - - -async def monitor(): - while True: - await asyncio.sleep(10) - dt = time.perf_counter() - _t - print('Benchmarks:') - for benchmark, metrics in benchmarks.items(): - print(benchmark, '=', metrics['integral'] / max(dt, .00001), ':', metrics['max']) diff --git a/v6d3music/run-bot.py b/v6d3music/run-bot.py index 9938902..19db6b9 100644 --- a/v6d3music/run-bot.py +++ b/v6d3music/run-bot.py @@ -1,5 +1,6 @@ import asyncio import concurrent.futures +import os import random import shlex import string @@ -11,15 +12,16 @@ from typing import Optional, AsyncIterable, Any # noinspection PyPackageRequirements import discord -import youtube_dl as youtube_dl +import nacl.hash +import youtube_dl from ptvp35 import Db, KVJson from v6d0auth.config import root from v6d0auth.run_app import start_app from v6d1tokens.client import request_token +from v6d2ctx.context import Context, of, at, escape, Implicit, monitor, Benchmark, Explicit from v6d3music.app import get_app from v6d3music.config import prefix -from v6d3music.context import Context, of, at, escape, Implicit, monitor, Benchmark loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -40,6 +42,10 @@ myroot = root / 'v6d3music' myroot.mkdir(exist_ok=True) volume_db = Db(myroot / 'volume.db', kvrequest_type=KVJson) queue_db = Db(myroot / 'queue.db', kvrequest_type=KVJson) +cache_db = Db(myroot / 'cache.db', kvrequest_type=KVJson) +cache_root = myroot / 'cache' +cache_root.mkdir(exist_ok=True) +cache_locks: dict[str, asyncio.Lock] = {} vcs_restored = False @@ -89,17 +95,12 @@ async def help_(ctx: Context, args: list[str]) -> None: case []: await ctx.reply('music bot') case [name]: - await ctx.reply(f'help for {name}:') + await ctx.reply(f'help for {name}: `{name} help`') locks: dict[discord.Guild, asyncio.Lock] = {} -class Explicit(Exception): - def __init__(self, msg: str): - self.msg = msg - - def lock_for(guild: discord.Guild) -> asyncio.Lock: if guild is None: raise Explicit('not in a guild') @@ -109,6 +110,13 @@ def lock_for(guild: discord.Guild) -> asyncio.Lock: return locks.setdefault(guild, asyncio.Lock()) +def cache_lock_for(hurl: str) -> asyncio.Lock: + if hurl in cache_locks: + return cache_locks[hurl] + else: + return cache_locks.setdefault(hurl, asyncio.Lock()) + + class YTAudio(discord.AudioSource): source: discord.FFmpegPCMAudio @@ -139,7 +147,11 @@ class YTAudio(discord.AudioSource): ) def before_options(self): - before_options = '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 10 -copy_unknown' + before_options = '' + if 'https' in self.url: + before_options += ( + '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 10 -copy_unknown' + ) if self.already_read: before_options += f' -ss {self.already_read * discord.opus.Encoder.FRAME_LENGTH / 1000}' return before_options @@ -202,7 +214,7 @@ class YTAudio(discord.AudioSource): async def regenerate(self): try: print(f'regenerating {self.origin}') - self.url = await real_url(self.origin) + self.url = await real_url(self.origin, True) self.source.cleanup() self.set_source() print(f'regenerated {self.origin}') @@ -281,6 +293,26 @@ class QueueAudio(discord.AudioSource): else: raise Explicit('not an administrator') + def swap(self, member: discord.Member, a: int, b: int) -> None: + permissions: discord.Permissions = member.guild_permissions + if permissions.administrator: + if max(a, b) >= len(self.queue): + return + self.queue[a], self.queue[b] = self.queue[b], self.queue[a] + else: + raise Explicit('not an administrator') + + def move(self, member: discord.Member, a: int, b: int) -> None: + permissions: discord.Permissions = member.guild_permissions + if permissions.administrator: + if max(a, b) >= len(self.queue): + return + audio = self.queue[a] + self.queue.remove(audio) + self.queue.insert(b, audio) + else: + raise Explicit('not an administrator') + def format(self) -> str: stream = StringIO() for i, audio in enumerate(list(self.queue)): @@ -320,6 +352,21 @@ def extract(params: dict, url: str, kwargs: dict): return extracted +def bytes_hash(b: bytes) -> str: + return nacl.hash.sha256(b).decode() + + +def recursive_hash(obj) -> str: + if isinstance(obj, str): + return bytes_hash(obj.encode()) + elif isinstance(obj, tuple) or isinstance(obj, list): + return recursive_hash(';'.join(map(recursive_hash, obj))) + elif isinstance(obj, dict): + return recursive_hash([*obj.items()]) + else: + raise TypeError + + async def aextract(params: dict, url: str, **kwargs): with Benchmark('AEX'): with concurrent.futures.ProcessPoolExecutor() as pool: @@ -332,7 +379,42 @@ async def aextract(params: dict, url: str, **kwargs): ) -async def real_url(url: str): +async def cache_url(hurl: str, rurl: str, override: bool) -> None: + async with cache_lock_for(hurl): + if not override and cache_db.get(f'url:{hurl}', None) is not None: + return + cachable: bool = cache_db.get(f'cachable:{hurl}', False) + if cachable: + print('caching', hurl) + path = cache_root / f'{hurl}.opus' + p = subprocess.Popen( + [ + 'ffmpeg', '-hide_banner', '-loglevel', 'warning', + '-reconnect', '1', '-reconnect_at_eof', '0', + '-reconnect_streamed', '1', '-reconnect_delay_max', '10', '-copy_unknown', + '-y', '-i', rurl, str(path) + ], + # stdout=subprocess.PIPE, + # stderr=subprocess.PIPE, + ) + with Benchmark('CCH'): + code = await loop.run_in_executor(None, p.wait) + if code: + raise RuntimeError(code) + await cache_db.set(f'url:{hurl}', str(path)) + print('cached', hurl) + # await cache_db.set(f'cachable:{hurl}', False) + else: + await cache_db.set(f'cachable:{hurl}', True) + + +async def real_url(url: str, override: bool) -> str: + hurl: str = bytes_hash(url.encode()) + if not override: + curl: Optional[str] = cache_db.get(f'url:{hurl}', None) + if curl is not None: + print('using cached', hurl) + return curl p = subprocess.Popen( ['youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url], stdout=subprocess.PIPE @@ -341,10 +423,12 @@ async def real_url(url: str): code = await loop.run_in_executor(None, p.wait) if code: raise RuntimeError(code) - return p.stdout.readline().decode()[:-1] + rurl: str = p.stdout.readline().decode()[:-1] + loop.create_task(cache_url(hurl, rurl, override)) + return rurl -async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[str]) -> YTAudio: +async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[str], already_read: int) -> YTAudio: if effects: permissions: discord.Permissions = ctx.member.guild_permissions if not permissions.administrator: @@ -355,12 +439,12 @@ async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[s else: options = None return YTAudio( - await real_url(info['url']), + await real_url(info['url'], False), info['url'], f'{escape(info.get("title"))} `Rby` {ctx.member}', options, ctx.member, - 0 + already_read ) @@ -383,12 +467,12 @@ async def entries_for_url(url: str) -> AsyncIterable[ yield info | {'url': url} -async def create_ytaudios(ctx: Context, infos: list[tuple[dict[str, Any], str]]) -> AsyncIterable[YTAudio]: +async def create_ytaudios(ctx: Context, infos: list[tuple[dict[str, Any], str, int]]) -> AsyncIterable[YTAudio]: for audio in await asyncio.gather( *[ - create_ytaudio(ctx, info, effects) + create_ytaudio(ctx, info, effects, already_read) for - info, effects + info, effects, already_read in infos ] @@ -400,12 +484,14 @@ presets: dict[str, str] = { 'cursed': 'aeval=val(0)*2*sin(440*t)+val(1)*2*cos(622*t)|val(1)*2*sin(622*t)+val(0)*2*cos(440*t)', 'bassboost': 'bass=g=10', 'bassbooboost': 'bass=g=30', + 'nightcore': 'asetrate=67882', + 'пришествие анимешне': 'bass=g=15,asetrate=67882,bass=g=15', 'difference': 'aeval=val(0)-val(1)|val(1)-val(0)', 'mono': 'aeval=.5*val(0)+.5*val(1)|.5*val(1)+.5*val(0)', } -async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[str, Any], str]]: +async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[str, Any], str, int]]: while args: match args: case [url, '-', effects, *args]: @@ -416,14 +502,25 @@ async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[ effects = None case _: raise RuntimeError + timestamp = 0 + match args: + case [h, m, s, *args] if h.isdecimal() and m.isdecimal() and s.isdecimal(): + timestamp = 3600 * int(h) + 60 * int(m) + int(s) + case [m, s, *args] if m.isdecimal() and s.isdecimal(): + timestamp = 60 * int(m) + int(s) + case [s, *args] if s.isdecimal(): + timestamp = int(s) + case [*args]: + pass + already_read = timestamp * 1000 / discord.opus.Encoder.FRAME_LENGTH async for info in entries_for_url(url): - yield info, effects + yield info, effects, already_read async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]: - tuples: list[tuple[dict[str, Any], str]] = [] - async for info, effects in entries_effects_for_args(args): - tuples.append((info, effects)) + tuples: list[tuple[dict[str, Any], str, int]] = [] + async for info, effects, already_read in entries_effects_for_args(args): + tuples.append((info, effects, already_read)) if len(tuples) >= 5: async for audio in create_ytaudios(ctx, tuples): yield audio @@ -513,19 +610,23 @@ async def skip(ctx: Context, args: list[str]) -> None: await ctx.reply(''' `skip [first] [last]` '''.strip()) + return case []: queue = await queue_for(ctx, create=False) queue.skip_at(0, ctx.member) - case [pos]: + case [pos] if pos.isdecimal(): pos = int(pos) queue = await queue_for(ctx, create=False) queue.skip_at(pos, ctx.member) - case [pos0, pos1]: + case [pos0, pos1] if pos0.isdecimal() and pos1.isdecimal(): pos0, pos1 = int(pos0), int(pos1) queue = await queue_for(ctx, create=False) for i in range(pos0, pos1 + 1): if not queue.skip_at(pos0, ctx.member): pos0 += 1 + case _: + raise Explicit("misformatted") + await ctx.reply('done') @at('commands', 'queue') @@ -541,6 +642,32 @@ async def queue_(ctx: Context, args: list[str]) -> None: case ['resume']: async with lock_for(ctx.guild): await queue_for(ctx, create=True) + case _: + raise Explicit("misformatted") + + +@at('commands', 'swap') +async def swap(ctx: Context, args: list[str]) -> None: + match args: + case ['help']: + await ctx.reply('`swap a b`') + case [a, b] if a.isdecimal() and b.isdecimal(): + a, b = int(a), int(b) + (await queue_for(ctx, create=False)).swap(ctx.member, a, b) + case _: + raise Explicit("misformatted") + + +@at('commands', 'move') +async def move(ctx: Context, args: list[str]) -> None: + match args: + case ['help']: + await ctx.reply('`move a b`') + case [a, b] if a.isdecimal() and b.isdecimal(): + a, b = int(a), int(b) + (await queue_for(ctx, create=False)).move(ctx.member, a, b) + case _: + raise Explicit("misformatted") @at('commands', 'volume') @@ -551,6 +678,8 @@ async def volume_(ctx: Context, args: list[str]) -> None: case [volume]: volume = float(volume) await (await main_for(ctx, create=False)).set(volume, ctx.member) + case _: + raise Explicit("misformatted") @at('commands', 'pause') @@ -624,11 +753,12 @@ async def save_job(): async def main(): - async with volume_db, queue_db: + async with volume_db, queue_db, cache_db: await start_app(get_app(client)) await client.login(token) loop.create_task(save_job()) - loop.create_task(monitor()) + if os.getenv('v6monitor'): + loop.create_task(monitor()) await client.connect()