This commit is contained in:
AF 2021-12-19 20:22:24 +03:00
parent 30a5820932
commit 83dfb7f680
3 changed files with 160 additions and 127 deletions

View File

@ -1,5 +1,5 @@
aiohttp~=3.7.4.post0
discord.py[voice]~=1.7.3
git+https://gitea.ongoteam.net/PTV/v6d0auth.git
git+https://gitea.ongoteam.net/PTV/v6d1tokens.git
youtube_dl
git+https://gitea.ongoteam.net/PTV/v6d1tokens.git@2dca5338ecec2042f731ff2855225417f66e1372
git+https://gitea.ongoteam.net/PTV/v6d2ctx.git@087aa39918a147ad9df7de35e6484ccb3efdc6c9
youtube_dl~=2021.12.17

View File

@ -1,97 +0,0 @@
import asyncio
import time
from io import StringIO
from typing import Union, Optional, Callable, Awaitable
# noinspection PyPackageRequirements
import discord
usertype = Union[discord.abc.User, discord.user.BaseUser, discord.Member, discord.User]
class Context:
def __init__(self, message: discord.Message):
self.message: discord.Message = message
self.channel: discord.abc.Messageable = message.channel
self.dm_or_text: Union[discord.DMChannel, discord.TextChannel] = message.channel
self.author: usertype = message.author
self.content: str = message.content
self.member: Optional[discord.Member] = message.author if isinstance(message.author, discord.Member) else None
self.guild: Optional[discord.Guild] = None if self.member is None else self.member.guild
async def reply(self, content=None, **kwargs) -> discord.Message:
return await self.message.reply(content, mention_author=False, **kwargs)
async def long(self, s: str):
resio = StringIO(s)
res = ''
for line in resio:
if len(res) + len(line) < 2000:
res += line
else:
await self.reply(res)
res = line
if res:
await self.reply(res)
ESCAPED = '`_*\'"\\'
def escape(s: str):
res = StringIO()
for c in s:
if c in ESCAPED:
c = '\\' + c
res.write(c)
return res.getvalue()
buckets: dict[str, dict[str, Callable[[Context, list[str]], Awaitable[None]]]] = {}
def at(bucket: str, name: str):
def wrap(f: Callable[[Context, list[str]], Awaitable[None]]):
buckets.setdefault(bucket, {})[name] = f
return f
return wrap
class Implicit(Exception):
pass
def of(bucket: str, name: str) -> Callable[[Context, list[str]], Awaitable[None]]:
try:
return buckets[bucket][name]
except KeyError:
raise Implicit
benchmarks: dict[str, dict[str, float]] = {}
_t = time.perf_counter()
class Benchmark:
def __init__(self, benchmark: str):
self.benchmark = benchmark
def __enter__(self):
self.t = time.perf_counter()
def __exit__(self, exc_type, exc_val, exc_tb):
d = (time.perf_counter() - self.t)
benchmarks.setdefault(self.benchmark, {'integral': 0.0, 'max': 0.0})
benchmarks[self.benchmark]['integral'] += d
benchmarks[self.benchmark]['max'] = max(benchmarks[self.benchmark]['max'], d)
async def monitor():
while True:
await asyncio.sleep(10)
dt = time.perf_counter() - _t
print('Benchmarks:')
for benchmark, metrics in benchmarks.items():
print(benchmark, '=', metrics['integral'] / max(dt, .00001), ':', metrics['max'])

View File

@ -1,5 +1,6 @@
import asyncio
import concurrent.futures
import os
import random
import shlex
import string
@ -11,15 +12,16 @@ from typing import Optional, AsyncIterable, Any
# noinspection PyPackageRequirements
import discord
import youtube_dl as youtube_dl
import nacl.hash
import youtube_dl
from ptvp35 import Db, KVJson
from v6d0auth.config import root
from v6d0auth.run_app import start_app
from v6d1tokens.client import request_token
from v6d2ctx.context import Context, of, at, escape, Implicit, monitor, Benchmark, Explicit
from v6d3music.app import get_app
from v6d3music.config import prefix
from v6d3music.context import Context, of, at, escape, Implicit, monitor, Benchmark
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@ -40,6 +42,10 @@ myroot = root / 'v6d3music'
myroot.mkdir(exist_ok=True)
volume_db = Db(myroot / 'volume.db', kvrequest_type=KVJson)
queue_db = Db(myroot / 'queue.db', kvrequest_type=KVJson)
cache_db = Db(myroot / 'cache.db', kvrequest_type=KVJson)
cache_root = myroot / 'cache'
cache_root.mkdir(exist_ok=True)
cache_locks: dict[str, asyncio.Lock] = {}
vcs_restored = False
@ -89,17 +95,12 @@ async def help_(ctx: Context, args: list[str]) -> None:
case []:
await ctx.reply('music bot')
case [name]:
await ctx.reply(f'help for {name}:')
await ctx.reply(f'help for {name}: `{name} help`')
locks: dict[discord.Guild, asyncio.Lock] = {}
class Explicit(Exception):
def __init__(self, msg: str):
self.msg = msg
def lock_for(guild: discord.Guild) -> asyncio.Lock:
if guild is None:
raise Explicit('not in a guild')
@ -109,6 +110,13 @@ def lock_for(guild: discord.Guild) -> asyncio.Lock:
return locks.setdefault(guild, asyncio.Lock())
def cache_lock_for(hurl: str) -> asyncio.Lock:
if hurl in cache_locks:
return cache_locks[hurl]
else:
return cache_locks.setdefault(hurl, asyncio.Lock())
class YTAudio(discord.AudioSource):
source: discord.FFmpegPCMAudio
@ -139,7 +147,11 @@ class YTAudio(discord.AudioSource):
)
def before_options(self):
before_options = '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 10 -copy_unknown'
before_options = ''
if 'https' in self.url:
before_options += (
'-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 10 -copy_unknown'
)
if self.already_read:
before_options += f' -ss {self.already_read * discord.opus.Encoder.FRAME_LENGTH / 1000}'
return before_options
@ -202,7 +214,7 @@ class YTAudio(discord.AudioSource):
async def regenerate(self):
try:
print(f'regenerating {self.origin}')
self.url = await real_url(self.origin)
self.url = await real_url(self.origin, True)
self.source.cleanup()
self.set_source()
print(f'regenerated {self.origin}')
@ -281,6 +293,26 @@ class QueueAudio(discord.AudioSource):
else:
raise Explicit('not an administrator')
def swap(self, member: discord.Member, a: int, b: int) -> None:
permissions: discord.Permissions = member.guild_permissions
if permissions.administrator:
if max(a, b) >= len(self.queue):
return
self.queue[a], self.queue[b] = self.queue[b], self.queue[a]
else:
raise Explicit('not an administrator')
def move(self, member: discord.Member, a: int, b: int) -> None:
permissions: discord.Permissions = member.guild_permissions
if permissions.administrator:
if max(a, b) >= len(self.queue):
return
audio = self.queue[a]
self.queue.remove(audio)
self.queue.insert(b, audio)
else:
raise Explicit('not an administrator')
def format(self) -> str:
stream = StringIO()
for i, audio in enumerate(list(self.queue)):
@ -320,6 +352,21 @@ def extract(params: dict, url: str, kwargs: dict):
return extracted
def bytes_hash(b: bytes) -> str:
return nacl.hash.sha256(b).decode()
def recursive_hash(obj) -> str:
if isinstance(obj, str):
return bytes_hash(obj.encode())
elif isinstance(obj, tuple) or isinstance(obj, list):
return recursive_hash(';'.join(map(recursive_hash, obj)))
elif isinstance(obj, dict):
return recursive_hash([*obj.items()])
else:
raise TypeError
async def aextract(params: dict, url: str, **kwargs):
with Benchmark('AEX'):
with concurrent.futures.ProcessPoolExecutor() as pool:
@ -332,7 +379,42 @@ async def aextract(params: dict, url: str, **kwargs):
)
async def real_url(url: str):
async def cache_url(hurl: str, rurl: str, override: bool) -> None:
async with cache_lock_for(hurl):
if not override and cache_db.get(f'url:{hurl}', None) is not None:
return
cachable: bool = cache_db.get(f'cachable:{hurl}', False)
if cachable:
print('caching', hurl)
path = cache_root / f'{hurl}.opus'
p = subprocess.Popen(
[
'ffmpeg', '-hide_banner', '-loglevel', 'warning',
'-reconnect', '1', '-reconnect_at_eof', '0',
'-reconnect_streamed', '1', '-reconnect_delay_max', '10', '-copy_unknown',
'-y', '-i', rurl, str(path)
],
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
)
with Benchmark('CCH'):
code = await loop.run_in_executor(None, p.wait)
if code:
raise RuntimeError(code)
await cache_db.set(f'url:{hurl}', str(path))
print('cached', hurl)
# await cache_db.set(f'cachable:{hurl}', False)
else:
await cache_db.set(f'cachable:{hurl}', True)
async def real_url(url: str, override: bool) -> str:
hurl: str = bytes_hash(url.encode())
if not override:
curl: Optional[str] = cache_db.get(f'url:{hurl}', None)
if curl is not None:
print('using cached', hurl)
return curl
p = subprocess.Popen(
['youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url],
stdout=subprocess.PIPE
@ -341,10 +423,12 @@ async def real_url(url: str):
code = await loop.run_in_executor(None, p.wait)
if code:
raise RuntimeError(code)
return p.stdout.readline().decode()[:-1]
rurl: str = p.stdout.readline().decode()[:-1]
loop.create_task(cache_url(hurl, rurl, override))
return rurl
async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[str]) -> YTAudio:
async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[str], already_read: int) -> YTAudio:
if effects:
permissions: discord.Permissions = ctx.member.guild_permissions
if not permissions.administrator:
@ -355,12 +439,12 @@ async def create_ytaudio(ctx: Context, info: dict[str, Any], effects: Optional[s
else:
options = None
return YTAudio(
await real_url(info['url']),
await real_url(info['url'], False),
info['url'],
f'{escape(info.get("title"))} `Rby` {ctx.member}',
options,
ctx.member,
0
already_read
)
@ -383,12 +467,12 @@ async def entries_for_url(url: str) -> AsyncIterable[
yield info | {'url': url}
async def create_ytaudios(ctx: Context, infos: list[tuple[dict[str, Any], str]]) -> AsyncIterable[YTAudio]:
async def create_ytaudios(ctx: Context, infos: list[tuple[dict[str, Any], str, int]]) -> AsyncIterable[YTAudio]:
for audio in await asyncio.gather(
*[
create_ytaudio(ctx, info, effects)
create_ytaudio(ctx, info, effects, already_read)
for
info, effects
info, effects, already_read
in
infos
]
@ -400,12 +484,14 @@ presets: dict[str, str] = {
'cursed': 'aeval=val(0)*2*sin(440*t)+val(1)*2*cos(622*t)|val(1)*2*sin(622*t)+val(0)*2*cos(440*t)',
'bassboost': 'bass=g=10',
'bassbooboost': 'bass=g=30',
'nightcore': 'asetrate=67882',
'пришествие анимешне': 'bass=g=15,asetrate=67882,bass=g=15',
'difference': 'aeval=val(0)-val(1)|val(1)-val(0)',
'mono': 'aeval=.5*val(0)+.5*val(1)|.5*val(1)+.5*val(0)',
}
async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[str, Any], str]]:
async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[str, Any], str, int]]:
while args:
match args:
case [url, '-', effects, *args]:
@ -416,14 +502,25 @@ async def entries_effects_for_args(args: list[str]) -> AsyncIterable[tuple[dict[
effects = None
case _:
raise RuntimeError
timestamp = 0
match args:
case [h, m, s, *args] if h.isdecimal() and m.isdecimal() and s.isdecimal():
timestamp = 3600 * int(h) + 60 * int(m) + int(s)
case [m, s, *args] if m.isdecimal() and s.isdecimal():
timestamp = 60 * int(m) + int(s)
case [s, *args] if s.isdecimal():
timestamp = int(s)
case [*args]:
pass
already_read = timestamp * 1000 / discord.opus.Encoder.FRAME_LENGTH
async for info in entries_for_url(url):
yield info, effects
yield info, effects, already_read
async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
tuples: list[tuple[dict[str, Any], str]] = []
async for info, effects in entries_effects_for_args(args):
tuples.append((info, effects))
tuples: list[tuple[dict[str, Any], str, int]] = []
async for info, effects, already_read in entries_effects_for_args(args):
tuples.append((info, effects, already_read))
if len(tuples) >= 5:
async for audio in create_ytaudios(ctx, tuples):
yield audio
@ -513,19 +610,23 @@ async def skip(ctx: Context, args: list[str]) -> None:
await ctx.reply('''
`skip [first] [last]`
'''.strip())
return
case []:
queue = await queue_for(ctx, create=False)
queue.skip_at(0, ctx.member)
case [pos]:
case [pos] if pos.isdecimal():
pos = int(pos)
queue = await queue_for(ctx, create=False)
queue.skip_at(pos, ctx.member)
case [pos0, pos1]:
case [pos0, pos1] if pos0.isdecimal() and pos1.isdecimal():
pos0, pos1 = int(pos0), int(pos1)
queue = await queue_for(ctx, create=False)
for i in range(pos0, pos1 + 1):
if not queue.skip_at(pos0, ctx.member):
pos0 += 1
case _:
raise Explicit("misformatted")
await ctx.reply('done')
@at('commands', 'queue')
@ -541,6 +642,32 @@ async def queue_(ctx: Context, args: list[str]) -> None:
case ['resume']:
async with lock_for(ctx.guild):
await queue_for(ctx, create=True)
case _:
raise Explicit("misformatted")
@at('commands', 'swap')
async def swap(ctx: Context, args: list[str]) -> None:
match args:
case ['help']:
await ctx.reply('`swap a b`')
case [a, b] if a.isdecimal() and b.isdecimal():
a, b = int(a), int(b)
(await queue_for(ctx, create=False)).swap(ctx.member, a, b)
case _:
raise Explicit("misformatted")
@at('commands', 'move')
async def move(ctx: Context, args: list[str]) -> None:
match args:
case ['help']:
await ctx.reply('`move a b`')
case [a, b] if a.isdecimal() and b.isdecimal():
a, b = int(a), int(b)
(await queue_for(ctx, create=False)).move(ctx.member, a, b)
case _:
raise Explicit("misformatted")
@at('commands', 'volume')
@ -551,6 +678,8 @@ async def volume_(ctx: Context, args: list[str]) -> None:
case [volume]:
volume = float(volume)
await (await main_for(ctx, create=False)).set(volume, ctx.member)
case _:
raise Explicit("misformatted")
@at('commands', 'pause')
@ -624,11 +753,12 @@ async def save_job():
async def main():
async with volume_db, queue_db:
async with volume_db, queue_db, cache_db:
await start_app(get_app(client))
await client.login(token)
loop.create_task(save_job())
loop.create_task(monitor())
if os.getenv('v6monitor'):
loop.create_task(monitor())
await client.connect()