import asyncio
import traceback
from contextlib import AsyncExitStack
from typing import AsyncIterable, TypeVar

import discord
from v6d3music.config import myroot
from v6d3music.core.caching import *
from v6d3music.core.default_effects import *
from v6d3music.core.mainaudio import *
from v6d3music.core.queueaudio import *
from v6d3music.core.ystate import *
from v6d3music.core.ytaservicing import *
from v6d3music.core.ytaudio import *
from v6d3music.processing.pool import *
from v6d3music.utils.argctx import *
from ptvp35 import *
from v6d2ctx.context import Context, Explicit
from v6d2ctx.lock_for import lock_for

T = TypeVar('T')


class MainService:
    """Own the per-guild ``MainAudio`` sources and their persistence.

    Used as an async context manager: entering opens the volume/queue
    databases, the cache and the processing pool and starts the periodic save
    task; exiting performs a final save and closes those resources.
    """

    def __init__(self, defaulteffects: DefaultEffects, client: discord.Client) -> None:
        self.defaulteffects = defaulteffects
        self.client = client
        # One MainAudio per guild, filled in by MainDescriptor.main_for_raw_vc.
        self.mains: dict[discord.Guild, MainAudio] = {}
        # Serialises restore() so that restore_vcs() runs at most once.
        self.restore_lock = asyncio.Lock()

    @staticmethod
    async def raw_vc_for_member(member: discord.Member) -> discord.VoiceClient:
        """Return the guild's voice client, connecting to the member's channel if needed.

        Raises:
            Explicit: if the member is not in a voice channel ('not connected'),
                or if connecting failed with ``discord.ClientException``
                ('try again later').
        """
        vc: discord.VoiceProtocol | None = member.guild.voice_client
        if vc is None or (isinstance(vc, discord.VoiceClient) and not vc.is_connected()):
            vs: discord.VoiceState | None = member.voice
            if vs is None:
                raise Explicit('not connected')
            vch: discord.abc.Connectable | None = vs.channel
            if vch is None:
                raise Explicit('not connected')
            try:
                vc = await vch.connect()
            except discord.ClientException:
                # Connecting failed: force-disconnect whatever client the guild
                # currently reports so that a later attempt starts clean.
                vc = member.guild.voice_client
                assert vc is not None
                await member.guild.fetch_channels()
                await vc.disconnect(force=True)
                raise Explicit('try again later')
        assert isinstance(vc, discord.VoiceClient)
        return vc

    async def raw_vc_for(self, ctx: Context) -> discord.VoiceClient:
        """Return the voice client for the member who issued ``ctx``.

        Raises:
            Explicit: if ``ctx`` has no member ('not in a guild').
        """
        if ctx.member is None:
            raise Explicit('not in a guild')
        return await self.raw_vc_for_member(ctx.member)

    def descriptor(self, *, create: bool, force_play: bool) -> 'MainDescriptor':
        """Build a ``MainDescriptor`` bound to this service."""
        return MainDescriptor(self, create=create, force_play=force_play)

    def context(self, ctx: Context, *, create: bool, force_play: bool) -> 'MainContext':
        """Build a ``MainContext`` for ``ctx`` with the given descriptor flags."""
        return self.descriptor(create=create, force_play=force_play).context(ctx)

    async def create(self, guild: discord.Guild) -> MainAudio:
        """Create a ``MainAudio`` for ``guild`` using this service's resources."""
        return await MainAudio.create(self.__servicing, self.__volumes, self.__queues, guild)

    async def __aenter__(self) -> 'MainService':
        async with AsyncExitStack() as es:
            self.__volumes = await es.enter_async_context(
                DbFactory(myroot / 'volume.db', kvfactory=KVJson())
            )
            self.__queues = await es.enter_async_context(
                DbFactory(myroot / 'queue.db', kvfactory=KVJson())
            )
            self.__caching = await es.enter_async_context(Caching())
            self.__pool = await es.enter_async_context(Pool(5))
            self.__servicing = YTAServicing(self.__caching, self.__pool)
            # Completed by restore_vcs(); save_job() waits on it before saving.
            self.__vcs_restored: asyncio.Future[None] = asyncio.Future()
            self.__save_task = asyncio.create_task(self.save_daemon())
            # Everything was entered successfully: move ownership of the
            # resources out of this ``async with`` so they outlive __aenter__.
            self.__es = es.pop_all()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Run the final save while the resources are still open; they are
        # closed when the stack exits.
        async with self.__es:
            await self.final_save()
        del self.__es

    async def save_queues(self, delay: bool) -> None:
        """Save the queue of every known ``MainAudio``.

        When ``delay`` is true, sleep 10 ms before each queue.
        """
        # Iterate over a snapshot: ``self.mains`` may change across awaits.
        for mainasrc in list(self.mains.values()):
            if delay:
                await asyncio.sleep(0.01)
            await mainasrc.queue.save(delay)

    async def save_vcs(self, delay: bool) -> None:
        """Record active voice clients under the 'vcs' key of the queue database.

        Each entry is ``(guild id, channel id, is_paused)``. Clients that are
        neither playing nor paused, or that lack a guild or channel, are
        skipped. When ``delay`` is true, sleep 10 ms before each client.
        """
        vcs: list[tuple[int, int, bool]] = []
        vc: discord.VoiceClient
        for vc in (vcc for vcc in self.client.voice_clients if isinstance(vcc, discord.VoiceClient)):
            if delay:
                await asyncio.sleep(0.01)
            paused = vc.is_paused()
            # is_playing() and is_paused() are mutually exclusive in discord.py,
            # so paused sessions must be admitted explicitly; otherwise they are
            # never persisted and the stored paused flag is always False.
            if vc.is_playing() or paused:
                if vc.guild is not None and vc.channel is not None:
                    vcs.append((vc.guild.id, vc.channel.id, paused))
        self.__queues.set_nowait('vcs', vcs)

    async def save_commit(self) -> None:
        """Commit the queue database."""
        await self.__queues.commit()

    async def _save_all(self, delay: bool, save_playing: bool) -> None:
        await self.save_queues(delay)
        if save_playing:
            await self.save_vcs(delay)
        await self.save_commit()

    async def save_all(self, delay: bool, save_playing: bool) -> None:
        """Save all queues, optionally the voice-client list, then commit."""
        await self._save_all(delay, save_playing)

    async def save_job(self):
        """Wait until voice clients are restored, then save once per second forever."""
        await self.__vcs_restored
        print('starting saving')
        while True:
            await asyncio.sleep(1)
            # The voice-client list is only saved while the client is open.
            await self.save_all(True, not self.client.is_closed())

    async def save_daemon(self):
        """Run ``save_job`` and finish quietly when the task is cancelled."""
        try:
            await self.save_job()
        except asyncio.CancelledError:
            pass

    async def final_save(self):
        """Stop the periodic save task and, if restoration is done, save once more."""
        self.__save_task.cancel()
        # NOTE(review): if the save task is still awaiting ``__vcs_restored``,
        # cancelling the task may cancel that future as well, in which case
        # ``done()`` is already true here and the save branch runs -- confirm
        # that this is the intended behaviour.
        if not self.__vcs_restored.done():
            self.__vcs_restored.cancel()
        else:
            try:
                await self.save_all(False, False)
                print('saved')
            except Exception:
                # Best effort during shutdown: report and carry on closing.
                traceback.print_exc()

    async def _restore_vc(self, guild: discord.Guild, vccid: int, vc_is_paused: bool) -> None:
        channels = await guild.fetch_channels()
        channel: discord.VoiceChannel
        # Single-element unpacking: raises ValueError unless exactly one voice
        # channel has the requested id.
        channel, = [
            ch for ch in channels
            if isinstance(ch, discord.VoiceChannel) and ch.id == vccid
        ]
        vp: discord.VoiceProtocol = await channel.connect()
        assert isinstance(vp, discord.VoiceClient)
        vc = vp
        await self.descriptor(create=True, force_play=True).main_for_raw_vc(vc)
        if vc_is_paused:
            vc.pause()

    async def restore_vc(self, vcgid: int, vccid: int, vc_is_paused: bool) -> None:
        """Reconnect to one saved voice channel; failures are printed, not raised."""
        try:
            print(f'vc restoring {vcgid}')
            guild: discord.Guild = await self.client.fetch_guild(vcgid)
            async with lock_for(guild, 'not in a guild'):
                await self._restore_vc(guild, vccid, vc_is_paused)
        except Exception as e:
            print(f'vc {vcgid} {vccid} {vc_is_paused} failed', e)
        else:
            print(f'vc restored {vcgid} {vccid}')

    async def restore_vcs(self) -> None:
        """Restore every saved voice client concurrently, then mark restoration done."""
        vcs: list[tuple[int, int, bool]] = self.__queues.get('vcs', [])
        try:
            tasks = [
                asyncio.create_task(self.restore_vc(vcgid, vccid, vc_is_paused))
                for vcgid, vccid, vc_is_paused in vcs
            ]
            for task in tasks:
                await task
        finally:
            # The future may already be cancelled (e.g. by final_save during
            # shutdown); set_result() on a finished future raises
            # InvalidStateError, so only complete it while it is still pending.
            if not self.__vcs_restored.done():
                self.__vcs_restored.set_result(None)

    async def restore(self) -> None:
        """Run ``restore_vcs`` unless restoration has already finished."""
        async with self.restore_lock:
            if not self.__vcs_restored.done():
                await self.restore_vcs()

    async def yt_audios(self, ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
        """Yield the ``YTAudio`` items produced by ``YState`` for ``args``.

        ``args`` are parsed by ``ArgCtx`` together with the guild's default
        effects. ``ctx.guild`` must not be ``None``.
        """
        assert ctx.guild is not None
        argctx = ArgCtx(self.defaulteffects.get(ctx.guild.id), args)
        async for audio in YState(self.__servicing, self.__pool, ctx, argctx.sources).iterate():
            yield audio


class MainDescriptor:
    """Policy for obtaining a guild's ``MainAudio``: whether to create it and whether to force playback."""

    def __init__(self, service: MainService, *, create: bool, force_play: bool) -> None:
        self.mainservice = service
        # Shared reference to the service's guild -> MainAudio mapping.
        self.mains = service.mains
        self.create = create
        self.force_play = force_play

    async def main_for_raw_vc(self, vc: discord.VoiceClient) -> MainAudio:
        """Return the ``MainAudio`` for ``vc``'s guild, starting playback when appropriate.

        Raises:
            Explicit: if the guild has no ``MainAudio`` and ``create`` is false.
        """
        if vc.guild in self.mains:
            source = self.mains[vc.guild]
        elif self.create:
            # setdefault keeps an entry that another coroutine may have
            # registered while create() was awaited.
            source = self.mains.setdefault(
                vc.guild,
                await self.mainservice.create(vc.guild),
            )
        else:
            raise Explicit('not playing, use `queue pause` or `queue resume`')
        # Start playing when the client is on a different source, or when
        # creation is allowed, nothing is playing and either playback is forced
        # or the client is not paused.
        if vc.source != source or (
            self.create
            and not vc.is_playing()
            and (self.force_play or not vc.is_paused())
        ):
            vc.play(source)
        return source

    def context(self, ctx: Context) -> 'MainContext':
        """Bind this descriptor to a command context."""
        return MainContext(self, ctx)


class MainContext:
    """A ``MainDescriptor`` bound to a command context."""

    def __init__(self, descriptor: MainDescriptor, ctx: Context) -> None:
        self.mainservice = descriptor.mainservice
        self.descriptor = descriptor
        self.ctx = ctx

    async def vc_main(self) -> tuple[discord.VoiceClient, MainAudio]:
        """Return the voice client for the context and its ``MainAudio``."""
        vc = await self.mainservice.raw_vc_for(self.ctx)
        return vc, await self.descriptor.main_for_raw_vc(vc)

    async def vc(self) -> discord.VoiceClient:
        """Return only the voice client from ``vc_main``."""
        vc, _ = await self.vc_main()
        return vc

    async def main(self) -> MainAudio:
        """Return only the ``MainAudio`` from ``vc_main``."""
        _, source = await self.vc_main()
        return source

    async def queue(self) -> QueueAudio:
        """Return the queue of the context's ``MainAudio``."""
        return (await self.main()).queue