v6d3music/v6d3music/core/ystate.py

118 lines
4.2 KiB
Python

import asyncio
from collections import deque
from contextlib import AsyncExitStack
from typing import AsyncIterable, Iterable
from v6d3music.core.create_ytaudio import *
from v6d3music.core.ytaudio import *
from v6d3music.processing.pool import *
from v6d3music.utils.argctx import *
from v6d2ctx.context import Context
# Public API of this module: a star import exposes only ``YState``.
__all__ = ('YState',)
class YState:
    """Shared state of one run that turns URL sources into audio objects.

    Work is kept in three containers: ``sources`` (URL contexts not started
    yet), ``playlists`` (futures resolving to lists of entries) and
    ``entries`` (entries, or an exception standing in for a failed playlist).
    Futures for the finished audios are put on ``results`` by ``YJD`` jobs and
    consumed by :meth:`iterate`.
    """

    def __init__(self, pool: Pool, ctx: Context, sources: Iterable[UrlCtx]) -> None:
        """Set up the containers for a new run.

        Args:
            pool: Pool the ``YJD`` jobs are attached to; ``workers()`` decides
                how many jobs :meth:`iterate` starts.
            ctx: Context handed to ``create_ytaudio`` for every entry.
            sources: URL contexts to process; copied into a deque.
        """
        self.pool = pool
        self.ctx = ctx
        self.sources: deque[UrlCtx] = deque(sources)
        self.playlists: deque[asyncio.Future[list[InfoCtx]]] = deque()
        self.entries: deque[InfoCtx | BaseException] = deque()
        # A ``None`` item is the sentinel telling :meth:`iterate` to stop.
        self.results: asyncio.Queue[asyncio.Future[YTAudio | None] | None] = asyncio.Queue()
        self.es = AsyncExitStack()
        # Number of jobs that stopped because they had nothing to do; a job
        # that finishes a playlist starts this many new jobs and resets it.
        self.descheduled: int = 0

    def empty(self) -> bool:
        """Return True when no work is pending and no result is queued."""
        return self.empty_processing() and self.results.empty()

    def empty_processing(self) -> bool:
        """Return True when sources, playlists and entries are all empty."""
        return not self.sources and not self.playlists and not self.entries

    async def iterate(self) -> AsyncIterable[YTAudio]:
        """Yield the audios from ``results`` in queue order.

        Starts ``pool.workers()`` jobs inside the exit stack, which is closed
        when iteration ends. Results that resolve to ``None`` are skipped.

        Raises:
            Explicit: if awaiting a result raises ``OSError``; the original
                error is attached as the cause. Other exceptions propagate
                unchanged.
        """
        async with self.es:
            for _ in range(self.pool.workers()):
                await self.es.enter_async_context(YJD(self).at(self.pool))
            while not self.empty():
                future = await self.results.get()
                if future is None:
                    # Sentinel enqueued by a job once nothing is left to do.
                    return
                try:
                    audio = await future
                    if audio is not None:
                        yield audio
                except OSError as e:
                    # Chain explicitly so the underlying error stays attached.
                    raise Explicit('extraction error\nunknown ytdl error') from e
                finally:
                    self.results.task_done()

    async def playlist(self, source: UrlCtx) -> list[InfoCtx]:
        """Collect every entry produced by ``source.entries()`` into a list."""
        return [info async for info in source.entries()]

    async def result(self, entry: InfoCtx) -> YTAudio | None:
        """Create the audio for ``entry``.

        Returns ``None`` instead of raising when creation fails with an
        ``Exception`` and ``entry.ignore`` is truthy; otherwise the exception
        is re-raised.
        """
        try:
            return await create_ytaudio(self.ctx, entry)
        except Exception:
            if entry.ignore:
                return None
            raise
class YJD(JobDescriptor):
    """Job that advances a :class:`YState` by one step per :meth:`run` call."""

    def __init__(self, state: YState) -> None:
        """Bind the job to the shared ``state`` it operates on."""
        self.state = state

    def _unpack_playlists(self) -> None:
        """Move entries of finished playlists onto ``state.entries``.

        Only futures at the front of ``state.playlists`` that are already done
        are consumed, so entries keep the order of their playlists. If a
        playlist failed, the remaining playlists and sources are dropped and
        the exception itself is queued as an entry.
        """
        state = self.state
        while state.playlists and state.playlists[0].done():
            playlist = state.playlists.popleft()
            if (e := playlist.exception()) is not None:
                state.playlists.clear()
                state.sources.clear()
                state.entries.append(e)
            else:
                state.entries.extend(playlist.result())

    async def run(self) -> JobDescriptor | None:
        """Perform one unit of work on the shared state.

        Depending on what is pending, the step is one of:

        * nothing left to process: enqueue the ``None`` sentinel if
          ``results`` is empty, and stop;
        * an entry is available: enqueue a future for its audio and wait
          for it;
        * a source is available: load it as a playlist, unpack finished
          playlists, and start one new job for every descheduled one;
        * only unfinished playlists remain: count this job as descheduled
          and stop.

        Returns:
            ``self`` after handling an entry or a source, ``None`` otherwise.
            NOTE(review): presumably the pool runs the returned descriptor
            again and retires the job on ``None`` -- confirm against the pool.
        """
        state = self.state
        if state.empty_processing():
            if state.results.empty():
                state.results.put_nowait(None)
            return None
        elif state.entries:
            entry = state.entries.popleft()
            audiotask: asyncio.Future[YTAudio | None]
            if isinstance(entry, BaseException):
                # A failed playlist: hand the failure to the consumer through
                # an already-failed future.
                audiotask = asyncio.Future()
                audiotask.set_exception(entry)
            else:
                audiotask = asyncio.create_task(state.result(entry))
            state.results.put_nowait(audiotask)
            try:
                await audiotask
            except BaseException:
                # The future is already on ``results``, so the consumer will
                # see the failure; here only the remaining work is dropped.
                # NOTE(review): this also catches cancellation of the job
                # itself, same as the bare ``except`` it replaces -- confirm.
                state.entries.clear()
                state.playlists.clear()
                state.sources.clear()
            return self
        elif state.sources:
            source = state.sources.popleft()
            playlisttask = asyncio.create_task(state.playlist(source))
            state.playlists.append(playlisttask)
            try:
                await playlisttask
            except BaseException:
                # The failure is picked up from the future by
                # ``_unpack_playlists`` below.
                state.sources.clear()
            finally:
                self._unpack_playlists()
                # Jobs that stopped with nothing to do are replaced now that
                # entries may have become available.
                rescheduled = state.descheduled
                state.descheduled = 0
                for _ in range(rescheduled):
                    await state.es.enter_async_context(YJD(state).at(state.pool))
            return self
        else:
            state.descheduled += 1
            return None