loading cancellation

This commit is contained in:
AF 2023-03-31 11:30:24 +00:00
parent ca39872c8d
commit abae781b52
6 changed files with 73 additions and 34 deletions

View File

@ -1,6 +1,6 @@
ptvp35 @ git+https://gitea.parrrate.ru/PTV/ptvp35.git@e760fca39e2070b9959aeb95b53e59e895f1ad57 ptvp35 @ git+https://gitea.parrrate.ru/PTV/ptvp35.git@e760fca39e2070b9959aeb95b53e59e895f1ad57
v6d0auth @ git+https://gitea.parrrate.ru/PTV/v6d0auth.git@c718d4d1422945a756213d22d9e26aa24babe0f6 v6d0auth @ git+https://gitea.parrrate.ru/PTV/v6d0auth.git@c718d4d1422945a756213d22d9e26aa24babe0f6
v6d1tokens @ git+https://gitea.parrrate.ru/PTV/v6d1tokens.git@9ada50f111bd6e9a49c9c6683fa7504fee030056 v6d1tokens @ git+https://gitea.parrrate.ru/PTV/v6d1tokens.git@9ada50f111bd6e9a49c9c6683fa7504fee030056
v6d2ctx @ git+https://gitea.parrrate.ru/PTV/v6d2ctx.git@c9f3f5ac5c7feb2165fc4fae4eb998a0fe4f5f00 v6d2ctx @ git+https://gitea.parrrate.ru/PTV/v6d2ctx.git@226bf1b6ada0c217408590abf69379d5f44a7972
rainbowadn @ git+https://gitea.parrrate.ru/PTV/rainbowadn.git@fc1d11f4b53ac4653ffac1bbcad130855e1b7f10 rainbowadn @ git+https://gitea.parrrate.ru/PTV/rainbowadn.git@fc1d11f4b53ac4653ffac1bbcad130855e1b7f10
adaas @ git+https://gitea.parrrate.ru/PTV/adaas.git@8093665489901098f92d5a4001f1782dab6ddcf9 adaas @ git+https://gitea.parrrate.ru/PTV/adaas.git@8093665489901098f92d5a4001f1782dab6ddcf9

View File

@ -3,7 +3,7 @@ from typing import Callable
import discord import discord
from v6d2ctx.at_of import * from v6d2ctx.at_of import AtOf
from v6d2ctx.context import * from v6d2ctx.context import *
from v6d3music.core.default_effects import * from v6d3music.core.default_effects import *
from v6d3music.core.mainservice import * from v6d3music.core.mainservice import *
@ -64,6 +64,10 @@ presets: {shlex.join(allowed_presets)}
queue.append(audio) queue.append(audio)
await ctx.reply("done") await ctx.reply("done")
@at("cancel")
async def cancel(ctx: Context, _args: list[str]) -> None:
mainservice.cancel_loading(ctx)
@at("skip") @at("skip")
async def skip(ctx: Context, args: list[str]) -> None: async def skip(ctx: Context, args: list[str]) -> None:
await catch( await catch(
@ -275,6 +279,18 @@ presets: {shlex.join(allowed_presets)}
or "no queue" or "no queue"
) )
@at("np")
@at("cp")
@at("nowplaying")
@at("playing")
@at("//1")
@at("queue1")
async def queue1(ctx: Context, _args: list[str]) -> None:
await ctx.long(
(await (await mainservice.context(ctx, create=True, force_play=False).queue()).format(1)).strip()
or "no queue"
)
@at("swap") @at("swap")
async def swap(ctx: Context, args: list[str]) -> None: async def swap(ctx: Context, args: list[str]) -> None:
await catch( await catch(

View File

@ -1,7 +1,7 @@
import asyncio import asyncio
import traceback import traceback
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
from typing import AsyncIterable, TypeVar from typing import AsyncIterable, Callable, TypeVar
import discord import discord
@ -22,6 +22,7 @@ from v6d3music.core.ystate import *
from v6d3music.core.ytaservicing import * from v6d3music.core.ytaservicing import *
from v6d3music.core.ytaudio import * from v6d3music.core.ytaudio import *
from v6d3music.processing.pool import * from v6d3music.processing.pool import *
from v6d3music.utils.assert_admin import assert_admin
from v6d3music.utils.argctx import * from v6d3music.utils.argctx import *
__all__ = ('MainService', 'MainMode', 'MainContext', 'MainEvent') __all__ = ('MainService', 'MainMode', 'MainContext', 'MainEvent')
@ -65,6 +66,7 @@ class MainService:
self.restore_lock = asyncio.Lock() self.restore_lock = asyncio.Lock()
self.__events: SendableEvents[MainEvent] = events self.__events: SendableEvents[MainEvent] = events
self.__pool_events: SendableEvents[PoolEvent] = _PMSendable(self.__events) self.__pool_events: SendableEvents[PoolEvent] = _PMSendable(self.__events)
self.__ystates: dict[discord.Guild, YState] = {}
def register_instrumentation(self): def register_instrumentation(self):
self.targets.register_type(v6d3music.processing.pool.UnitJob, 'run', Async) self.targets.register_type(v6d3music.processing.pool.UnitJob, 'run', Async)
@ -231,8 +233,22 @@ class MainService:
async def yt_audios(self, ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]: async def yt_audios(self, ctx: Context, args: list[str]) -> AsyncIterable[YTAudio]:
assert ctx.guild is not None assert ctx.guild is not None
argctx = ArgCtx(self.defaulteffects.get(ctx.guild.id), args) argctx = ArgCtx(self.defaulteffects.get(ctx.guild.id), args)
async for audio in YState(self.__servicing, self.__pool, ctx, argctx.sources).iterate(): ystate = YState(self.__servicing, self.__pool, ctx, argctx.sources)
self.__ystates[ctx.guild] = ystate
try:
async for audio in ystate.iterate():
yield audio yield audio
finally:
del self.__ystates[ctx.guild]
def cancel_loading(self, ctx: Context) -> None:
assert ctx.guild is not None
ystate = self.__ystates.get(ctx.guild)
if ystate is None:
return
if ystate.ctx.member != ctx.member:
assert_admin(ctx.member)
ystate.cancel()
def pool_json(self) -> ResponseType: def pool_json(self) -> ResponseType:
return self.__pool.json() return self.__pool.json()

View File

@ -16,7 +16,7 @@ from v6d3music.utils.fill import *
__all__ = ('QueueAudio',) __all__ = ('QueueAudio',)
PRE_SET_LENGTH = 24 PRE_SET_LENGTH = 6
class QueueAudio(discord.AudioSource): class QueueAudio(discord.AudioSource):

View File

@ -3,16 +3,15 @@ from collections import deque
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
from typing import AsyncIterable, Iterable from typing import AsyncIterable, Iterable
from v6d2ctx.integration.responsetype import *
from v6d2ctx.context import * from v6d2ctx.context import *
from v6d2ctx.integration.responsetype import *
from v6d3music.core.create_ytaudio import * from v6d3music.core.create_ytaudio import *
from v6d3music.core.ytaservicing import * from v6d3music.core.ytaservicing import *
from v6d3music.core.ytaudio import * from v6d3music.core.ytaudio import *
from v6d3music.processing.pool import * from v6d3music.processing.pool import *
from v6d3music.utils.argctx import * from v6d3music.utils.argctx import *
__all__ = ('YState',) __all__ = ("YState",)
class _Stop: class _Stop:
@ -48,7 +47,9 @@ class YState:
try: try:
return await future return await future
except OSError as e: except OSError as e:
raise Explicit('extraction error\nunknown ytdl error (probably due to video being unavailable, e.g. because of regional restrictions)') from e raise Explicit(
"extraction error\nunknown ytdl error (probably due to video being unavailable, e.g. because of regional restrictions)"
) from e
finally: finally:
self.results.task_done() self.results.task_done()
@ -82,12 +83,18 @@ class YState:
else: else:
return None return None
def cancel(self) -> None:
self.entries.clear()
self.playlists.clear()
self.sources.clear()
self.results.put_nowait(_Stop())
class YStream(JobUnit): class YStream(JobUnit):
def __init__(self, state: YState) -> None: def __init__(self, state: YState) -> None:
self.state = state self.state = state
self.__running = False self.__running = False
self.__details: dict[str, ResponseType] = {'status': 'stopped'} self.__details: dict[str, ResponseType] = {"status": "stopped"}
def _unpack_playlists(self) -> None: def _unpack_playlists(self) -> None:
while self.state.playlists and self.state.playlists[0].done(): while self.state.playlists and self.state.playlists[0].done():
@ -102,7 +109,7 @@ class YStream(JobUnit):
async def _run(self, context: JobContext, /) -> JobUnit | None: async def _run(self, context: JobContext, /) -> JobUnit | None:
if self.state.empty_processing(): if self.state.empty_processing():
self.__details = {'status': 'stopping'} self.__details = {"status": "stopping"}
if self.state.results.empty(): if self.state.results.empty():
self.state.results.put_nowait(_Stop()) self.state.results.put_nowait(_Stop())
return None return None
@ -110,20 +117,20 @@ class YStream(JobUnit):
entry = self.state.entries.popleft() entry = self.state.entries.popleft()
audiotask: asyncio.Future[YTAudio | None] audiotask: asyncio.Future[YTAudio | None]
if isinstance(entry, BaseException): if isinstance(entry, BaseException):
self._set_details(context, {'status': 'breaking downstream audio creation'}) self._set_details(context, {"status": "breaking downstream audio creation"})
audiotask = asyncio.Future() audiotask = asyncio.Future()
audiotask.set_exception(entry) audiotask.set_exception(entry)
else: else:
self._set_details( self._set_details(
context, context,
{ {
'status': 'creating audio', "status": "creating audio",
'info': cast_to_response(entry.info), "info": cast_to_response(entry.info),
'effects': entry.effects, "effects": entry.effects,
'already_read': entry.already_read, "already_read": entry.already_read,
'tor': entry.tor, "tor": entry.tor,
'ignore': entry.ignore, "ignore": entry.ignore,
} },
) )
audiotask = asyncio.create_task(self.state.result(entry)) audiotask = asyncio.create_task(self.state.result(entry))
self.state.results.put_nowait(audiotask) self.state.results.put_nowait(audiotask)
@ -133,20 +140,20 @@ class YStream(JobUnit):
self.state.entries.clear() self.state.entries.clear()
self.state.playlists.clear() self.state.playlists.clear()
self.state.sources.clear() self.state.sources.clear()
self._set_details(context, {'status': 'rescheduling self from entries'}) self._set_details(context, {"status": "rescheduling self from entries"})
return self return self
elif self.state.sources: elif self.state.sources:
source = self.state.sources.popleft() source = self.state.sources.popleft()
self._set_details( self._set_details(
context, context,
{ {
'status': 'parsing playlist', "status": "parsing playlist",
'url': source.url, "url": source.url,
'effects': source.effects, "effects": source.effects,
'already_read': source.already_read, "already_read": source.already_read,
'tor': source.tor, "tor": source.tor,
'ignore': source.ignore, "ignore": source.ignore,
} },
) )
playlisttask = asyncio.create_task(self.state.playlist(source)) playlisttask = asyncio.create_task(self.state.playlist(source))
self.state.playlists.append(playlisttask) self.state.playlists.append(playlisttask)
@ -159,13 +166,13 @@ class YStream(JobUnit):
self._unpack_playlists() self._unpack_playlists()
rescheduled = self.state.descheduled rescheduled = self.state.descheduled
self.state.descheduled = 0 self.state.descheduled = 0
self._set_details(context, {'status': 'rescheduling others', 'rescheduling': rescheduled}) self._set_details(context, {"status": "rescheduling others", "rescheduling": rescheduled})
for _ in range(rescheduled): for _ in range(rescheduled):
await self.state.es.enter_async_context(YStream(self.state).at(self.state.pool)) await self.state.es.enter_async_context(YStream(self.state).at(self.state.pool))
self._set_details(context, {'status': 'rescheduling self from sources'}) self._set_details(context, {"status": "rescheduling self from sources"})
return self return self
else: else:
self._set_details(context, {'status': 'descheduling'}) self._set_details(context, {"status": "descheduling"})
self.state.descheduled += 1 self.state.descheduled += 1
return None return None
@ -179,7 +186,7 @@ class YStream(JobUnit):
return await self._run(context) return await self._run(context)
finally: finally:
self.__running = False self.__running = False
self.__details = {'status': 'stopped'} self.__details = {"status": "stopped"}
def json(self) -> ResponseType: def json(self) -> ResponseType:
return {'type': 'ystream', 'details': self.__details, 'running': self.__running} return {"type": "ystream", "details": self.__details, "running": self.__running}

View File

@ -15,7 +15,7 @@ from v6d1tokens.client import *
from v6d2ctx.handle_content import * from v6d2ctx.handle_content import *
from v6d2ctx.integration.event import * from v6d2ctx.integration.event import *
from v6d2ctx.integration.targets import * from v6d2ctx.integration.targets import *
from v6d2ctx.pain import ABlockMonitor from v6d2ctx.pain import ABlockMonitor, ALog
from v6d2ctx.serve import * from v6d2ctx.serve import *
from v6d3music.api import * from v6d3music.api import *
from v6d3music.app import * from v6d3music.app import *