v6d3music/v6d3music/core/ystate.py

186 lines
6.9 KiB
Python

import asyncio
from collections import deque
from contextlib import AsyncExitStack
from typing import AsyncIterable, Iterable
from v6d2ctx.integration.responsetype import *
from v6d2ctx.context import *
from v6d3music.core.create_ytaudio import *
from v6d3music.core.ytaservicing import *
from v6d3music.core.ytaudio import *
from v6d3music.processing.pool import *
from v6d3music.utils.argctx import *
__all__ = ('YState',)
class _Stop:
pass
class YState:
def __init__(self, servicing: YTAServicing, pool: Pool, ctx: Context, sources: Iterable[UrlCtx]) -> None:
self.servicing = servicing
self.pool = pool
self.ctx = ctx
self.sources: deque[UrlCtx] = deque(sources)
self.playlists: deque[asyncio.Future[list[InfoCtx]]] = deque()
self.entries: deque[InfoCtx | BaseException] = deque()
self.results: asyncio.Queue[asyncio.Future[YTAudio | None] | _Stop] = asyncio.Queue()
self.es = AsyncExitStack()
self.descheduled: int = 0
def empty(self) -> bool:
return self.empty_processing() and self.results.empty()
def empty_processing(self) -> bool:
return not self.sources and not self.playlists and not self.entries
async def _start_workers(self) -> None:
for _ in range(self.pool.workers()):
await self.es.enter_async_context(YStream(self).at(self.pool))
async def _next_audio(self) -> YTAudio | None | _Stop:
future = await self.results.get()
if isinstance(future, _Stop):
return _Stop()
try:
return await future
except OSError as e:
raise Explicit('extraction error\nunknown ytdl error (probably due to video being unavailable, e.g. because of regional restrictions)') from e
finally:
self.results.task_done()
async def _iterate_with_workers(self) -> AsyncIterable[YTAudio]:
while not self.empty():
audio = await self._next_audio()
if isinstance(audio, _Stop):
return
if audio is not None:
yield audio
async def _iterate(self) -> AsyncIterable[YTAudio]:
await self._start_workers()
async for audio in self._iterate_with_workers():
yield audio
async def iterate(self) -> AsyncIterable[YTAudio]:
async with self.es:
async for audio in self._iterate():
yield audio
async def playlist(self, source: UrlCtx) -> list[InfoCtx]:
return [info async for info in source.entries()]
async def result(self, entry: InfoCtx) -> YTAudio | None:
try:
return await create_ytaudio(self.servicing, self.ctx, entry)
except Exception:
if not entry.ignore:
raise
else:
return None
class YStream(JobUnit):
def __init__(self, state: YState) -> None:
self.state = state
self.__running = False
self.__details: dict[str, ResponseType] = {'status': 'stopped'}
def _unpack_playlists(self) -> None:
while self.state.playlists and self.state.playlists[0].done():
playlist = self.state.playlists.popleft()
if (e := playlist.exception()) is not None:
self.state.playlists.clear()
self.state.sources.clear()
self.state.entries.append(e)
else:
for entry in playlist.result():
self.state.entries.append(entry)
async def _run(self, context: JobContext, /) -> JobUnit | None:
if self.state.empty_processing():
self.__details = {'status': 'stopping'}
if self.state.results.empty():
self.state.results.put_nowait(_Stop())
return None
elif self.state.entries:
entry = self.state.entries.popleft()
audiotask: asyncio.Future[YTAudio | None]
if isinstance(entry, BaseException):
self._set_details(context, {'status': 'breaking downstream audio creation'})
audiotask = asyncio.Future()
audiotask.set_exception(entry)
else:
self._set_details(
context,
{
'status': 'creating audio',
'info': cast_to_response(entry.info),
'effects': entry.effects,
'already_read': entry.already_read,
'tor': entry.tor,
'ignore': entry.ignore,
}
)
audiotask = asyncio.create_task(self.state.result(entry))
self.state.results.put_nowait(audiotask)
try:
await audiotask
except:
self.state.entries.clear()
self.state.playlists.clear()
self.state.sources.clear()
self._set_details(context, {'status': 'rescheduling self from entries'})
return self
elif self.state.sources:
source = self.state.sources.popleft()
self._set_details(
context,
{
'status': 'parsing playlist',
'url': source.url,
'effects': source.effects,
'already_read': source.already_read,
'tor': source.tor,
'ignore': source.ignore,
}
)
playlisttask = asyncio.create_task(self.state.playlist(source))
self.state.playlists.append(playlisttask)
try:
await playlisttask
except:
self.state.sources.clear()
return self
finally:
self._unpack_playlists()
rescheduled = self.state.descheduled
self.state.descheduled = 0
self._set_details(context, {'status': 'rescheduling others', 'rescheduling': rescheduled})
for _ in range(rescheduled):
await self.state.es.enter_async_context(YStream(self.state).at(self.state.pool))
self._set_details(context, {'status': 'rescheduling self from sources'})
return self
else:
self._set_details(context, {'status': 'descheduling'})
self.state.descheduled += 1
return None
def _set_details(self, context: JobContext, details: dict[str, ResponseType], /) -> None:
self.__details = details
context.events.send(JobStatusChanged(self))
async def run(self, context: JobContext, /) -> JobUnit | None:
try:
self.__running = True
return await self._run(context)
finally:
self.__running = False
self.__details = {'status': 'stopped'}
def json(self) -> ResponseType:
return {'type': 'ystream', 'details': self.__details, 'running': self.__running}