import asyncio import random from collections import deque from io import StringIO from typing import MutableSequence import discord from ptvp35 import * from v6d2ctx.context import * from v6d3music.core.ytaservicing import * from v6d3music.core.ytaudio import * from v6d3music.utils.assert_admin import * from v6d3music.utils.fill import * __all__ = ('QueueAudio',) PRE_SET_LENGTH = 24 class QueueAudio(discord.AudioSource): def __init__(self, db: DbConnection, guild: discord.Guild, respawned: list[YTAudio]): self.db = db self.queue: deque[YTAudio] = deque() for audio in respawned: self.append(audio) self.guild = guild self.loop = asyncio.get_running_loop() def update_sources(self): for i in range(PRE_SET_LENGTH): try: self.queue[i].set_source_if_necessary() except IndexError: return @staticmethod async def respawned(servicing: YTAServicing, db: DbConnection, guild: discord.Guild) -> list[YTAudio]: respawned = [] try: for audio_respawn in db.get(guild.id, []): try: respawned.append(await YTAudio.respawn(servicing, guild, audio_respawn)) except Exception as e: print('audio respawn failed', e) raise except Exception as e: print('queue respawn failed', e) return respawned @classmethod async def create(cls, servicing: YTAServicing, db: DbConnection, guild: discord.Guild) -> 'QueueAudio': return cls(db, guild, await cls.respawned(servicing, db, guild)) async def save(self, delay: bool) -> None: hybernated = [] for audio in list(self.queue): if delay: await asyncio.sleep(0.01) hybernated.append(audio.hybernate()) self.db.set_nowait(self.guild.id, hybernated) def append(self, audio: YTAudio): if len(self.queue) < PRE_SET_LENGTH: audio.set_source_if_necessary() self.queue.append(audio) def _popleft(self, audio: YTAudio): if self.queue and self.queue[0] is audio: self.queue.popleft().cleanup() self.update_sources() def read(self) -> bytes: for i in range(len(self.queue)): try: audio = self.queue[i] except IndexError: break audio.set_source_if_necessary() frame = audio.read() if len(frame) == discord.opus.Encoder.FRAME_SIZE: return frame else: self.loop.call_soon_threadsafe(self._popleft, audio) return FILL def skip_at(self, pos: int, member: discord.Member) -> bool: if pos < len(self.queue): audio = self.queue[pos] if audio.can_be_skipped_by(member): self.queue.remove(audio) audio.cleanup() return True return False def skip_audio(self, audio: YTAudio, member: discord.Member) -> bool: if audio in self.queue: if audio.can_be_skipped_by(member): self.queue.remove(audio) audio.cleanup() return True self.update_sources() return False def clear(self, member: discord.Member) -> None: assert_admin(member) to_clean = list(self.queue) self.queue.clear() for audio in to_clean: try: audio.cleanup() except ValueError: pass def swap(self, member: discord.Member, a: int, b: int) -> None: assert_admin(member) if max(a, b) >= len(self.queue): return self.queue[a], self.queue[b] = self.queue[b], self.queue[a] self.update_sources() def move(self, member: discord.Member, a: int, b: int) -> None: assert_admin(member) if max(a, b) >= len(self.queue): return audio = self.queue[a] self.queue.remove(audio) self.queue.insert(b, audio) self.update_sources() async def format(self) -> str: stream = StringIO() for i, audio in enumerate(lst := list(self.queue)): if i >= (n := 100): stream.write(f'cutting queue at {n} results, {len(lst) - n} remaining.\n') break stream.write(f'`[{i}]` `{audio.source_timecode()} / {audio.duration()}` {audio.description}\n') return stream.getvalue() def cleanup(self): for audio in self.queue: try: audio.cleanup() except ValueError: pass async def pubjson(self, member: discord.Member, limit: int) -> list: import random audios = list(self.queue) return [await audio.pubjson(member) for audio, _ in zip(audios, range(limit))] def repeat(self, n: int) -> None: if not self.queue: raise Explicit('empty queue') if n > 99: raise Explicit('too long') audio = self.queue[0] for _ in range(n): self.queue.insert(1, audio.copy()) self.update_sources() def shuffle(self) -> None: try: random.shuffle(ForwardView(self.queue)) except: from traceback import print_exc print_exc() self.update_sources() class ForwardView(MutableSequence[YTAudio]): def __init__(self, sequence: MutableSequence[YTAudio]) -> None: self.sequence = sequence def __len__(self) -> int: return max(0, self.sequence.__len__() - 1) def __setitem__(self, index: int, value: YTAudio) -> None: self.sequence.__setitem__(index + 1, value) def __getitem__(self, index: int) -> YTAudio: return self.sequence.__getitem__(index + 1) def __delitem__(self, index: int | slice) -> None: self.sequence.__delitem__(index) def insert(self, index: int, value: YTAudio) -> None: self.sequence.insert(index, value)