pooled extraction

This commit is contained in:
AF 2022-12-23 13:28:48 +00:00
parent 209b34a8a7
commit 2e45ccc591
6 changed files with 372 additions and 20 deletions

View File

@ -8,6 +8,9 @@ from v6d3music.config import myroot
from v6d3music.utils.presets import allowed_effects
__all__ = ('effects_db', 'default_effects', 'set_default_effects', 'entries_effects_for_args',)
effects_db = Db(myroot / 'effects.db', kvfactory=KVJson())
@ -26,6 +29,6 @@ async def set_default_effects(gid: int, effects: str | None) -> None:
async def entries_effects_for_args(args: list[str], gid: int) -> AsyncIterable[InfoCtx]:
for ctx in ArgCtx(default_effects, args, gid).sources:
for ctx in ArgCtx(default_effects(gid), args).sources:
async for it in ctx.entries():
yield it

117
v6d3music/core/ystate.py Normal file
View File

@ -0,0 +1,117 @@
import asyncio
from collections import deque
from contextlib import AsyncExitStack
from typing import AsyncIterable, Iterable
from v6d3music.core.create_ytaudio import *
from v6d3music.core.ytaudio import *
from v6d3music.processing.pool import *
from v6d3music.utils.argctx import *
from v6d2ctx.context import Context
__all__ = ('YState',)
class YState:
def __init__(self, pool: Pool, ctx: Context, sources: Iterable[UrlCtx]) -> None:
self.pool = pool
self.ctx = ctx
self.sources: deque[UrlCtx] = deque(sources)
self.playlists: deque[asyncio.Future[list[InfoCtx]]] = deque()
self.entries: deque[InfoCtx | BaseException] = deque()
self.results: asyncio.Queue[asyncio.Future[YTAudio | None] | None] = asyncio.Queue()
self.es = AsyncExitStack()
self.descheduled: int = 0
def empty(self) -> bool:
return self.empty_processing() and self.results.empty()
def empty_processing(self) -> bool:
return not self.sources and not self.playlists and not self.entries
async def iterate(self) -> AsyncIterable[YTAudio]:
async with self.es:
for _ in range(self.pool.workers()):
await self.es.enter_async_context(YJD(self).at(self.pool))
while not self.empty():
future = await self.results.get()
if future is None:
return
try:
audio = await future
if audio is not None:
yield audio
except OSError:
raise Explicit('extraction error\nunknown ytdl error')
finally:
self.results.task_done()
async def playlist(self, source: UrlCtx) -> list[InfoCtx]:
return [info async for info in source.entries()]
async def result(self, entry: InfoCtx) -> YTAudio | None:
try:
return await create_ytaudio(self.ctx, entry)
except:
if not entry.ignore:
raise
else:
return None
class YJD(JobDescriptor):
def __init__(self, state: YState) -> None:
self.state = state
def _unpack_playlists(self) -> None:
while self.state.playlists and self.state.playlists[0].done():
playlist = self.state.playlists.popleft()
if (e := playlist.exception()) is not None:
self.state.playlists.clear()
self.state.sources.clear()
self.state.entries.append(e)
else:
for entry in playlist.result():
self.state.entries.append(entry)
async def run(self) -> JobDescriptor | None:
if self.state.empty_processing():
if self.state.results.empty():
self.state.results.put_nowait(None)
return None
elif self.state.entries:
entry = self.state.entries.popleft()
audiotask: asyncio.Future[YTAudio | None]
if isinstance(entry, BaseException):
audiotask = asyncio.Future()
audiotask.set_exception(entry)
else:
audiotask = asyncio.create_task(self.state.result(entry))
self.state.results.put_nowait(audiotask)
try:
await audiotask
except:
self.state.entries.clear()
self.state.playlists.clear()
self.state.sources.clear()
return self
elif self.state.sources:
source = self.state.sources.popleft()
playlisttask = asyncio.create_task(self.state.playlist(source))
self.state.playlists.append(playlisttask)
try:
await playlisttask
except:
self.state.sources.clear()
return self
finally:
self._unpack_playlists()
rescheduled = self.state.descheduled
self.state.descheduled = 0
for _ in range(rescheduled):
await self.state.es.enter_async_context(YJD(self.state).at(self.state.pool))
return self
else:
self.state.descheduled += 1
return None

View File

@ -1,21 +1,17 @@
from typing import AsyncIterable
from v6d3music.core.create_ytaudios import create_ytaudios
from v6d3music.core.entries_effects_for_args import entries_effects_for_args
from v6d3music.core.entries_effects_for_args import *
from v6d3music.core.ystate import *
from v6d3music.core.ytaudio import YTAudio
from v6d3music.utils.argctx import InfoCtx
from v6d3music.processing.pool import *
from v6d3music.utils.argctx import *
from v6d2ctx.context import Context
async def yt_audios(ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
tuples: list[InfoCtx] = []
assert ctx.guild is not None
async for it in entries_effects_for_args(args, ctx.guild.id):
tuples.append(it)
if len(tuples) >= 5:
async for audio in create_ytaudios(ctx, tuples):
yield audio
tuples.clear()
async for audio in create_ytaudios(ctx, tuples):
yield audio
argctx = ArgCtx(default_effects(ctx.guild.id), args)
async with Pool(5) as pool:
async for audio in YState(pool, ctx, argctx.sources).iterate():
yield audio

View File

@ -30,15 +30,17 @@ body,
#root-container {
display: flex;
height: 100%;
}
#root {
width: auto;
min-width: min(40em, 100%);
flex: auto;
overflow-y: scroll;
}
.sidebars {
width: 100%;
background: #111;
background: #050505;
}

View File

@ -0,0 +1,219 @@
import asyncio
from typing import Union
from v6d3music.processing.yprocess import *
__all__ = ('Job', 'Pool', 'JobDescriptor',)
class Job:
def __init__(self, future: asyncio.Future[None]) -> None:
self.future = future
async def run(self) -> Union['Job', None]:
raise NotImplementedError
class JobDescriptor:
async def run(self) -> Union['JobDescriptor', None]:
raise NotImplementedError
def wrap(self) -> Job:
return DescriptorJob(asyncio.Future(), self)
def at(self, pool: 'Pool') -> 'JDC':
return JDC(self, pool)
class DescriptorJob(Job):
def __init__(self, future: asyncio.Future[None], descriptor: JobDescriptor) -> None:
super().__init__(future)
self.__descriptor = descriptor
async def run(self) -> Job | None:
next_descriptor = await self.__descriptor.run()
if next_descriptor is None:
return None
else:
return DescriptorJob(self.future, next_descriptor)
class Worker:
def __init__(self) -> None:
self.__queue: asyncio.Queue[Job | None] = asyncio.Queue()
self.__working = False
self.__busy = 0
def _put_nowait(self, job: Job | None) -> None:
self.__queue.put_nowait(job)
self.__busy += 1
async def _handle(self, job: Job) -> None:
try:
next_job = await job.run()
except BaseException as e:
job.future.set_exception(e)
else:
if next_job is None:
job.future.set_result(None)
else:
self._put_nowait(next_job)
async def _safe_handle(self, job: Job) -> None:
try:
await self._handle(job)
except asyncio.InvalidStateError:
pass
def _on_shutdown(self) -> bool:
if self.__queue.empty():
self.__working = False
return True
else:
self._put_nowait(None)
return False
async def _on(self, job: Job | None) -> bool:
if job is None:
return self._on_shutdown()
else:
await self._handle(job)
return False
async def _step(self) -> bool:
job = await self.__queue.get()
try:
return await self._on(job)
finally:
self.__busy -= 1
self.__queue.task_done()
async def _work(self) -> None:
while True:
if await self._step():
return
def _cancel_job(self, job: Job) -> None:
try:
job.future.set_exception(RuntimeError('task left in the worker after shutdown'))
except asyncio.InvalidStateError:
pass
def _cancel_one(self, job: Job | None) -> None:
if job is not None:
self._cancel_job(job)
def _cancel_get(self) -> None:
job = self.__queue.get_nowait()
try:
self._cancel_one(job)
finally:
self.__queue.task_done()
def _cancel_all(self) -> None:
while not self.__queue.empty():
self._cancel_get()
async def _task(self) -> None:
await self._work()
if self.__working:
raise RuntimeError('worker left seemingly running after shutdown')
if not self.__queue.empty():
raise RuntimeError('worker failed to finish all its jobs')
def start(self) -> asyncio.Future[None]:
if self.__working:
raise RuntimeError('starting an already running worker')
self.__working = True
return asyncio.create_task(self._task())
def submit(self, job: Job | None) -> None:
if not self.__working:
raise RuntimeError('submitting to a non-working worker')
self._put_nowait(job)
def working(self) -> bool:
return self.__working
def busy(self) -> int:
return self.__busy
class Working:
def __init__(self, worker: Worker, task: asyncio.Future[None]) -> None:
self.__worker = worker
self.__task = task
async def close(self) -> None:
self.__worker.submit(None)
await self.__task
def submit(self, job: Job) -> None:
self.__worker.submit(job)
@classmethod
def start(cls) -> 'Working':
worker = Worker()
task = worker.start()
return cls(worker, task)
def working(self) -> bool:
return self.__worker.working()
def busy(self) -> int:
return self.__worker.busy()
class Pool:
def __init__(self, workers: int) -> None:
if workers < 1:
raise ValueError('non-positive number of workers')
self.__workers = workers
self.__working = False
self.__open = False
async def __aenter__(self) -> 'Pool':
if self.__open:
raise RuntimeError('starting an already open pool')
if self.__working:
raise RuntimeError('starting an already running pool')
self.__working = True
self.__pool = set(Working.start() for _ in range(self.__workers))
self.__open = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if not self.__working:
raise RuntimeError('stopping a non-working pool')
if not self.__open:
raise RuntimeError('stopping a closed pool')
self.__open = False
for working in self.__pool:
await working.close()
del self.__pool
self.__working = False
def submit(self, job: Job) -> None:
if not self.__working:
raise RuntimeError('submitting to a non-working pool')
if not self.__open:
raise RuntimeError('submitting to a closed pool')
min(self.__pool, key=lambda working: working.busy()).submit(job)
def workers(self) -> int:
return self.__workers
class JDC:
def __init__(self, descriptor: JobDescriptor, pool: Pool) -> None:
self.__descriptor = descriptor
self.__pool = pool
async def __aenter__(self) -> 'JDC':
job = self.__descriptor.wrap()
self.__future = job.future
self.__pool.submit(job)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.__future

View File

@ -1,17 +1,22 @@
from typing import AsyncIterable, Callable, Any
from typing import Any, AsyncIterable, Callable
from v6d3music.utils.effects_for_preset import effects_for_preset
from v6d3music.utils.entries_for_url import entries_for_url
from v6d3music.utils.options_for_effects import options_for_effects
from v6d3music.utils.sparq import sparq
__all__ = ('InfoCtx', 'UrlCtx', 'ArgCtx',)
class InfoCtx:
def __init__(self, info: dict[str, Any], effects: str | None, already_read: int, tor: bool) -> None:
def __init__(
self, info: dict[str, Any], effects: str | None, already_read: int, tor: bool, ignore: bool
) -> None:
self.info = info
self.effects = effects
self.already_read = already_read
self.tor = tor
self.ignore = ignore
class UrlCtx:
@ -20,14 +25,19 @@ class UrlCtx:
self.effects = effects
self.already_read = 0
self.tor = False
self.ignore = False
async def entries(self) -> AsyncIterable[InfoCtx]:
async for info in entries_for_url(self.url, self.tor):
yield InfoCtx(info, self.effects, self.already_read, self.tor)
try:
async for info in entries_for_url(self.url, self.tor):
yield InfoCtx(info, self.effects, self.already_read, self.tor, self.ignore)
except:
if not self.ignore:
raise
class ArgCtx:
def __init__(self, default_effects: Callable[[int], str | None], args: list[str], gid: int) -> None:
def __init__(self, default_effects: str | None, args: list[str]) -> None:
self.sources: list[UrlCtx] = []
while args:
match args:
@ -36,7 +46,7 @@ class ArgCtx:
case [url, '+', preset, *args]:
effects = effects_for_preset(preset)
case [url, *args]:
effects = default_effects(gid)
effects = default_effects
case _:
raise RuntimeError
ctx = UrlCtx(url, effects)
@ -56,4 +66,9 @@ class ArgCtx:
ctx.tor = True
case [*args]:
pass
match args:
case ['ignore', *args]:
ctx.ignore = True
case [*args]:
pass
self.sources.append(ctx)