v6d3music/v6d3music/core/ytaudio.py
2022-11-01 18:35:10 +00:00

188 lines
5.6 KiB
Python

import asyncio
import random
import subprocess
from typing import Optional
import discord
from v6d2ctx.context import Benchmark
from v6d3music.core.ffmpegnormalaudio import FFmpegNormalAudio
from v6d3music.core.real_url import real_url
from v6d3music.utils.fill import FILL
from v6d3music.utils.sparq import sparq
from v6d3music.utils.tor_prefix import tor_prefix
class YTAudio(discord.AudioSource):
    """Audio source that plays ``url`` through ``FFmpegNormalAudio``.

    The instance counts how many times ``read`` has been called so playback
    can be restarted at the same position, probes the track duration with
    ``ffprobe`` in the background, and re-resolves the stream URL from
    ``origin`` when the underlying source stops producing data unexpectedly.
    """

    # Created lazily by ``set_source``; the attribute does not exist before
    # that, which is why the code below checks ``hasattr(self, 'source')``.
    source: FFmpegNormalAudio

    def __init__(
        self,
        url: str,
        origin: str,
        description: str,
        options: Optional[str],
        rby: discord.Member,
        already_read: int,
        tor: bool,
    ):
        """Store playback parameters; must be called with a running event loop.

        Args:
            url: Stream URL handed to FFmpeg/ffprobe.
            origin: Value passed to ``real_url`` to obtain a fresh ``url``.
            description: Free-form description stored on the instance.
            options: FFmpeg options; also passed to ``sparq``.
            rby: Member compared against in ``can_be_skipped_by``
                (presumably "requested by" -- confirm with callers).
            already_read: Number of reads already consumed (start offset).
            tor: Whether FFmpeg/ffprobe/URL resolution should go through Tor.

        Raises:
            RuntimeError: If there is no running event loop.
        """
        self.url = url
        self.origin = origin
        self.description = description
        self.options = options
        self.rby = rby
        self.already_read = already_read
        self.tor = tor
        self.regenerating = False
        # The FFmpeg source is deliberately NOT created here; callers use
        # ``set_source_if_necessary`` / ``set_source`` when they need it.
        # Maps url -> duration string ('' while unknown or when probing failed).
        self._durations: dict[str, str] = {}
        # Strong references to in-flight background tasks: the event loop only
        # keeps weak references, so unreferenced tasks may be collected early.
        self._background_tasks: set[asyncio.Task] = set()
        self.loop = asyncio.get_running_loop()

    def _spawn(self, coro) -> None:
        """Run ``coro`` as a task on ``self.loop`` and keep it referenced.

        Must be called from the event loop's own thread.
        """
        task = self.loop.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def set_source_if_necessary(self):
        """Create the FFmpeg source unless one already exists."""
        if not hasattr(self, 'source'):
            self.set_source()

    def set_source(self):
        """(Re)create the FFmpeg source for the current URL and offset.

        Also schedules a duration probe. Any previously set source is simply
        replaced here; it is not cleaned up by this method.
        """
        self.schedule_duration_update()
        self.source = FFmpegNormalAudio(
            self.url,
            options=self.options,
            before_options=self.before_options(),
            tor=self.tor,
        )

    def set_already_read(self, already_read: int):
        """Set the read counter and rebuild the source at that offset."""
        self.already_read = already_read
        self.set_source()

    def set_seconds(self, seconds: float):
        """Seek to ``seconds`` by converting it into a read count."""
        self.set_already_read(round(seconds / sparq(self.options)))

    def source_seconds(self) -> float:
        """Return the current position in seconds (reads times ``sparq``)."""
        return self.already_read * sparq(self.options)

    def source_timecode(self) -> str:
        """Return the current position formatted as ``H:MM:SS``."""
        total_seconds = round(self.source_seconds())
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return f'{hours}:{minutes:02d}:{seconds:02d}'

    def _schedule_duration_update(self):
        """Start the duration probe; runs on the event loop thread."""
        self._spawn(self.update_duration())

    def schedule_duration_update(self):
        """Request a duration probe; safe to call from any thread."""
        self.loop.call_soon_threadsafe(self._schedule_duration_update)

    def _schedule_regenerate(self):
        """Start ``regenerate``; runs on the event loop thread."""
        self._spawn(self.regenerate())

    async def update_duration(self):
        """Probe the duration of the current URL with ``ffprobe`` and cache it.

        Each URL is probed at most once: an empty placeholder is stored before
        probing, and it stays in place if ``ffprobe`` exits with a non-zero
        status (in which case ``duration`` keeps returning its fallback).
        """
        url: str = self.url
        if url in self._durations:
            return
        # Placeholder marks the URL as "being probed" to avoid duplicate runs.
        self._durations[url] = ''
        args = [*tor_prefix()] if self.tor else []
        args += [
            'ffprobe', '-i', url,
            '-show_entries', 'format=duration',
            '-v', 'quiet',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            '-sexagesimal',
        ]
        process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
        )
        with Benchmark('FFP'):
            # ``communicate`` drains and closes the pipe while waiting, so the
            # child cannot block on a full pipe and no descriptor is leaked.
            stdout, _ = await self.loop.run_in_executor(
                None, process.communicate
            )
        if process.returncode:
            return
        # Drop the fractional part of the sexagesimal timestamp.
        self._durations[url] = stdout.decode().strip().split('.')[0]

    def duration(self) -> str:
        """Return the cached duration, or ``'?:??:??'`` if it is not known.

        Schedules a probe when the current URL has never been probed.
        """
        duration = self._durations.get(self.url)
        if duration is None:
            self.schedule_duration_update()
        return duration or '?:??:??'

    def before_options(self):
        """Build the FFmpeg input options for the current URL and offset."""
        before_options = ''
        if 'https' in self.url:
            before_options += (
                '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 10 -copy_unknown'
            )
        if self.already_read:
            # Resume from the position that has already been played.
            before_options += (
                f' -ss {self.source_seconds()}'
            )
        return before_options

    def read(self) -> bytes:
        """Return the next chunk of audio data.

        While a regeneration is in progress ``FILL`` is returned instead of
        real data. When the source yields nothing and does not report itself
        as droppable, a regeneration is attempted about nine times out of ten;
        otherwise the empty result is returned as-is.

        NOTE(review): this assumes ``set_source``/``set_source_if_necessary``
        has been called beforehand; ``self.source`` is not created here.
        """
        if self.regenerating:
            return FILL
        self.already_read += 1
        ret: bytes = self.source.read()
        if not ret and not self.source.droppable():
            if random.random() > .1:
                self.regenerating = True
                # ``read`` is normally driven by the voice player thread, and
                # ``loop.create_task`` is not thread-safe, so hand the work
                # over to the loop the same way duration updates are.
                self.loop.call_soon_threadsafe(self._schedule_regenerate)
                return FILL
            print(f'dropped {self.origin}')
        return ret

    def cleanup(self):
        """Clean up the FFmpeg source if one was ever created."""
        if hasattr(self, 'source'):
            self.source.cleanup()

    def can_be_skipped_by(self, member: discord.Member) -> bool:
        """Return whether ``member`` may skip this track.

        Members holding any of the listed management permissions may always
        skip; everyone else only if they are equal to ``self.rby``.
        """
        permissions: discord.Permissions = member.guild_permissions
        has_management_permission = (
            permissions.administrator
            or permissions.manage_permissions
            or permissions.manage_guild
            or permissions.manage_channels
            or permissions.manage_messages
        )
        if has_management_permission:
            return True
        return self.rby == member

    def hybernate(self):
        """Return the state needed by ``respawn`` as a plain dict.

        ``rby`` is stored as the member's id rather than the member object.
        """
        return {
            'url': self.url,
            'origin': self.origin,
            'description': self.description,
            'options': self.options,
            'rby': self.rby.id,
            'already_read': self.already_read,
            'tor': self.tor,
        }

    @classmethod
    async def respawn(cls, guild: discord.Guild, respawn) -> 'YTAudio':
        """Rebuild an instance from a dict produced by ``hybernate``.

        The member is looked up via ``guild.get_member`` first and fetched
        with ``guild.fetch_member`` if that yields nothing. A missing
        ``'tor'`` key is treated as ``False``.
        """
        member = (
            guild.get_member(respawn['rby'])
            or await guild.fetch_member(respawn['rby'])
        )
        # ``cls`` rather than a hard-coded class so subclasses respawn as
        # themselves.
        return cls(
            respawn['url'],
            respawn['origin'],
            respawn['description'],
            respawn['options'],
            member,
            respawn['already_read'],
            respawn.get('tor', False),
        )

    async def regenerate(self):
        """Re-resolve the URL from ``origin`` and rebuild the FFmpeg source.

        ``regenerating`` is always reset afterwards, even on failure, so that
        ``read`` stops returning ``FILL``.
        """
        try:
            print(f'regenerating {self.origin}')
            self.url = await real_url(self.origin, True, self.tor)
            if hasattr(self, 'source'):
                self.source.cleanup()
            self.set_source()
            print(f'regenerated {self.origin}')
        finally:
            self.regenerating = False