argctx
This commit is contained in:
parent
f4dcc1a03a
commit
3b58d59194
@ -8,7 +8,7 @@ from v6d3music.core.yt_audios import yt_audios
|
|||||||
from v6d3music.utils.assert_admin import assert_admin
|
from v6d3music.utils.assert_admin import assert_admin
|
||||||
from v6d3music.utils.catch import catch
|
from v6d3music.utils.catch import catch
|
||||||
from v6d3music.utils.effects_for_preset import effects_for_preset
|
from v6d3music.utils.effects_for_preset import effects_for_preset
|
||||||
from v6d3music.utils.entries_effects_for_args import default_effects, set_default_effects
|
from v6d3music.core.entries_effects_for_args import default_effects, set_default_effects
|
||||||
from v6d3music.utils.options_for_effects import options_for_effects
|
from v6d3music.utils.options_for_effects import options_for_effects
|
||||||
from v6d3music.utils.presets import allowed_presets
|
from v6d3music.utils.presets import allowed_presets
|
||||||
|
|
||||||
|
@ -8,26 +8,27 @@ from v6d3music.core.ytaudio import YTAudio
|
|||||||
from v6d3music.utils.assert_admin import assert_admin
|
from v6d3music.utils.assert_admin import assert_admin
|
||||||
from v6d3music.utils.options_for_effects import options_for_effects
|
from v6d3music.utils.options_for_effects import options_for_effects
|
||||||
from v6d3music.utils.presets import allowed_effects
|
from v6d3music.utils.presets import allowed_effects
|
||||||
|
from v6d3music.utils.argctx import InfoCtx
|
||||||
|
|
||||||
|
|
||||||
async def create_ytaudio(
|
async def create_ytaudio(
|
||||||
ctx: Context, info: dict[str, Any], effects: Optional[str], already_read: int, tor: bool
|
ctx: Context, it: InfoCtx
|
||||||
) -> YTAudio:
|
) -> YTAudio:
|
||||||
assert ctx.member is not None
|
assert ctx.member is not None
|
||||||
if effects:
|
if it.effects:
|
||||||
if effects not in allowed_effects:
|
if it.effects not in allowed_effects:
|
||||||
assert_admin(ctx.member)
|
assert_admin(ctx.member)
|
||||||
if not set(effects) <= set(string.ascii_letters + string.digits + '*,=+-/()|.^:_'):
|
if not set(it.effects) <= set(string.ascii_letters + string.digits + '*,=+-/()|.^:_'):
|
||||||
raise Explicit('malformed effects')
|
raise Explicit('malformed effects')
|
||||||
options = options_for_effects(effects)
|
options = options_for_effects(it.effects)
|
||||||
else:
|
else:
|
||||||
options = None
|
options = None
|
||||||
return YTAudio(
|
return YTAudio(
|
||||||
await real_url(info['url'], False, tor),
|
await real_url(it.info['url'], False, it.tor),
|
||||||
info['url'],
|
it.info['url'],
|
||||||
f'{escape(info.get("title", "unknown"))} `Rby` {ctx.member}',
|
f'{escape(it.info.get("title", "unknown"))} `Rby` {ctx.member}',
|
||||||
options,
|
options,
|
||||||
ctx.member,
|
ctx.member,
|
||||||
already_read,
|
it.already_read,
|
||||||
tor
|
it.tor
|
||||||
)
|
)
|
||||||
|
@ -5,15 +5,15 @@ from v6d2ctx.context import Context
|
|||||||
|
|
||||||
from v6d3music.core.create_ytaudio import create_ytaudio
|
from v6d3music.core.create_ytaudio import create_ytaudio
|
||||||
from v6d3music.core.ytaudio import YTAudio
|
from v6d3music.core.ytaudio import YTAudio
|
||||||
from v6d3music.utils.info_tuple import info_tuple
|
from v6d3music.utils.argctx import InfoCtx
|
||||||
|
|
||||||
|
|
||||||
async def create_ytaudios(ctx: Context, infos: list[info_tuple]) -> AsyncIterable[YTAudio]:
|
async def create_ytaudios(ctx: Context, infos: list[InfoCtx]) -> AsyncIterable[YTAudio]:
|
||||||
for audio in await asyncio.gather(
|
for audio in await asyncio.gather(
|
||||||
*[
|
*[
|
||||||
create_ytaudio(ctx, info, effects, already_read, tor)
|
create_ytaudio(ctx, it)
|
||||||
for
|
for
|
||||||
info, effects, already_read, tor
|
it
|
||||||
in
|
in
|
||||||
infos
|
infos
|
||||||
]
|
]
|
||||||
|
31
v6d3music/core/entries_effects_for_args.py
Normal file
31
v6d3music/core/entries_effects_for_args.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
from typing import AsyncIterable
|
||||||
|
|
||||||
|
from ptvp35 import Db, KVJson
|
||||||
|
from v6d2ctx.context import Explicit
|
||||||
|
|
||||||
|
from v6d3music.utils.argctx import ArgCtx, InfoCtx
|
||||||
|
from v6d3music.config import myroot
|
||||||
|
from v6d3music.utils.presets import allowed_effects
|
||||||
|
|
||||||
|
|
||||||
|
effects_db = Db(myroot / 'effects.db', kvfactory=KVJson())
|
||||||
|
|
||||||
|
|
||||||
|
def default_effects(gid: int) -> str | None:
|
||||||
|
effects = effects_db.get(gid, None)
|
||||||
|
if effects in allowed_effects:
|
||||||
|
return effects
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def set_default_effects(gid: int, effects: str | None) -> None:
|
||||||
|
if effects is not None and effects not in allowed_effects:
|
||||||
|
raise Explicit('these effects are not allowed')
|
||||||
|
await effects_db.set(gid, effects)
|
||||||
|
|
||||||
|
|
||||||
|
async def entries_effects_for_args(args: list[str], gid: int) -> AsyncIterable[InfoCtx]:
|
||||||
|
for ctx in ArgCtx(default_effects, args, gid).sources:
|
||||||
|
async for it in ctx.entries():
|
||||||
|
yield it
|
@ -28,7 +28,7 @@ async def _resolve_url(url: str, tor: bool) -> str:
|
|||||||
args.extend(tor_prefix())
|
args.extend(tor_prefix())
|
||||||
args.extend(
|
args.extend(
|
||||||
[
|
[
|
||||||
'youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url
|
'yt-dlp', '--no-playlist', '-f', 'bestaudio', '-g', '--', url,
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
|
ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
|
||||||
|
@ -1,18 +1,18 @@
|
|||||||
from typing import AsyncIterable
|
from typing import AsyncIterable
|
||||||
|
|
||||||
|
from v6d3music.core.create_ytaudios import create_ytaudios
|
||||||
|
from v6d3music.core.entries_effects_for_args import entries_effects_for_args
|
||||||
|
from v6d3music.core.ytaudio import YTAudio
|
||||||
|
from v6d3music.utils.argctx import InfoCtx
|
||||||
|
|
||||||
from v6d2ctx.context import Context
|
from v6d2ctx.context import Context
|
||||||
|
|
||||||
from v6d3music.core.create_ytaudios import create_ytaudios
|
|
||||||
from v6d3music.core.ytaudio import YTAudio
|
|
||||||
from v6d3music.utils.entries_effects_for_args import entries_effects_for_args
|
|
||||||
from v6d3music.utils.info_tuple import info_tuple
|
|
||||||
|
|
||||||
|
|
||||||
async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
|
async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
|
||||||
tuples: list[info_tuple] = []
|
tuples: list[InfoCtx] = []
|
||||||
assert ctx.guild is not None
|
assert ctx.guild is not None
|
||||||
async for info, effects, already_read, tor in entries_effects_for_args(args, ctx.guild.id):
|
async for it in entries_effects_for_args(args, ctx.guild.id):
|
||||||
tuples.append((info, effects, already_read, tor))
|
tuples.append(it)
|
||||||
if len(tuples) >= 5:
|
if len(tuples) >= 5:
|
||||||
async for audio in create_ytaudios(ctx, tuples):
|
async for audio in create_ytaudios(ctx, tuples):
|
||||||
yield audio
|
yield audio
|
||||||
|
@ -13,7 +13,7 @@ from v6d3music.core.cache_url import cache_db
|
|||||||
from v6d3music.core.mainasrc import main_for_raw_vc, mainasrcs
|
from v6d3music.core.mainasrc import main_for_raw_vc, mainasrcs
|
||||||
from v6d3music.core.mainaudio import volume_db
|
from v6d3music.core.mainaudio import volume_db
|
||||||
from v6d3music.core.queueaudio import queue_db
|
from v6d3music.core.queueaudio import queue_db
|
||||||
from v6d3music.utils.entries_effects_for_args import effects_db
|
from v6d3music.core.entries_effects_for_args import effects_db
|
||||||
|
|
||||||
from rainbowadn.instrument import Instrumentation
|
from rainbowadn.instrument import Instrumentation
|
||||||
from v6d1tokens.client import request_token
|
from v6d1tokens.client import request_token
|
||||||
|
59
v6d3music/utils/argctx.py
Normal file
59
v6d3music/utils/argctx.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
from typing import AsyncIterable, Callable, Any
|
||||||
|
|
||||||
|
from v6d3music.utils.effects_for_preset import effects_for_preset
|
||||||
|
from v6d3music.utils.entries_for_url import entries_for_url
|
||||||
|
from v6d3music.utils.options_for_effects import options_for_effects
|
||||||
|
from v6d3music.utils.sparq import sparq
|
||||||
|
|
||||||
|
|
||||||
|
class InfoCtx:
|
||||||
|
def __init__(self, info: dict[str, Any], effects: str | None, already_read: int, tor: bool) -> None:
|
||||||
|
self.info = info
|
||||||
|
self.effects = effects
|
||||||
|
self.already_read = already_read
|
||||||
|
self.tor = tor
|
||||||
|
|
||||||
|
|
||||||
|
class UrlCtx:
|
||||||
|
def __init__(self, url: str, effects: str | None) -> None:
|
||||||
|
self.url = url
|
||||||
|
self.effects = effects
|
||||||
|
self.already_read = 0
|
||||||
|
self.tor = False
|
||||||
|
|
||||||
|
async def entries(self) -> AsyncIterable[InfoCtx]:
|
||||||
|
async for info in entries_for_url(self.url, self.tor):
|
||||||
|
yield InfoCtx(info, self.effects, self.already_read, self.tor)
|
||||||
|
|
||||||
|
|
||||||
|
class ArgCtx:
|
||||||
|
def __init__(self, default_effects: Callable[[int], str | None], args: list[str], gid: int) -> None:
|
||||||
|
self.sources: list[UrlCtx] = []
|
||||||
|
while args:
|
||||||
|
match args:
|
||||||
|
case [url, '-', effects, *args]:
|
||||||
|
pass
|
||||||
|
case [url, '+', preset, *args]:
|
||||||
|
effects = effects_for_preset(preset)
|
||||||
|
case [url, *args]:
|
||||||
|
effects = default_effects(gid)
|
||||||
|
case _:
|
||||||
|
raise RuntimeError
|
||||||
|
ctx = UrlCtx(url, effects)
|
||||||
|
seconds = 0
|
||||||
|
match args:
|
||||||
|
case [h, m, s, *args] if h.isdecimal() and m.isdecimal() and s.isdecimal():
|
||||||
|
seconds = 3600 * int(h) + 60 * int(m) + int(s)
|
||||||
|
case [m, s, *args] if m.isdecimal() and s.isdecimal():
|
||||||
|
seconds = 60 * int(m) + int(s)
|
||||||
|
case [s, *args] if s.isdecimal():
|
||||||
|
seconds = int(s)
|
||||||
|
case [*args]:
|
||||||
|
pass
|
||||||
|
ctx.already_read = round(seconds / sparq(options_for_effects(effects)))
|
||||||
|
match args:
|
||||||
|
case ['tor', *args]:
|
||||||
|
ctx.tor = True
|
||||||
|
case [*args]:
|
||||||
|
pass
|
||||||
|
self.sources.append(ctx)
|
@ -1,61 +0,0 @@
|
|||||||
from typing import AsyncIterable, Optional
|
|
||||||
|
|
||||||
from ptvp35 import Db, KVJson
|
|
||||||
from v6d2ctx.context import Explicit
|
|
||||||
|
|
||||||
from v6d3music.config import myroot
|
|
||||||
from v6d3music.utils.effects_for_preset import effects_for_preset
|
|
||||||
from v6d3music.utils.entries_for_url import entries_for_url
|
|
||||||
from v6d3music.utils.info_tuple import info_tuple
|
|
||||||
from v6d3music.utils.options_for_effects import options_for_effects
|
|
||||||
from v6d3music.utils.presets import allowed_effects
|
|
||||||
from v6d3music.utils.sparq import sparq
|
|
||||||
|
|
||||||
|
|
||||||
effects_db = Db(myroot / 'effects.db', kvfactory=KVJson())
|
|
||||||
|
|
||||||
|
|
||||||
def default_effects(gid: int) -> Optional[str]:
|
|
||||||
effects = effects_db.get(gid, None)
|
|
||||||
if effects in allowed_effects:
|
|
||||||
return effects
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
async def set_default_effects(gid: int, effects: Optional[str]) -> None:
|
|
||||||
if effects is not None and effects not in allowed_effects:
|
|
||||||
raise Explicit('these effects are not allowed')
|
|
||||||
await effects_db.set(gid, effects)
|
|
||||||
|
|
||||||
|
|
||||||
async def entries_effects_for_args(args: list[str], gid: int) -> AsyncIterable[info_tuple]:
|
|
||||||
while args:
|
|
||||||
match args:
|
|
||||||
case [url, '-', effects, *args]:
|
|
||||||
pass
|
|
||||||
case [url, '+', preset, *args]:
|
|
||||||
effects = effects_for_preset(preset)
|
|
||||||
case [url, *args]:
|
|
||||||
effects = default_effects(gid)
|
|
||||||
case _:
|
|
||||||
raise RuntimeError
|
|
||||||
seconds = 0
|
|
||||||
match args:
|
|
||||||
case [h, m, s, *args] if h.isdecimal() and m.isdecimal() and s.isdecimal():
|
|
||||||
seconds = 3600 * int(h) + 60 * int(m) + int(s)
|
|
||||||
case [m, s, *args] if m.isdecimal() and s.isdecimal():
|
|
||||||
seconds = 60 * int(m) + int(s)
|
|
||||||
case [s, *args] if s.isdecimal():
|
|
||||||
seconds = int(s)
|
|
||||||
case [*args]:
|
|
||||||
pass
|
|
||||||
already_read = round(seconds / sparq(options_for_effects(effects)))
|
|
||||||
tor = False
|
|
||||||
match args:
|
|
||||||
case ['tor', *args]:
|
|
||||||
tor = True
|
|
||||||
case [*args]:
|
|
||||||
pass
|
|
||||||
async for info in entries_for_url(url, tor):
|
|
||||||
yield info, effects, already_read, tor
|
|
@ -1,3 +0,0 @@
|
|||||||
from typing import Any, TypeAlias
|
|
||||||
|
|
||||||
info_tuple: TypeAlias = tuple[dict[str, Any], str | None, int, bool]
|
|
Loading…
Reference in New Issue
Block a user