diff --git a/v6d3music/core/entries_effects_for_args.py b/v6d3music/core/entries_effects_for_args.py index b669674..9da95af 100644 --- a/v6d3music/core/entries_effects_for_args.py +++ b/v6d3music/core/entries_effects_for_args.py @@ -8,6 +8,9 @@ from v6d3music.config import myroot from v6d3music.utils.presets import allowed_effects +__all__ = ('effects_db', 'default_effects', 'set_default_effects', 'entries_effects_for_args',) + + effects_db = Db(myroot / 'effects.db', kvfactory=KVJson()) @@ -26,6 +29,6 @@ async def set_default_effects(gid: int, effects: str | None) -> None: async def entries_effects_for_args(args: list[str], gid: int) -> AsyncIterable[InfoCtx]: - for ctx in ArgCtx(default_effects, args, gid).sources: + for ctx in ArgCtx(default_effects(gid), args).sources: async for it in ctx.entries(): yield it diff --git a/v6d3music/core/ystate.py b/v6d3music/core/ystate.py new file mode 100644 index 0000000..9d79288 --- /dev/null +++ b/v6d3music/core/ystate.py @@ -0,0 +1,117 @@ +import asyncio +from collections import deque +from contextlib import AsyncExitStack +from typing import AsyncIterable, Iterable + +from v6d3music.core.create_ytaudio import * +from v6d3music.core.ytaudio import * +from v6d3music.processing.pool import * +from v6d3music.utils.argctx import * + +from v6d2ctx.context import Context + +__all__ = ('YState',) + + +class YState: + def __init__(self, pool: Pool, ctx: Context, sources: Iterable[UrlCtx]) -> None: + self.pool = pool + self.ctx = ctx + self.sources: deque[UrlCtx] = deque(sources) + self.playlists: deque[asyncio.Future[list[InfoCtx]]] = deque() + self.entries: deque[InfoCtx | BaseException] = deque() + self.results: asyncio.Queue[asyncio.Future[YTAudio | None] | None] = asyncio.Queue() + self.es = AsyncExitStack() + self.descheduled: int = 0 + + def empty(self) -> bool: + return self.empty_processing() and self.results.empty() + + def empty_processing(self) -> bool: + return not self.sources and not self.playlists and not self.entries + + async def iterate(self) -> AsyncIterable[YTAudio]: + async with self.es: + for _ in range(self.pool.workers()): + await self.es.enter_async_context(YJD(self).at(self.pool)) + while not self.empty(): + future = await self.results.get() + if future is None: + return + try: + audio = await future + if audio is not None: + yield audio + except OSError: + raise Explicit('extraction error\nunknown ytdl error') + finally: + self.results.task_done() + + async def playlist(self, source: UrlCtx) -> list[InfoCtx]: + return [info async for info in source.entries()] + + async def result(self, entry: InfoCtx) -> YTAudio | None: + try: + return await create_ytaudio(self.ctx, entry) + except: + if not entry.ignore: + raise + else: + return None + + +class YJD(JobDescriptor): + def __init__(self, state: YState) -> None: + self.state = state + + def _unpack_playlists(self) -> None: + while self.state.playlists and self.state.playlists[0].done(): + playlist = self.state.playlists.popleft() + if (e := playlist.exception()) is not None: + self.state.playlists.clear() + self.state.sources.clear() + self.state.entries.append(e) + else: + for entry in playlist.result(): + self.state.entries.append(entry) + + async def run(self) -> JobDescriptor | None: + if self.state.empty_processing(): + if self.state.results.empty(): + self.state.results.put_nowait(None) + return None + elif self.state.entries: + entry = self.state.entries.popleft() + audiotask: asyncio.Future[YTAudio | None] + if isinstance(entry, BaseException): + audiotask = asyncio.Future() + audiotask.set_exception(entry) + else: + audiotask = asyncio.create_task(self.state.result(entry)) + self.state.results.put_nowait(audiotask) + try: + await audiotask + except: + self.state.entries.clear() + self.state.playlists.clear() + self.state.sources.clear() + return self + elif self.state.sources: + source = self.state.sources.popleft() + playlisttask = asyncio.create_task(self.state.playlist(source)) + self.state.playlists.append(playlisttask) + try: + await playlisttask + except: + self.state.sources.clear() + return self + finally: + self._unpack_playlists() + rescheduled = self.state.descheduled + self.state.descheduled = 0 + for _ in range(rescheduled): + await self.state.es.enter_async_context(YJD(self.state).at(self.state.pool)) + return self + else: + self.state.descheduled += 1 + return None diff --git a/v6d3music/core/yt_audios.py b/v6d3music/core/yt_audios.py index 8f01b81..30405e0 100644 --- a/v6d3music/core/yt_audios.py +++ b/v6d3music/core/yt_audios.py @@ -1,21 +1,17 @@ from typing import AsyncIterable -from v6d3music.core.create_ytaudios import create_ytaudios -from v6d3music.core.entries_effects_for_args import entries_effects_for_args +from v6d3music.core.entries_effects_for_args import * +from v6d3music.core.ystate import * from v6d3music.core.ytaudio import YTAudio -from v6d3music.utils.argctx import InfoCtx +from v6d3music.processing.pool import * +from v6d3music.utils.argctx import * from v6d2ctx.context import Context async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]: - tuples: list[InfoCtx] = [] assert ctx.guild is not None - async for it in entries_effects_for_args(args, ctx.guild.id): - tuples.append(it) - if len(tuples) >= 5: - async for audio in create_ytaudios(ctx, tuples): - yield audio - tuples.clear() - async for audio in create_ytaudios(ctx, tuples): - yield audio + argctx = ArgCtx(default_effects(ctx.guild.id), args) + async with Pool(5) as pool: + async for audio in YState(pool, ctx, argctx.sources).iterate(): + yield audio diff --git a/v6d3music/html/main.css b/v6d3music/html/main.css index 9d28873..26b5ce0 100644 --- a/v6d3music/html/main.css +++ b/v6d3music/html/main.css @@ -30,15 +30,17 @@ body, #root-container { display: flex; + height: 100%; } #root { width: auto; min-width: min(40em, 100%); flex: auto; + overflow-y: scroll; } .sidebars { width: 100%; - background: #111; + background: #050505; } diff --git a/v6d3music/processing/pool.py b/v6d3music/processing/pool.py new file mode 100644 index 0000000..c1e1a5c --- /dev/null +++ b/v6d3music/processing/pool.py @@ -0,0 +1,219 @@ +import asyncio +from typing import Union + +from v6d3music.processing.yprocess import * + +__all__ = ('Job', 'Pool', 'JobDescriptor',) + + +class Job: + def __init__(self, future: asyncio.Future[None]) -> None: + self.future = future + + async def run(self) -> Union['Job', None]: + raise NotImplementedError + + +class JobDescriptor: + async def run(self) -> Union['JobDescriptor', None]: + raise NotImplementedError + + def wrap(self) -> Job: + return DescriptorJob(asyncio.Future(), self) + + def at(self, pool: 'Pool') -> 'JDC': + return JDC(self, pool) + + +class DescriptorJob(Job): + def __init__(self, future: asyncio.Future[None], descriptor: JobDescriptor) -> None: + super().__init__(future) + self.__descriptor = descriptor + + async def run(self) -> Job | None: + next_descriptor = await self.__descriptor.run() + if next_descriptor is None: + return None + else: + return DescriptorJob(self.future, next_descriptor) + + +class Worker: + def __init__(self) -> None: + self.__queue: asyncio.Queue[Job | None] = asyncio.Queue() + self.__working = False + self.__busy = 0 + + def _put_nowait(self, job: Job | None) -> None: + self.__queue.put_nowait(job) + self.__busy += 1 + + async def _handle(self, job: Job) -> None: + try: + next_job = await job.run() + except BaseException as e: + job.future.set_exception(e) + else: + if next_job is None: + job.future.set_result(None) + else: + self._put_nowait(next_job) + + async def _safe_handle(self, job: Job) -> None: + try: + await self._handle(job) + except asyncio.InvalidStateError: + pass + + def _on_shutdown(self) -> bool: + if self.__queue.empty(): + self.__working = False + return True + else: + self._put_nowait(None) + return False + + async def _on(self, job: Job | None) -> bool: + if job is None: + return self._on_shutdown() + else: + await self._handle(job) + return False + + async def _step(self) -> bool: + job = await self.__queue.get() + try: + return await self._on(job) + finally: + self.__busy -= 1 + self.__queue.task_done() + + async def _work(self) -> None: + while True: + if await self._step(): + return + + def _cancel_job(self, job: Job) -> None: + try: + job.future.set_exception(RuntimeError('task left in the worker after shutdown')) + except asyncio.InvalidStateError: + pass + + def _cancel_one(self, job: Job | None) -> None: + if job is not None: + self._cancel_job(job) + + def _cancel_get(self) -> None: + job = self.__queue.get_nowait() + try: + self._cancel_one(job) + finally: + self.__queue.task_done() + + def _cancel_all(self) -> None: + while not self.__queue.empty(): + self._cancel_get() + + async def _task(self) -> None: + await self._work() + if self.__working: + raise RuntimeError('worker left seemingly running after shutdown') + if not self.__queue.empty(): + raise RuntimeError('worker failed to finish all its jobs') + + def start(self) -> asyncio.Future[None]: + if self.__working: + raise RuntimeError('starting an already running worker') + self.__working = True + return asyncio.create_task(self._task()) + + def submit(self, job: Job | None) -> None: + if not self.__working: + raise RuntimeError('submitting to a non-working worker') + self._put_nowait(job) + + def working(self) -> bool: + return self.__working + + def busy(self) -> int: + return self.__busy + + +class Working: + def __init__(self, worker: Worker, task: asyncio.Future[None]) -> None: + self.__worker = worker + self.__task = task + + async def close(self) -> None: + self.__worker.submit(None) + await self.__task + + def submit(self, job: Job) -> None: + self.__worker.submit(job) + + @classmethod + def start(cls) -> 'Working': + worker = Worker() + task = worker.start() + return cls(worker, task) + + def working(self) -> bool: + return self.__worker.working() + + def busy(self) -> int: + return self.__worker.busy() + + +class Pool: + def __init__(self, workers: int) -> None: + if workers < 1: + raise ValueError('non-positive number of workers') + self.__workers = workers + self.__working = False + self.__open = False + + async def __aenter__(self) -> 'Pool': + if self.__open: + raise RuntimeError('starting an already open pool') + if self.__working: + raise RuntimeError('starting an already running pool') + self.__working = True + self.__pool = set(Working.start() for _ in range(self.__workers)) + self.__open = True + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if not self.__working: + raise RuntimeError('stopping a non-working pool') + if not self.__open: + raise RuntimeError('stopping a closed pool') + self.__open = False + for working in self.__pool: + await working.close() + del self.__pool + self.__working = False + + def submit(self, job: Job) -> None: + if not self.__working: + raise RuntimeError('submitting to a non-working pool') + if not self.__open: + raise RuntimeError('submitting to a closed pool') + min(self.__pool, key=lambda working: working.busy()).submit(job) + + def workers(self) -> int: + return self.__workers + + +class JDC: + def __init__(self, descriptor: JobDescriptor, pool: Pool) -> None: + self.__descriptor = descriptor + self.__pool = pool + + async def __aenter__(self) -> 'JDC': + job = self.__descriptor.wrap() + self.__future = job.future + self.__pool.submit(job) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.__future diff --git a/v6d3music/utils/argctx.py b/v6d3music/utils/argctx.py index b6014f3..c8f8019 100644 --- a/v6d3music/utils/argctx.py +++ b/v6d3music/utils/argctx.py @@ -1,17 +1,22 @@ -from typing import AsyncIterable, Callable, Any +from typing import Any, AsyncIterable, Callable from v6d3music.utils.effects_for_preset import effects_for_preset from v6d3music.utils.entries_for_url import entries_for_url from v6d3music.utils.options_for_effects import options_for_effects from v6d3music.utils.sparq import sparq +__all__ = ('InfoCtx', 'UrlCtx', 'ArgCtx',) + class InfoCtx: - def __init__(self, info: dict[str, Any], effects: str | None, already_read: int, tor: bool) -> None: + def __init__( + self, info: dict[str, Any], effects: str | None, already_read: int, tor: bool, ignore: bool + ) -> None: self.info = info self.effects = effects self.already_read = already_read self.tor = tor + self.ignore = ignore class UrlCtx: @@ -20,14 +25,19 @@ class UrlCtx: self.effects = effects self.already_read = 0 self.tor = False + self.ignore = False async def entries(self) -> AsyncIterable[InfoCtx]: - async for info in entries_for_url(self.url, self.tor): - yield InfoCtx(info, self.effects, self.already_read, self.tor) + try: + async for info in entries_for_url(self.url, self.tor): + yield InfoCtx(info, self.effects, self.already_read, self.tor, self.ignore) + except: + if not self.ignore: + raise class ArgCtx: - def __init__(self, default_effects: Callable[[int], str | None], args: list[str], gid: int) -> None: + def __init__(self, default_effects: str | None, args: list[str]) -> None: self.sources: list[UrlCtx] = [] while args: match args: @@ -36,7 +46,7 @@ class ArgCtx: case [url, '+', preset, *args]: effects = effects_for_preset(preset) case [url, *args]: - effects = default_effects(gid) + effects = default_effects case _: raise RuntimeError ctx = UrlCtx(url, effects) @@ -56,4 +66,9 @@ class ArgCtx: ctx.tor = True case [*args]: pass + match args: + case ['ignore', *args]: + ctx.ignore = True + case [*args]: + pass self.sources.append(ctx)