v6d3music/v6d3music/commands.py

289 lines
8.3 KiB
Python

import shlex
from v6d2ctx.context import Context, Explicit, at
from v6d2ctx.lock_for import lock_for
from v6d3music.core.mainasrc import main_for, queue_for, vc_for
from v6d3music.core.yt_audios import yt_audios
from v6d3music.utils.assert_admin import assert_admin
from v6d3music.utils.catch import catch
from v6d3music.utils.effects_for_preset import effects_for_preset
from v6d3music.utils.entries_effects_for_args import default_effects, set_default_effects
from v6d3music.utils.options_for_effects import options_for_effects
from v6d3music.utils.presets import allowed_presets
@at('commands', 'help')
async def help_(ctx: Context, args: list[str]) -> None:
match args:
case []:
await ctx.reply('music bot')
case [name]:
await ctx.reply(f'help for {name}: `{name} help`')
@at('commands', '/')
@at('commands', 'play')
async def play(ctx: Context, args: list[str]) -> None:
    """Append audio described by ``args`` (and an optional attachment) to the queue.

    The usage text is handed to ``catch`` together with the ``()`` and
    ``'help'`` patterns.
    NOTE(review): presumably ``catch`` replies with the usage text and aborts
    when ``args`` matches one of those patterns - confirm in utils.catch.

    Raises:
        Explicit: when the message carries more than one attachment.
    """
    usage = f'''
`play ...args`
`play url [- effects]/[+ preset] [[[h]]] [[m]] [s] [tor] ...args`
`pause`
`resume`
presets: {shlex.join(allowed_presets)}
'''
    await catch(ctx, args, usage, (), 'help')
    async with lock_for(ctx.guild, 'not in a guild'):
        target_queue = await queue_for(ctx, create=True, force_play=False)
        attachments = ctx.message.attachments
        if len(attachments) > 1:
            raise Explicit('no more than one attachment')
        if attachments:
            # A single attachment is treated as the leading url argument.
            args = [attachments[0].url, *args]
        async for audio in yt_audios(ctx, args):
            target_queue.append(audio)
        await ctx.reply('done')
@at('commands', 'skip')
async def skip(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`skip [first] [last]`
''', 'help'
)
assert ctx.member is not None
match args:
case []:
queue = await queue_for(ctx, create=False, force_play=False)
queue.skip_at(0, ctx.member)
case [pos] if pos.isdecimal():
pos = int(pos)
queue = await queue_for(ctx, create=False, force_play=False)
queue.skip_at(pos, ctx.member)
case [pos0, pos1] if pos0.isdecimal() and pos1.isdecimal():
pos0, pos1 = int(pos0), int(pos1)
queue = await queue_for(ctx, create=False, force_play=False)
for i in range(pos0, pos1 + 1):
if not queue.skip_at(pos0, ctx.member):
pos0 += 1
case _:
raise Explicit('misformatted')
await ctx.reply('done')
@at('commands', 'to')
async def skip_to(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`to [[h]] [m] s`
''', 'help'
)
match args:
case [h, m, s] if h.isdecimal() and m.isdecimal() and s.isdecimal():
seconds = 3600 * int(h) + 60 * int(m) + int(s)
case [m, s] if m.isdecimal() and s.isdecimal():
seconds = 60 * int(m) + int(s)
case [s] if s.isdecimal():
seconds = int(s)
case _:
raise Explicit('misformatted')
queue = await queue_for(ctx, create=False, force_play=False)
queue.queue[0].set_seconds(seconds)
@at('commands', 'effects')
async def effects_(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`effects - effects`
`effects + preset`
''', 'help'
)
match args:
case ['-', effects]:
pass
case ['+', preset]:
effects = effects_for_preset(preset)
case _:
raise Explicit('misformatted')
assert_admin(ctx.member)
queue = await queue_for(ctx, create=False, force_play=False)
yta = queue.queue[0]
seconds = yta.source_seconds()
yta.options = options_for_effects(effects)
yta.set_seconds(seconds)
@at('commands', 'default')
async def default(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`default - effects`
`default + preset`
`default none`
''', 'help'
)
assert ctx.guild is not None
match args:
case ['-', effects]:
pass
case ['+', preset]:
effects = effects_for_preset(preset)
case ['none']:
effects = None
case []:
await ctx.reply(f'current default effects: {default_effects(ctx.guild.id)}')
return
case _:
raise Explicit('misformatted')
assert_admin(ctx.member)
await set_default_effects(ctx.guild.id, effects)
await ctx.reply(f'effects set to `{effects}`')
@at('commands', 'repeat')
async def repeat(ctx: Context, args: list[str]):
match args:
case []:
n = 1
case [n_] if n_.isdecimal():
n = int(n_)
case _:
raise Explicit('misformatted')
assert_admin(ctx.member)
queue = await queue_for(ctx, create=False, force_play=False)
if not queue.queue:
raise Explicit('empty queue')
if n > 99:
raise Explicit('too long')
audio = queue.queue[0]
for _ in range(n):
queue.queue.insert(1, audio.copy())
@at('commands', 'branch')
async def branch(ctx: Context, args: list[str]):
match args:
case ['-', effects]:
pass
case ['+', preset]:
effects = effects_for_preset(preset)
case ['none']:
effects = ''
case []:
effects = None
case _:
raise Explicit('misformatted')
assert_admin(ctx.member)
queue = await queue_for(ctx, create=False, force_play=False)
if not queue.queue:
raise Explicit('empty queue')
audio = queue.queue[0].branch()
if effects is not None:
seconds = audio.source_seconds()
audio.options = options_for_effects(effects or None)
audio.set_seconds(seconds)
else:
audio.set_source()
queue.queue.insert(1, audio)
@at('commands', '//')
@at('commands', 'queue')
async def queue_(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`queue`
`queue clear`
`queue resume`
`queue pause`
''', 'help'
)
assert ctx.member is not None
match args:
case []:
await ctx.long(
(await (await queue_for(ctx, create=True, force_play=False)).format()).strip() or 'no queue'
)
case ['clear']:
(await queue_for(ctx, create=False, force_play=False)).clear(ctx.member)
await ctx.reply('done')
case ['resume']:
async with lock_for(ctx.guild, 'not in a guild'):
await queue_for(ctx, create=True, force_play=True)
await ctx.reply('done')
case ['pause']:
async with lock_for(ctx.guild, 'not in a guild'):
vc = await vc_for(ctx, create=True, force_play=False)
vc.pause()
await ctx.reply('done')
case _:
raise Explicit('misformatted')
@at('commands', 'swap')
async def swap(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`swap a b`
''', 'help'
)
assert ctx.member is not None
match args:
case [a, b] if a.isdecimal() and b.isdecimal():
a, b = int(a), int(b)
(await queue_for(ctx, create=False, force_play=False)).swap(ctx.member, a, b)
case _:
raise Explicit('misformatted')
@at('commands', 'move')
async def move(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`move a b`
''', 'help'
)
assert ctx.member is not None
match args:
case [a, b] if a.isdecimal() and b.isdecimal():
a, b = int(a), int(b)
(await queue_for(ctx, create=False, force_play=False)).move(ctx.member, a, b)
case _:
raise Explicit('misformatted')
@at('commands', 'volume')
async def volume_(ctx: Context, args: list[str]) -> None:
await catch(
ctx, args, '''
`volume volume`
''', 'help'
)
assert ctx.member is not None
match args:
case [volume]:
volume = float(volume)
await (await main_for(ctx, create=True, force_play=False)).set(volume, ctx.member)
case _:
raise Explicit('misformatted')
@at('commands', 'pause')
async def pause(ctx: Context, _args: list[str]) -> None:
    """Pause the voice client obtained from ``vc_for``; arguments are ignored."""
    voice_client = await vc_for(ctx, create=False, force_play=False)
    voice_client.pause()
@at('commands', 'resume')
async def resume(ctx: Context, _args: list[str]) -> None:
    """Resume the voice client obtained from ``vc_for``; arguments are ignored.

    Unlike ``pause``, the client is requested with ``force_play=True``.
    """
    voice_client = await vc_for(ctx, create=False, force_play=True)
    voice_client.resume()
def register_commands():
    """Do nothing and return ``None``.

    NOTE(review): presumably this exists so callers have something to import
    from this module, which makes the ``@at`` decorators above run - confirm
    against the call site.
    """