405 lines
13 KiB
Python
405 lines
13 KiB
Python
import shlex
|
|
from typing import Callable
|
|
|
|
import discord
|
|
|
|
from v6d2ctx.at_of import AtOf
|
|
from v6d2ctx.context import Context, Explicit, command_type
|
|
from v6d3music.core.mainservice import MainService
|
|
from v6d3music.utils.assert_admin import assert_admin
|
|
from v6d3music.utils.catch import catch
|
|
from v6d3music.utils.effects_for_preset import effects_for_preset
|
|
from v6d3music.utils.presets import allowed_presets
|
|
|
|
__all__ = ("get_of",)
|
|
|
|
|
|
def get_of(mainservice: MainService) -> Callable[[str], command_type]:
|
|
at_of: AtOf[str, command_type] = AtOf()
|
|
at, of = at_of()
|
|
|
|
@at("?")
|
|
@at("help")
|
|
@at("/help")
|
|
async def help_(ctx: Context, args: list[str]) -> None:
|
|
match args:
|
|
case []:
|
|
await ctx.reply("music bot\nhttps://music.parrrate.ru/docs/usage.html")
|
|
case [name]:
|
|
await ctx.reply(f"help for {name}: `{name} help`")
|
|
|
|
@at("/")
|
|
@at("p")
|
|
@at("play")
|
|
async def play(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
f"""
|
|
`play ...args`
|
|
`play url [- effects]/[+ preset] [[[h]]] [[m]] [s] [ignore] ...args`
|
|
`pause`
|
|
`resume`
|
|
presets: {shlex.join(allowed_presets)}
|
|
""",
|
|
(),
|
|
"help",
|
|
)
|
|
match args:
|
|
case ["this", *args]:
|
|
reference = ctx.message.reference
|
|
if reference is None:
|
|
raise Explicit("use reply")
|
|
resolved = reference.resolved
|
|
if not isinstance(resolved, discord.Message):
|
|
raise Explicit("reference message is either deleted or cannot be found")
|
|
attachments = resolved.attachments
|
|
case [*args]:
|
|
attachments = ctx.message.attachments
|
|
case _:
|
|
raise RuntimeError
|
|
async with mainservice.lock_for(ctx.guild):
|
|
queue = await mainservice.context(ctx, create=True, force_play=False).queue()
|
|
if attachments:
|
|
args = ["[[", *(attachment.url for attachment in attachments), "]]"] + args
|
|
added = 0
|
|
async for audio in mainservice.audios(ctx, args):
|
|
queue.append(audio)
|
|
added += 1
|
|
await ctx.reply(f"added track(s): {added}")
|
|
|
|
@at("cancel")
|
|
async def cancel(ctx: Context, _args: list[str]) -> None:
|
|
mainservice.cancel_loading(ctx)
|
|
|
|
@at("skip")
|
|
@at("/skip")
|
|
async def skip(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`skip [first] [last]`
|
|
""",
|
|
"help",
|
|
)
|
|
assert ctx.member is not None
|
|
match args:
|
|
case []:
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.skip_at(0, ctx.member)
|
|
case [pos] if pos.isdecimal():
|
|
pos = int(pos)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.skip_at(pos, ctx.member)
|
|
case [pos0, pos1] if pos0.isdecimal() and pos1.isdecimal():
|
|
pos0, pos1 = int(pos0), int(pos1)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.skip_between(pos0, pos1, ctx.member)
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
await ctx.reply("done")
|
|
|
|
@at("to")
|
|
async def skip_to(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`to [[h]] [m] s`
|
|
""",
|
|
"help",
|
|
)
|
|
match args:
|
|
case [h, m, s, *args] if h.isdecimal() and m.isdecimal() and s.isdecimal():
|
|
seconds = 3600 * int(h) + 60 * int(m) + int(s)
|
|
case [m, s, *args] if m.isdecimal() and s.isdecimal():
|
|
seconds = 60 * int(m) + int(s)
|
|
case [s, *args] if s.isdecimal():
|
|
seconds = int(s)
|
|
case _:
|
|
raise Explicit("misformatted, expected time")
|
|
match args:
|
|
case ["at", spos] if spos.isdecimal():
|
|
pos = int(spos)
|
|
case []:
|
|
pos = 0
|
|
case _:
|
|
raise Explicit("misformatted, expected position")
|
|
assert_admin(ctx.member)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.queue[pos].set_seconds(seconds)
|
|
|
|
@at("effects")
|
|
async def effects_(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`effects - effects`
|
|
`effects + preset`
|
|
""",
|
|
"help",
|
|
)
|
|
match args:
|
|
case ["-", effects]:
|
|
pass
|
|
case ["+", preset]:
|
|
effects = effects_for_preset(preset)
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
assert_admin(ctx.member)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.queue[0].set_effects(effects)
|
|
|
|
@at("default")
|
|
async def default(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`default - effects`
|
|
`default + preset`
|
|
`default none`
|
|
""",
|
|
"help",
|
|
)
|
|
assert ctx.guild is not None
|
|
match args:
|
|
case ["-", effects]:
|
|
pass
|
|
case ["+", preset]:
|
|
effects = effects_for_preset(preset)
|
|
case ["none"]:
|
|
effects = None
|
|
case []:
|
|
await ctx.reply(f"current default effects: {mainservice.defaulteffects.get(ctx.guild.id)}")
|
|
return
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
assert_admin(ctx.member)
|
|
await mainservice.defaulteffects.set(ctx.guild.id, effects)
|
|
await ctx.reply(f"effects set to `{effects}`")
|
|
|
|
@at("repeat")
|
|
async def repeat(ctx: Context, args: list[str]):
|
|
match args:
|
|
case ["x", n_, *args] if n_.isdecimal():
|
|
n = int(n_)
|
|
case [*args]:
|
|
n = 1
|
|
case _:
|
|
raise RuntimeError
|
|
match args:
|
|
case ["at", p_, *args] if p_.isdecimal():
|
|
p = int(p_)
|
|
case [*args]:
|
|
p = 0
|
|
case _:
|
|
raise RuntimeError
|
|
match args:
|
|
case ["to", t_, *args] if t_.isdecimal():
|
|
t = int(t_)
|
|
case ["to", "end"]:
|
|
t = None
|
|
case [*args]:
|
|
t = p + 1
|
|
case _:
|
|
raise RuntimeError
|
|
match args:
|
|
case []:
|
|
pass
|
|
case _:
|
|
raise Explicit("misformatted (extra arguments)")
|
|
assert_admin(ctx.member)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.repeat(n, p, t)
|
|
|
|
@at("shuffle")
|
|
async def shuffle(ctx: Context, args: list[str]):
|
|
assert_admin(ctx.member)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.shuffle()
|
|
|
|
@at("branch")
|
|
async def branch(ctx: Context, args: list[str]):
|
|
match args:
|
|
case ["-", effects]:
|
|
pass
|
|
case ["+", preset]:
|
|
effects = effects_for_preset(preset)
|
|
case ["none"]:
|
|
effects = ""
|
|
case []:
|
|
effects = None
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
assert_admin(ctx.member)
|
|
queue = await mainservice.context(ctx, create=False, force_play=False).queue()
|
|
queue.branch(effects)
|
|
|
|
@at("//")
|
|
@at("queue")
|
|
@at("/queue")
|
|
async def queue_(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`queue`
|
|
`queue clear`
|
|
`queue resume`
|
|
`queue pause`
|
|
""",
|
|
"help",
|
|
)
|
|
assert ctx.member is not None
|
|
match args:
|
|
case []:
|
|
limit = 24
|
|
case [lstr] if lstr.isdecimal():
|
|
limit = int(lstr)
|
|
case ["tail", lstr] if lstr.isdecimal():
|
|
limit = -int(lstr)
|
|
if limit >= 0:
|
|
raise Explicit("limit of at least `1` required")
|
|
case ["clear"]:
|
|
(await mainservice.context(ctx, create=False, force_play=False).queue()).clear(ctx.member)
|
|
await ctx.reply("done")
|
|
return
|
|
case ["resume"]:
|
|
async with mainservice.lock_for(ctx.guild):
|
|
await mainservice.context(ctx, create=True, force_play=True).vc()
|
|
await ctx.reply("done")
|
|
return
|
|
case ["pause"]:
|
|
async with mainservice.lock_for(ctx.guild):
|
|
vc = await mainservice.context(ctx, create=True, force_play=False).vc()
|
|
vc.pause()
|
|
await ctx.reply("done")
|
|
return
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
await ctx.long(
|
|
(await (await mainservice.context(ctx, create=True, force_play=False).queue()).format(limit)).strip()
|
|
or "no queue"
|
|
)
|
|
|
|
@at("np")
|
|
@at("cp")
|
|
@at("nowplaying")
|
|
@at("playing")
|
|
@at("//1")
|
|
@at("queue1")
|
|
async def queue1(ctx: Context, _args: list[str]) -> None:
|
|
await ctx.long(
|
|
(await (await mainservice.context(ctx, create=True, force_play=False).queue()).format(1)).strip()
|
|
or "no queue"
|
|
)
|
|
|
|
@at("swap")
|
|
async def swap(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`swap a b`
|
|
""",
|
|
"help",
|
|
)
|
|
assert ctx.member is not None
|
|
match args:
|
|
case [a, b] if a.isdecimal() and b.isdecimal():
|
|
a, b = int(a), int(b)
|
|
(await mainservice.context(ctx, create=False, force_play=False).queue()).swap(ctx.member, a, b)
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
|
|
@at("move")
|
|
async def move(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`move a b`
|
|
""",
|
|
"help",
|
|
)
|
|
assert ctx.member is not None
|
|
match args:
|
|
case [a, b] if a.isdecimal() and b.isdecimal():
|
|
a, b = int(a), int(b)
|
|
(await mainservice.context(ctx, create=False, force_play=False).queue()).move(ctx.member, a, b)
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
|
|
@at("volume")
|
|
async def volume_(ctx: Context, args: list[str]) -> None:
|
|
await catch(
|
|
ctx,
|
|
args,
|
|
"""
|
|
`volume volume`
|
|
""",
|
|
"help",
|
|
)
|
|
assert ctx.member is not None
|
|
match args:
|
|
case [svolume]:
|
|
volume = float(svolume)
|
|
await (await mainservice.context(ctx, create=True, force_play=False).main()).set(volume, ctx.member)
|
|
case []:
|
|
volume = (await mainservice.context(ctx, create=True, force_play=False).main()).get()
|
|
await ctx.reply(f"volume is {volume}")
|
|
case _:
|
|
raise Explicit("misformatted")
|
|
|
|
@at("||")
|
|
@at("⏸")
|
|
@at("⏸️")
|
|
@at("pause")
|
|
@at("/pause")
|
|
async def pause(ctx: Context, _args: list[str]) -> None:
|
|
vc = await mainservice.context(ctx, create=False, force_play=False).vc()
|
|
vc.pause()
|
|
|
|
@at("|>")
|
|
@at(">")
|
|
@at("⏵")
|
|
@at("resume")
|
|
@at("/resume")
|
|
async def resume(ctx: Context, _args: list[str]) -> None:
|
|
vc = await mainservice.context(ctx, create=False, force_play=True).vc()
|
|
vc.resume()
|
|
|
|
@at(".")
|
|
@at("⏯")
|
|
@at("⏯️")
|
|
@at("pp")
|
|
@at("playpause")
|
|
async def playpause(ctx: Context, _args: list[str]) -> None:
|
|
vc = await mainservice.context(ctx, create=False, force_play=True).vc()
|
|
if vc.is_paused():
|
|
vc.resume()
|
|
else:
|
|
vc.pause()
|
|
|
|
@at("leave")
|
|
@at("/leave")
|
|
async def leave(ctx: Context, _args: list[str]) -> None:
|
|
async with mainservice.lock_for(ctx.guild):
|
|
vc, main = await mainservice.context(ctx, create=False, force_play=False).vc_main()
|
|
queue = main.queue
|
|
if queue.queue:
|
|
raise Explicit("queue not empty")
|
|
await vc.disconnect()
|
|
|
|
# @at('vtest')
|
|
# async def vtest(ctx: Context, _args: list[str]) -> None:
|
|
# guild: discord.Guild = ctx.client.get_guild(914448749873479692)
|
|
# channel: discord.VoiceChannel = guild.get_channel(914448750414540843)
|
|
# await channel.connect()
|
|
# print('connected')
|
|
|
|
return of
|