# v6d3music/v6d3music/core/ytaudio.py
#
# Repository viewer metadata: 307 lines, 9.6 KiB, Python.

import asyncio
import random
import re
import traceback
from typing import Optional
import discord
from v6d2ctx.context import *
from v6d3music.core.ffmpegnormalaudio import *
from v6d3music.core.real_url import *
from v6d3music.core.ytaservicing import *
from v6d3music.processing.abstractrunner import *
from v6d3music.utils.fill import *
from v6d3music.utils.options_for_effects import *
from v6d3music.utils.sparq import *
from v6d3music.utils.tor_prefix import *
__all__ = ("YTAudio",)
class YTAudio(discord.AudioSource):
    """Audio source that plays ``url`` through :class:`FFmpegNormalAudio`.

    The playback position is tracked in ``already_read``, the number of
    successful :meth:`read` calls so far; it is converted to seconds by
    multiplying with ``sparq(self.options)``.  The underlying ``source``
    attribute is created lazily by :meth:`set_source` and recreated on seek,
    on effect change and on regeneration.  When the stream ends early, the
    playable URL is resolved again from ``origin`` and playback resumes from
    the current position (see :meth:`read` and :meth:`regenerate`).
    """

    # Created lazily by set_source(); absent until then (checked via hasattr).
    source: FFmpegNormalAudio

    def __init__(
        self,
        servicing: YTAServicing,
        url: str,
        origin: str,
        description: str,
        options: Optional[str],
        rby: discord.Member | None,
        already_read: int,
        tor: bool,
        /,
        *,
        stop_at: int | None = None,
    ):
        """Store the track parameters; no ffmpeg process is started here.

        Args:
            servicing: Provides ``runner`` (used for duration probes) and
                ``caching`` (used when re-resolving the URL).
            url: Directly playable media URL handed to ffmpeg/ffprobe.
            origin: Original link the track was requested with; used to
                resolve a fresh ``url`` on regeneration.
            description: Human-readable description exposed by ``pubjson``.
            options: ffmpeg output options, or ``None``.
            rby: Member stored as the only non-privileged user allowed to
                skip the track (presumably "requested by"), or ``None``.
            already_read: Initial position, in ``read()`` calls.
            tor: Whether ffmpeg/ffprobe/URL resolution should go through Tor.
            stop_at: Optional position (same unit as ``already_read``) at
                which ``read()`` starts reporting end of stream.

        Raises:
            RuntimeError: If there is no running event loop
                (``asyncio.get_running_loop()``).
        """
        self.servicing = servicing
        self.url = url
        self.origin = origin
        # SoundCloud origins are flagged unstable: no ffmpeg reconnect flags
        # and no eager source creation for non-head queue entries.
        self.unstable = "https://soundcloud.com/" in self.origin
        self.description = description
        self.options = options
        self.rby = rby
        self.already_read = already_read
        self.tor = tor
        self.regenerating = False
        # The source is intentionally not created here; see
        # set_source_if_necessary() / set_source_given_index().
        # Maps url -> "H:MM:SS" duration; "" marks a probe that is pending
        # or has failed.
        self._durations: dict[str, str] = {}
        self._duration_lock = asyncio.Lock()
        self.loop = asyncio.get_running_loop()
        self.stop_at: int | None = stop_at
        # Number of regenerations triggered so far by read().
        self.attempts = 0

    def _reduced_durations(self) -> dict[str, str]:
        """Return the cached durations restricted to the current ``url``."""
        return {url: duration for url, duration in self._durations.items() if url == self.url}

    def set_source_if_necessary(self):
        """Create the ffmpeg source unless one already exists."""
        if not hasattr(self, "source"):
            self.set_source()

    def set_source_if_stable(self):
        """Create the ffmpeg source only for tracks not flagged unstable."""
        if not self.unstable:
            self.set_source_if_necessary()

    def set_source_given_index(self, index: int):
        """Prepare the source depending on the position ``index``.

        Index ``0`` always gets a source; any other index only gets one when
        the track is stable.  (Presumably ``index`` is the queue position,
        with 0 being the track about to play -- confirm against the caller.)
        """
        if index:
            self.set_source_if_stable()
        else:
            self.set_source_if_necessary()

    def set_source(self):
        """(Re)create ``source`` for the current url, options and position.

        Also schedules a duration probe.  The previous source, if any, is not
        cleaned up here; callers that need that do it themselves (see
        :meth:`regenerate`).
        """
        self.schedule_duration_update()
        self.source = FFmpegNormalAudio(
            self.url, options=self.options, before_options=self.before_options(), tor=self.tor
        )

    def set_already_read(self, already_read: int):
        """Seek to ``already_read`` (in ``read()`` calls) and restart the source."""
        self.already_read = already_read
        self.set_source()

    def set_seconds(self, seconds: float):
        """Seek to ``seconds``, rounded to the nearest ``read()`` quantum."""
        self.set_already_read(round(seconds / sparq(self.options)))

    def set_effects(self, effects: str | None) -> None:
        """Switch effects while keeping the current position in seconds.

        The position is captured before ``options`` change because
        ``sparq(self.options)`` depends on the options.
        An empty ``effects`` string is treated like ``None``.
        """
        seconds = self.source_seconds()
        self.options = options_for_effects(effects or None)
        self.set_seconds(seconds)

    def source_seconds(self) -> float:
        """Return the current position in seconds."""
        # NOTE(review): sparq() presumably yields seconds of source audio per
        # read() call for the given options -- confirm in utils.sparq.
        return self.already_read * sparq(self.options)

    def source_timecode(self) -> str:
        """Return the current position formatted as ``H:MM:SS``."""
        seconds = round(self.source_seconds())
        minutes, seconds = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"

    def _schedule_duration_update(self):
        """Start the duration probe task; must run on the loop thread."""
        self.loop.create_task(self.update_duration())

    def schedule_duration_update(self):
        """Schedule a duration probe; safe to call from any thread."""
        self.loop.call_soon_threadsafe(self._schedule_duration_update)

    async def _do_update_duration(self):
        """Probe the duration of the current url with ffprobe and cache it.

        Does nothing when the url already has an entry.  An empty-string
        placeholder is stored before probing, so a failed probe is not
        retried for that url (``duration()`` only reschedules on a missing
        entry).
        """
        url: str = self.url
        if url in self._durations:
            return
        self._durations[url] = ""
        # NOTE(review): tor_prefix() presumably returns the command prefix
        # that routes the child process through Tor -- confirm.
        args = [*tor_prefix()] if self.tor else []
        args += [
            "ffprobe",
            "-i",
            url,
            "-show_entries",
            "format=duration",
            "-v",
            "quiet",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            "-sexagesimal",
        ]
        ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
        # communicate() drains stdout while waiting; wait() alone can
        # deadlock with stdout=PIPE if the child fills the pipe buffer.
        stdout, _ = await ap.communicate()
        if ap.returncode == 0:
            # ffprobe prints e.g. "0:03:25.123456"; keep the part before ".".
            self._durations[url] = stdout.decode().strip().split(".")[0]

    async def _update_duration(self):
        """Run the duration probe under the per-instance lock."""
        async with self._duration_lock:
            await self._do_update_duration()

    async def _update_duration_context(self, context: CoroContext):
        """Report a status event on ``context``, then run the duration probe.

        Not referenced elsewhere in this class.
        """
        context.events.send(CoroStatusChanged({"ytaudio": "duration"}))
        await self._update_duration()

    async def update_duration(self):
        """Run the duration probe through ``servicing.runner``."""
        await self.servicing.runner.run(self._update_duration())

    def duration(self) -> str:
        """Return the cached ``H:MM:SS`` duration or ``"?:??:??"``.

        Schedules a probe when the current url has no cache entry at all.
        """
        duration = self._durations.get(self.url)
        if duration is None:
            self.schedule_duration_update()
        return duration or "?:??:??"

    def before_options(self) -> str:
        """Build the ffmpeg input options for the current url and position.

        Reconnect flags are added for stable HTTP urls, and ``-ss`` when the
        position is non-zero.
        """
        before_options = ""
        if "http" in self.url and not self.unstable:
            before_options += (
                " -reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 60 -copy_unknown"
            )
        if self.already_read:
            before_options += f" -ss {self.source_seconds()}"
        return before_options.strip()

    def estimated_seconds_duration(self) -> float:
        """Return the cached duration in seconds, or ``0.0`` if unknown."""
        _m = re.match(r"(\d+):(\d+):(\d+)", self.duration())
        if _m is None:
            return 0.0
        try:
            h, m, s = (int(group) for group in _m.groups())
        except ValueError:
            # Groups are all digits, so this is only reachable for inputs
            # int() itself refuses (e.g. absurdly long digit strings).
            traceback.print_exc()
            return 0.0
        return float(h * 3600 + m * 60 + s)

    def underran(self) -> bool:
        """Return whether more than one second of the track is still unplayed.

        With an unknown duration (estimated as ``0.0``) this is ``False``.
        """
        to_end = self.estimated_seconds_duration() - self.source_seconds()
        return to_end > 1.0

    def _schedule_regenerate(self, reason: str) -> None:
        """Start the regeneration task; must run on the loop thread."""
        self.loop.create_task(self.regenerate(reason))

    def read(self) -> bytes:
        """Return the next audio frame.

        * While regenerating, ``FILL`` is returned and the position does not
          advance.  (NOTE(review): ``FILL`` is presumably a frame of silence
          -- confirm in utils.fill.)
        * Once ``stop_at`` is reached, empty bytes are returned, which
          discord.py treats as end of stream.
        * If the source runs dry although it is not droppable or the track
          should not be over yet, a regeneration is scheduled and ``FILL`` is
          returned.  The first five attempts always regenerate; after that
          each further attempt is skipped with ~10% probability, in which
          case the track is dropped (the empty frame is returned).
        """
        if self.regenerating:
            return FILL
        if self.stop_at is not None and self.already_read >= self.stop_at - 1:
            return b""
        self.already_read += 1
        ret: bytes = self.source.read()
        if ret:
            return ret
        droppable = self.source.droppable()
        if droppable and not self.underran():
            # Genuine end of the track.
            return ret
        if self.attempts < 5 or random.random() > 0.1:
            self.attempts += 1
            self.regenerating = True
            # read() is called from discord.py's player thread, and
            # loop.create_task() is not thread-safe, so hop onto the loop
            # thread first (same pattern as schedule_duration_update()).
            self.loop.call_soon_threadsafe(
                self._schedule_regenerate, "underran" if droppable else "not droppable"
            )
            return FILL
        print(f"dropped {self.origin}")
        return ret

    def cleanup(self):
        """Clean up the underlying source, if one was ever created."""
        if hasattr(self, "source"):
            self.source.cleanup()

    def can_be_skipped_by(self, member: discord.Member) -> bool:
        """Return whether ``member`` may skip this track.

        Members holding any of the listed guild permissions always may;
        otherwise only the member stored in ``rby``.
        """
        permissions: discord.Permissions = member.guild_permissions
        if (
            permissions.administrator
            or permissions.manage_permissions
            or permissions.manage_guild
            or permissions.manage_channels
            or permissions.manage_messages
        ):
            return True
        return self.rby is not None and self.rby == member

    def hybernate(self) -> dict:
        """Serialise the track into a plain dict accepted by :meth:`respawn`.

        ``rby`` is stored as a member id, and only the duration of the
        current url is kept.
        """
        return {
            "url": self.url,
            "origin": self.origin,
            "description": self.description,
            "options": self.options,
            "rby": None if self.rby is None else self.rby.id,
            "already_read": self.already_read,
            "tor": self.tor,
            "stop_at": self.stop_at,
            "durations": self._reduced_durations(),
        }

    @classmethod
    async def respawn(cls, servicing: YTAServicing, guild: discord.Guild, respawn: dict) -> "YTAudio":
        """Rebuild a track from a dict produced by :meth:`hybernate`.

        The stored member id is looked up in the guild cache first and
        fetched otherwise; a member that no longer exists becomes ``None``.
        ``tor``, ``stop_at`` and ``durations`` are optional keys.
        """
        member_id: int | None = respawn["rby"]
        if member_id is None:
            member = None
        else:
            member = guild.get_member(member_id)
            if member is None:
                try:
                    member = await guild.fetch_member(member_id)
                    # NOTE(review): underscore-prefixed (non-public) Guild
                    # method, used to put the fetched member into the cache.
                    guild._add_member(member)
                except discord.NotFound:
                    member = None
        audio = YTAudio(
            servicing,
            respawn["url"],
            respawn["origin"],
            respawn["description"],
            respawn["options"],
            member,
            respawn["already_read"],
            respawn.get("tor", False),
            stop_at=respawn.get("stop_at", None),
        )
        audio._durations |= respawn.get("durations", {})
        return audio

    async def regenerate(self, reason: str):
        """Resolve a fresh url from ``origin`` and restart the source.

        ``regenerating`` is always reset afterwards, even on failure, so that
        :meth:`read` resumes reading from ``source``.
        """
        try:
            print(f"regenerating {self.origin} {reason=}")
            # NOTE(review): the positional ``True`` presumably forces
            # real_url() to bypass its cache -- confirm in core.real_url.
            self.url = await real_url(self.servicing.caching, self.origin, True, self.tor)
            if hasattr(self, "source"):
                self.source.cleanup()
            self.set_source()
            print(f"regenerated {self.origin}")
        finally:
            self.regenerating = False

    async def pubjson(self, member: discord.Member) -> dict:
        """Return the public state of the track as seen by ``member``."""
        return {
            "seconds": self.source_seconds(),
            "timecode": self.source_timecode(),
            "duration": self.duration(),
            "description": self.description,
            "canbeskipped": self.can_be_skipped_by(member),
        }

    def copy(self) -> "YTAudio":
        """Return a fresh track for the same media, starting from position 0.

        ``stop_at`` and the cached durations are not carried over.
        """
        return YTAudio(
            self.servicing,
            self.url,
            self.origin,
            self.description,
            self.options,
            self.rby,
            0,
            self.tor,
        )

    def branch(self) -> "YTAudio":
        """Split playback: stop this track soon and return its continuation.

        This track gets ``stop_at`` set 50 ``read()`` calls ahead of the
        current position, and the returned track starts at that position.

        Raises:
            Explicit: If this track already has a ``stop_at``.
        """
        if self.stop_at is not None:
            raise Explicit("already branched")
        self.stop_at = stop_at = self.already_read + 50
        audio = YTAudio(
            self.servicing,
            self.url,
            self.origin,
            self.description,
            self.options,
            self.rby,
            stop_at,
            self.tor,
        )
        return audio