v6d3music/v6d3music/core/audio.py
2023-12-27 04:35:27 +00:00

304 lines
9.4 KiB
Python

from __future__ import annotations
import asyncio
import random
import re
import traceback
from typing import Optional
import discord
from v6d2ctx.context import Explicit
from v6d3music.core.aservicing import AServicing
from v6d3music.core.ffmpegnormalaudio import FFmpegNormalAudio
from v6d3music.core.real_url import real_url
from v6d3music.processing.abstractrunner import CoroContext, CoroStatusChanged
from v6d3music.utils.fill import FILL
from v6d3music.utils.options_for_effects import options_for_effects
from v6d3music.utils.sparq import sparq
__all__ = ("Audio",)
class Audio(discord.AudioSource):
source: FFmpegNormalAudio
def __init__(
self,
servicing: AServicing,
url: str,
origin: str,
description: str,
options: Optional[str],
rby: discord.Member | None,
already_read: int,
/,
*,
stop_at: int | None = None,
):
self.servicing = servicing
self.url = url
self.origin = origin
self.unstable = False
if "https://soundcloud.com/" in self.origin:
self.unstable = True
self.description = description
self.options = options
self.rby = rby
self.already_read = already_read
self.regenerating = False
# self.set_source()
self._durations: dict[str, str] = {}
self._duration_lock = asyncio.Lock()
self.loop = asyncio.get_running_loop()
self.stop_at: int | None = stop_at
self.attempts = 0
def _reduced_durations(self) -> dict[str, str]:
return {
url: duration
for url, duration in self._durations.items()
if url == self.url
}
def set_source_if_necessary(self):
if not hasattr(self, "source"):
self.set_source()
def set_source_if_stable(self):
if not self.unstable:
self.set_source_if_necessary()
def set_source_given_index(self, index: int):
if index:
self.set_source_if_stable()
else:
self.set_source_if_necessary()
def set_source(self):
self.schedule_duration_update()
self.source = FFmpegNormalAudio(
self.url, options=self.options, before_options=self.before_options()
)
def set_already_read(self, already_read: int):
self.already_read = already_read
self.set_source()
def set_seconds(self, seconds: float):
self.set_already_read(round(seconds / sparq(self.options)))
def set_effects(self, effects: str | None) -> None:
seconds = self.source_seconds()
self.options = options_for_effects(effects or None)
self.set_seconds(seconds)
def source_seconds(self) -> float:
return self.already_read * sparq(self.options)
def source_timecode(self) -> str:
seconds = round(self.source_seconds())
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
return f"{hours}:{minutes:02d}:{seconds:02d}"
def _schedule_duration_update(self):
self.loop.create_task(self.update_duration())
def schedule_duration_update(self):
self.loop.call_soon_threadsafe(self._schedule_duration_update)
async def _do_update_duration(self):
url: str = self.url
if url in self._durations:
return
self._durations.setdefault(url, "")
args = []
args += [
"ffprobe",
*("-i", url),
"-show_entries",
"format=duration",
*("-v", "quiet"),
*("-of", "default=noprint_wrappers=1:nokey=1"),
"-sexagesimal",
]
ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
code = await ap.wait()
if code:
pass
else:
assert ap.stdout is not None
self._durations[url] = (
(await ap.stdout.read()).decode().strip().split(".")[0]
)
async def _update_duration(self):
async with self._duration_lock:
await self._do_update_duration()
async def _update_duration_context(self, context: CoroContext):
context.events.send(CoroStatusChanged({"audio": "duration"}))
await self._update_duration()
async def update_duration(self):
await self.servicing.runner.run(self._update_duration())
def duration(self) -> str:
duration = self._durations.get(self.url)
if duration is None:
self.schedule_duration_update()
return duration or "?:??:??"
def before_options(self) -> str:
before_options = ""
if "http" in self.url and not self.unstable:
before_options += " -reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 60 -copy_unknown"
if self.already_read:
before_options += f" -ss {self.source_seconds()}"
return before_options.strip()
def estimated_seconds_duration(self) -> float:
duration = self.duration()
_m = re.match(r"(\d+):(\d+):(\d+)", duration)
if _m is None:
return 0.0
else:
try:
hs, ms, ss = _m.groups()
h, m, s = int(hs), int(ms), int(ss)
return float(h * 3600 + m * 60 + s)
except Exception:
traceback.print_exc()
return 0.0
def underran(self) -> bool:
to_end = self.estimated_seconds_duration() - self.source_seconds()
return to_end > 1.0
def read(self) -> bytes:
if self.regenerating:
return FILL
if self.stop_at is not None and self.already_read >= self.stop_at - 1:
return b""
self.already_read += 1
ret: bytes = self.source.read()
if not ret and (not (droppable := self.source.droppable()) or self.underran()):
if self.attempts < 5 or random.random() > 0.1:
self.attempts += 1
self.regenerating = True
self.loop.create_task(
self.regenerate("underran" if droppable else "not droppable")
)
return FILL
else:
print(f"dropped {self.origin}")
return ret
def cleanup(self):
if hasattr(self, "source"):
self.source.cleanup()
def can_be_skipped_by(self, member: discord.Member) -> bool:
permissions: discord.Permissions = member.guild_permissions
if permissions.administrator:
return True
elif permissions.manage_permissions:
return True
elif permissions.manage_guild:
return True
elif permissions.manage_channels:
return True
elif permissions.manage_messages:
return True
elif self.rby is None:
return False
else:
return self.rby == member
def hybernate(self) -> dict:
return {
"url": self.url,
"origin": self.origin,
"description": self.description,
"options": self.options,
"rby": None if self.rby is None else self.rby.id,
"already_read": self.already_read,
"stop_at": self.stop_at,
"durations": self._reduced_durations(),
}
@classmethod
async def respawn(
cls, servicing: AServicing, guild: discord.Guild, respawn: dict
) -> Audio:
member_id: int | None = respawn["rby"]
if member_id is None:
member = None
else:
member = guild.get_member(member_id)
if member is None:
try:
member = await guild.fetch_member(respawn["rby"])
guild._add_member(member)
except discord.NotFound:
member = None
audio = Audio(
servicing,
respawn["url"],
respawn["origin"],
respawn["description"],
respawn["options"],
member,
respawn["already_read"],
stop_at=respawn.get("stop_at", None),
)
audio._durations |= respawn.get("durations", {})
return audio
async def regenerate(self, reason: str):
try:
print(f"regenerating {self.origin} {reason=}")
self.url = await real_url(self.origin, True)
if hasattr(self, "source"):
self.source.cleanup()
self.set_source()
print(f"regenerated {self.origin}")
finally:
self.regenerating = False
async def pubjson(self, member: discord.Member) -> dict:
return {
"seconds": self.source_seconds(),
"timecode": self.source_timecode(),
"duration": self.duration(),
"description": self.description,
"canbeskipped": self.can_be_skipped_by(member),
}
def copy(self) -> Audio:
return Audio(
self.servicing,
self.url,
self.origin,
self.description,
self.options,
self.rby,
0,
)
def branch(self) -> Audio:
if self.stop_at is not None:
raise Explicit("already branched")
self.stop_at = stop_at = self.already_read + 50
audio = Audio(
self.servicing,
self.url,
self.origin,
self.description,
self.options,
self.rby,
stop_at,
)
return audio