From 08192b5d93e5f9308bc3673dd03db54e9b3fdbd8 Mon Sep 17 00:00:00 2001 From: timofey Date: Sun, 15 Jan 2023 08:51:29 +0000 Subject: [PATCH] docs + operation --- Dockerfile | 3 + base.requirements.txt | 1 + docs/source/administration.rst | 42 ++++ docs/source/usage.rst | 58 ++++++ requirements.txt | 2 +- setup.py | 12 ++ v6d3music/api.py | 256 +++++++++++++++++++------ v6d3music/app.py | 39 ++-- v6d3music/commands.py | 107 ++++++----- v6d3music/core/mainaudio.py | 9 +- v6d3music/core/mainservice.py | 75 ++++++-- v6d3music/core/monitoring.py | 117 +++++++++++ v6d3music/core/queueaudio.py | 32 +++- v6d3music/core/real_url.py | 2 +- v6d3music/core/ystate.py | 58 +++++- v6d3music/core/ytaudio.py | 14 +- v6d3music/html/main.css | 4 + v6d3music/html/main.js | 11 +- v6d3music/html/operator.css | 22 +++ v6d3music/html/operator.html | 24 +++ v6d3music/html/operator.js | 69 +++++++ v6d3music/main.py | 176 +++++++++++++++++ v6d3music/processing/abstractrunner.py | 29 ++- v6d3music/processing/pool.py | 183 +++++++++++++++--- v6d3music/run-bot.py | 171 +---------------- v6d3musicbase/event.py | 60 ++++++ v6d3musicbase/responsetype.py | 17 ++ v6d3musicbase/targets.py | 76 ++++++++ 28 files changed, 1300 insertions(+), 369 deletions(-) create mode 100644 setup.py create mode 100644 v6d3music/core/monitoring.py create mode 100644 v6d3music/html/operator.css create mode 100644 v6d3music/html/operator.html create mode 100644 v6d3music/html/operator.js create mode 100644 v6d3music/main.py create mode 100644 v6d3musicbase/event.py create mode 100644 v6d3musicbase/responsetype.py create mode 100644 v6d3musicbase/targets.py diff --git a/Dockerfile b/Dockerfile index ff08454..350faae 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,5 +9,8 @@ COPY base.requirements.txt base.requirements.txt RUN pip install -r base.requirements.txt COPY requirements.txt requirements.txt RUN pip install -r requirements.txt +RUN mkdir ${v6root} +COPY v6d3musicbase v6d3musicbase COPY v6d3music v6d3music +RUN python3 -m v6d3music.main CMD ["python3", "-m", "v6d3music.run-bot"] diff --git a/base.requirements.txt b/base.requirements.txt index 250d0d0..921ee1d 100644 --- a/base.requirements.txt +++ b/base.requirements.txt @@ -1,3 +1,4 @@ aiohttp>=3.7.4,<4 discord.py[voice]~=2.1.0 yt-dlp~=2022.11.11 +typing_extensions~=4.4.0 diff --git a/docs/source/administration.rst b/docs/source/administration.rst index d856935..bba38b4 100644 --- a/docs/source/administration.rst +++ b/docs/source/administration.rst @@ -1,2 +1,44 @@ For Administrators ================== + +.. _volume-command: + +:code:`?/volume` command +------------------------ + +command syntax:: + + ?/volume + ?/volume volume + +Getting parrrate-music bot on your server +========================================= + +Self-hosting +------------ + +See :doc:`operation`. + +Developer-hosted +---------------- + +Ask parrrate-music's developers via Discord for the invite link. + +Things to consider when using this option: + +* Updates requiring bot restart (5~20 second outage) are quite frequent. +* All updates are tested live, i.e. the bot currently has no fallback stable version. + +Guild (Discord server) data we store +==================================== + +Guild IDs +--------- + +Stored for queues and volume settings. +IDs of banned guild are also stored. + +Volume settings +--------------- + +See :ref:`volume-command`. diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 248e30f..7e6fbbc 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -1,2 +1,60 @@ For Users ========= + +:code:`?/play` command +---------------------- + +command syntax:: + + ?/play url [- effects | + preset] [[[h] m] s] [tor|ignore]* ... + +examples:: + + ?/play http://127.0.0.1/audio.mp3 + bassboost tor + ?/play http://127.0.0.1/audio.mp3 - "bass=g=10" 23 59 59 ignore + ?/play http://127.0.0.1/audio.mp3 http://127.0.0.1/audio.mp3 + +:code:`?/skip` command +---------------------- + +command syntax:: + + ?/skip + ?/skip at + ?/skip start end + +examples:: + + ?/skip + ?/skip 0 + ?/skip 0 0 + +:code:`?/queue` commands +------------------------ + +command syntax:: + + ?/queue resume + ?/queue pause + ?/queue [limit] + ?/queue tail limit + +User data we store +================== + +Audio URLs, effects, user IDs +----------------------------- + +Those are required for bot's functionality. +This data is stored only for the tracks that are currently in queue. + +Audio contents +-------------- + +Persistent storage of audio is performed only for caching. +URLs aren't stored in a reversible form (only represented as hashes). + +Tokens (for the web app) +------------------------ + +Revokable as any other Discord app token. diff --git a/requirements.txt b/requirements.txt index e805fbc..1fbb143 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,4 @@ v6d0auth @ git+https://gitea.parrrate.ru/PTV/v6d0auth.git@c718d4d1422945a756213d v6d1tokens @ git+https://gitea.parrrate.ru/PTV/v6d1tokens.git@9ada50f111bd6e9a49c9c6683fa7504fee030056 v6d2ctx @ git+https://gitea.parrrate.ru/PTV/v6d2ctx.git@18001ff3403646db46f36175a824e571c5734fd6 rainbowadn @ git+https://gitea.parrrate.ru/PTV/rainbowadn.git@fc1d11f4b53ac4653ffac1bbcad130855e1b7f10 -adaas @ git+https://gitea.parrrate.ru/PTV/adaas.git@0a0da256a3be72c76fbe6af4b941ff70881d3704 +adaas @ git+https://gitea.parrrate.ru/PTV/adaas.git@0c7f974ec4955204b35f463749df138663c98550 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9990419 --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +from setuptools import setup + +setup( + name='v6d3music', + version='', + packages=['v6d3music', 'v6d3musicbase'], + url='', + license='', + author='PARRRATE T&V', + author_email='', + description='' +) diff --git a/v6d3music/api.py b/v6d3music/api.py index 8a5a0cf..3d8db24 100644 --- a/v6d3music/api.py +++ b/v6d3music/api.py @@ -1,21 +1,21 @@ import asyncio import time -from typing import TypeAlias import discord +from typing_extensions import Self +from v6d3musicbase.responsetype import * +from v6d3musicbase.targets import * +from rainbowadn.instrument import Instrumentation from v6d2ctx.context import * from v6d3music.core.mainaudio import * from v6d3music.core.mainservice import * -ResponseType: TypeAlias = list | dict | float | str | None - - __all__ = ('Api',) class Api: - class MisusedApi(KeyError): + class MisusedApi(Exception): def json(self) -> dict: return {'error': list(map(str, self.args)), 'errormessage': str(self)} @@ -34,12 +34,40 @@ class Api: self.mainservice = mainservice self.client = mainservice.client self.roles = roles + self.targets = mainservice.targets + self.targets.register_instance(self, 'api', Async) + self.targets.register_instrumentation('Count', lambda t, n: Count(t, n)) + self.targets.register_instrumentation('Concurrency', lambda t, n: Concurrency(t, n), Async) + + def user_id(self) -> int | None: + if self.client.user is None: + return None + else: + return self.client.user.id def is_operator(self, user_id: int) -> bool: return '(operator)' in self.roles.get(f'roles{user_id}', '') async def api(self, request: dict, user_id: int) -> ResponseType: - return await UserApi(self, request, user_id).api() + response = await UserApi(ApiSession(self), request, user_id).api() + match response, request: + case {'time': _}, _: + pass + case dict() as d, {'time': _}: + response = d | {'time': time.time()} + return response + + +class ApiSession: + def __init__(self, api: Api) -> None: + self.__api = api + self.__complexity = 1000 + + def api(self): + if self.__complexity <= 0: + raise Api.MisusedApi('hit complexity limit') + self.__complexity -= 1 + return self.__api class UserApi: @@ -47,33 +75,52 @@ class UserApi: def json(self) -> dict: return super().json() | {'unknownmember': None} - def __init__(self, api: Api, request: dict, user_id: int) -> None: - self.pi = api - self.client = api.client + def __init__(self, session: ApiSession, request: dict, user_id: int) -> None: + self.session = session + self.pi = session.api() + self.client = self.pi.client self.request = request self.user_id = user_id + self._parent: Self | None = None + self._key: int | str | None = None async def subs(self, requests: list[dict] | dict[str, dict]) -> ResponseType: + match self.request: + case {'idkey': str() as idkey}: + pass + case _: + idkey = 'type' + match self.request: + case {'idbase': dict() as base}: + pass + case _: + base = {} match requests: case list(): return list( await asyncio.gather( - *(self.sub(request).api() for request in requests) + *(self.sub(request, key).api() for (key, request) in enumerate(requests)) ) ) case dict(): items = list(requests.items()) responses = await asyncio.gather( - *(self.sub(request if 'type' in request else request | {'type': key}).api() for key, request in items) + *(self.sub({idkey: key} | base | request, key).api() for key, request in items) ) return dict((key, response) for (key, _), response in zip(items, responses)) case _: raise Api.MisusedApi('that should not happen') - def sub(self, request: dict) -> 'UserApi': - return UserApi(self.pi, request, self.user_id) + def _sub(self, request: dict) -> Self: + return UserApi(self.session, request, self.user_id) - async def _guild_api(self, guild_id: int) -> 'GuildApi': + def sub(self, request: dict, key: str | int) -> Self: + sub = self._sub(request) + sub._parent = self + sub._key = key + return sub + + async def to_guild_api(self, guild_id: int) -> 'GuildApi': guild = self.client.get_guild(guild_id) or await self.client.fetch_guild(guild_id) if guild is None: raise UserApi.UnknownMember('unknown guild') @@ -82,19 +129,31 @@ class UserApi: raise UserApi.UnknownMember('unknown member of a guild') return GuildApi(self, member) - async def _operator_api(self) -> 'OperatorApi': + async def to_operator_api(self) -> 'OperatorApi': if not self.pi.is_operator(self.user_id): raise UserApi.UnknownMember('not an operator') - return OperatorApi(self.pi, self.request, self.user_id) + return OperatorApi(self) + + def _api_text(self) -> str: + return 'user api' + + async def _fall_through_api(self) -> ResponseType: + match self.request: + case {'type': '?'}: + return f'this is {self._api_text()}' + case {'type': '*', 'requests': list() | dict() as requests}: + return await self.subs(requests) + case _: + raise Api.UnknownApi(f'unknown {self._api_text()}') async def _api(self) -> ResponseType: match self.request: case {'guild': str() as guild_id_str} if guild_id_str.isdecimal() and len(guild_id_str) < 100: self.request.pop('guild') - return await (await self._guild_api(int(guild_id_str))).api() + return await (await self.to_guild_api(int(guild_id_str))).api() case {'operator': _}: self.request.pop('operator') - return await (await self._operator_api()).api() + return await (await self.to_operator_api()).api() case {'type': 'ping', 't': (float() | int()) as t}: return time.time() - t case {'type': 'guilds'}: @@ -103,14 +162,10 @@ class UserApi: if guild.get_member(self.user_id) is not None: guilds.append(str(guild.id)) return guilds - case {'type': '?'}: - return 'this is user api' - case {'type': '*', 'requests': list() | dict() as requests}: - return await self.subs(requests) case _: - raise Api.UnknownApi('unknown user api') + return await self._fall_through_api() - async def api(self): + async def api(self) -> ResponseType: try: try: return await self._api() @@ -131,11 +186,11 @@ class GuildApi(UserApi): return super().json() | {'notconnected': None} def __init__(self, api: UserApi, member: discord.Member) -> None: - super().__init__(api.pi, api.request, member.id) + super().__init__(api.session, api.request, member.id) self.member = member self.guild = member.guild - async def voice_api(self) -> 'VoiceApi': + async def to_voice_api(self) -> 'VoiceApi': voice = self.member.voice if voice is None: raise GuildApi.VoiceNotConnected('you are not connected to voice') @@ -148,20 +203,19 @@ class GuildApi(UserApi): raise GuildApi.VoiceNotConnected('bot not connected') return VoiceApi(self, channel) - def sub(self, request: dict) -> 'GuildApi': - return GuildApi(super().sub(request), self.member) + def _sub(self, request: dict) -> Self: + return GuildApi(super()._sub(request), self.member) + + def _api_text(self) -> str: + return 'guild api' async def _api(self) -> ResponseType: match self.request: case {'voice': _}: self.request.pop('voice') - return await (await self.voice_api()).api() - case {'type': '?'}: - return 'this is guild api' - case {'type': '*', 'requests': list() | dict() as requests}: - return await self.subs(requests) + return await (await self.to_voice_api()).api() case _: - raise Api.UnknownApi('unknown guild api') + return await self._fall_through_api() class VoiceApi(GuildApi): @@ -172,25 +226,24 @@ class VoiceApi(GuildApi): self.channel = channel self.mainservice = self.pi.mainservice - async def _main_api(self) -> 'MainApi': + async def to_main_api(self) -> 'MainApi': vc = await self.mainservice.raw_vc_for_member(self.member) - main = await self.mainservice.descriptor(create=False, force_play=False).main_for_raw_vc(vc) + main = await self.mainservice.mode(create=False, force_play=False).main_for_raw_vc(vc) return MainApi(self, vc, main) - def sub(self, request: dict) -> 'VoiceApi': - return VoiceApi(super().sub(request), self.channel) + def _sub(self, request: dict) -> Self: + return VoiceApi(super()._sub(request), self.channel) + + def _api_text(self) -> str: + return 'voice api' async def _api(self) -> ResponseType: match self.request: case {'main': _}: self.request.pop('main') - return await (await self._main_api()).api() - case {'type': '?'}: - return 'this is voice api' - case {'type': '*', 'requests': list() | dict() as requests}: - return await self.subs(requests) + return await (await self.to_main_api()).api() case _: - raise Api.UnknownApi('unknown voice api') + return await self._fall_through_api() class MainApi(VoiceApi): @@ -201,8 +254,11 @@ class MainApi(VoiceApi): self.vc = vc self.main = main - def sub(self, request: dict) -> 'MainApi': - return MainApi(super().sub(request), self.vc, self.main) + def _sub(self, request: dict) -> Self: + return MainApi(super()._sub(request), self.vc, self.main) + + def _api_text(self) -> str: + return 'main api' async def _api(self) -> ResponseType: match self.request: @@ -216,23 +272,29 @@ class MainApi(VoiceApi): return await self.main.queue.format() case {'type': 'queuejson'}: return await self.main.queue.pubjson(self.member, self.request.get('limit', 1000)) - case {'type': '?'}: - return 'this is main api' - case {'type': '*', 'requests': list() | dict() as requests}: - return await self.subs(requests) case _: - raise Api.UnknownApi('unknown main api') + return await self._fall_through_api() class OperatorApi(UserApi): - def sub(self, request: dict) -> 'OperatorApi': - return OperatorApi(self.pi, request, self.user_id) + def __init__(self, api: UserApi) -> None: + super().__init__(api.session, api.request, api.user_id) def _guild_visible(self, guild: discord.Guild) -> bool: return True + def _sub(self, request: dict) -> Self: + return OperatorApi(super()._sub(request)) + + def _api_text(self) -> str: + return 'operator api' + async def _api(self) -> ResponseType: match self.request: + case {'target': str() as targetname}: + return await InstrumentationApi(self, targetname).api() + case {'type': 'resetmonitoring'}: + return self.pi.mainservice.pmonitoring.reset() case {'type': 'guilds'}: guilds = [] for guild in self.client.guilds: @@ -245,9 +307,87 @@ class OperatorApi(UserApi): } ) return guilds - case {'type': '?'}: - return 'this is operator api' - case {'type': '*', 'requests': list() | dict() as requests}: - return await self.subs(requests) + case {'type': 'sleep', 'duration': (float() | int()) as duration, 'echo': _ as echo}: + await asyncio.sleep(duration) + return echo + case {'type': 'pool'}: + return self.pi.mainservice.pool_json() case _: - raise Api.UnknownApi('unknown operator api') + return await self._fall_through_api() + + +class InstrumentationApi(OperatorApi): + class UnknownTarget(Api.UnknownApi): + def json(self) -> dict: + return super().json() | {'unknowntarget': None} + + def __init__(self, api: OperatorApi, targetname: str) -> None: + super().__init__(api) + self.targets = self.pi.targets + self.targetname = targetname + target_tuple = self.targets.targets.get(targetname, None) + if target_tuple is None: + raise InstrumentationApi.UnknownTarget('unknown target', targetname) + self.target, self.methodname = target_tuple.value + + def _sub(self, request: dict) -> Self: + return InstrumentationApi(super()._sub(request), self.targetname) + + def _api_text(self) -> str: + return 'instrumentation api' + + async def _api(self) -> ResponseType: + match self.request: + case { + 'type': str() as instrumentationname + } if ( + instrumentation_factory := self.targets.instrumentations.get(instrumentationname) + ) is not None: + try: + instrumentation: Instrumentation = await self.pi.mainservice.pmonitoring.get( + self.targets.get_factory( + self.targetname, + self.target, + self.methodname, + instrumentationname, + instrumentation_factory.value, + ) + ) + except KeyError as e: + raise InstrumentationApi.UnknownTarget( + 'binding failed', self.targetname, instrumentationname, str(e) + ) from e + if not isinstance(instrumentation, JsonLike): + raise TypeError + return instrumentation.json() + case _: + return await self._fall_through_api() + + +class Count(Instrumentation, JsonLike): + def __init__(self, target, methodname: str): + super().__init__(target, methodname) + self.count = 0 + + def instrument(self, method, *args, **kwargs): + self.count += 1 + return method(*args, **kwargs) + + def json(self) -> ResponseType: + return self.count + + +class Concurrency(Instrumentation, JsonLike): + def __init__(self, target, methodname: str): + super().__init__(target, methodname) + self.concurrency = 0 + + async def instrument(self, method, *args, **kwargs): + self.concurrency += 1 + try: + return await method(*args, **kwargs) + finally: + self.concurrency -= 1 + + def json(self) -> ResponseType: + return self.concurrency diff --git a/v6d3music/app.py b/v6d3music/app.py index 073f770..0e9ceb7 100644 --- a/v6d3music/app.py +++ b/v6d3music/app.py @@ -1,13 +1,11 @@ import asyncio import functools -import os import urllib.parse from contextlib import AsyncExitStack from pathlib import Path from typing import Any, Callable, Coroutine, Generic, Hashable, TypeVar import aiohttp -import discord from aiohttp import web from ptvp35 import * @@ -16,7 +14,6 @@ from v6d0auth.run_app import * from v6d1tokens.client import * from v6d3music.api import * from v6d3music.config import auth_redirect, myroot -from v6d3music.core.mainservice import * from v6d3music.utils.bytes_hash import * __all__ = ('AppContext',) @@ -64,14 +61,12 @@ class MusicAppFactory(AppFactory): def __init__( self, secret: str, - client: discord.Client, api: Api, db: DbConnection ): self.secret = secret self.redirect = auth_redirect self.loop = asyncio.get_running_loop() - self.client = client self._api = api self.db = db self._token_clients: CachedDictionary[str, dict | None] = CachedDictionary( @@ -79,10 +74,11 @@ class MusicAppFactory(AppFactory): ) def auth_link(self) -> str: - if self.client.user is None: + client_id = self._api.user_id() + if client_id is None: return '' else: - return f'https://discord.com/api/oauth2/authorize?client_id={self.client.user.id}' \ + return f'https://discord.com/api/oauth2/authorize?client_id={client_id}' \ f'&redirect_uri={urllib.parse.quote(self.redirect)}&response_type=code&scope=identify' def _path(self, file: str): @@ -93,9 +89,10 @@ class MusicAppFactory(AppFactory): return f.read() async def code_token(self, code: str) -> dict: - assert self.client.user is not None + client_id = self._api.user_id() + assert client_id is not None data = { - 'client_id': str(self.client.user.id), + 'client_id': str(client_id), 'client_secret': self.secret, 'grant_type': 'authorization_code', 'code': code, @@ -205,6 +202,10 @@ class MusicAppFactory(AppFactory): @routes.get('/') async def home(_request: web.Request) -> web.StreamResponse: return web.FileResponse(self._path('home.html')) + + @routes.get('/operator/') + async def operatorhome(_request: web.Request) -> web.StreamResponse: + return web.FileResponse(self._path('operator.html')) @routes.get('/login/') async def login(_request: web.Request) -> web.StreamResponse: @@ -256,10 +257,18 @@ class MusicAppFactory(AppFactory): async def mainjs(_request: web.Request) -> web.StreamResponse: return web.FileResponse(self._path('main.js')) + @routes.get('/operator.js') + async def operatorjs(_request: web.Request) -> web.StreamResponse: + return web.FileResponse(self._path('operator.js')) + @routes.get('/main.css') async def maincss(_request: web.Request) -> web.StreamResponse: return web.FileResponse(self._path('main.css')) + @routes.get('/operator.css') + async def operatorcss(_request: web.Request) -> web.StreamResponse: + return web.FileResponse(self._path('operator.css')) + @routes.post('/api/') async def api(request: web.Request) -> web.Response: session = request.query.get('session') @@ -289,18 +298,14 @@ class MusicAppFactory(AppFactory): class AppContext: - def __init__(self, mainservice: MainService) -> None: - self.mainservice = mainservice + def __init__(self, api: Api) -> None: + self.api = api async def start(self) -> tuple[web.Application, asyncio.Task[None]] | None: try: factory = MusicAppFactory( await request_token('music-client', 'token'), - self.mainservice.client, - Api( - self.mainservice, - {key: value for key, value in os.environ.items() if key.startswith('roles')}, - ), + self.api, self.__db ) except aiohttp.ClientConnectorError: @@ -313,10 +318,10 @@ class AppContext: async def __aenter__(self) -> 'AppContext': async with AsyncExitStack() as es: self.__db = await es.enter_async_context(DbFactory(myroot / 'session.db', kvfactory=KVJson())) - self.__es = es.pop_all() self.__task: asyncio.Task[ tuple[web.Application, asyncio.Task[None]] | None ] = asyncio.create_task(self.start()) + self.__es = es.pop_all() return self async def __aexit__(self, exc_type, exc_val, exc_tb): diff --git a/v6d3music/commands.py b/v6d3music/commands.py index ed86ab2..45abb34 100644 --- a/v6d3music/commands.py +++ b/v6d3music/commands.py @@ -32,11 +32,11 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: await catch( ctx, args, f''' - `play ...args` - `play url [- effects]/[+ preset] [[[h]]] [[m]] [s] [tor] ...args` - `pause` - `resume` - presets: {shlex.join(allowed_presets)} +`play ...args` +`play url [- effects]/[+ preset] [[[h]]] [[m]] [s] [tor] ...args` +`pause` +`resume` +presets: {shlex.join(allowed_presets)} ''', (), 'help' ) @@ -54,8 +54,8 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: async def skip(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `skip [first] [last]` - ''', 'help' +`skip [first] [last]` +''', 'help' ) assert ctx.member is not None match args: @@ -69,7 +69,7 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: case [pos0, pos1] if pos0.isdecimal() and pos1.isdecimal(): pos0, pos1 = int(pos0), int(pos1) queue = await mainservice.context(ctx, create=False, force_play=False).queue() - for i in range(pos0, pos1 + 1): + for _ in range(pos0, pos1 + 1): if not queue.skip_at(pos0, ctx.member): pos0 += 1 case _: @@ -80,28 +80,36 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: async def skip_to(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `to [[h]] [m] s` - ''', 'help' +`to [[h]] [m] s` +''', 'help' ) match args: - case [h, m, s] if h.isdecimal() and m.isdecimal() and s.isdecimal(): + case [h, m, s, *args] if h.isdecimal() and m.isdecimal() and s.isdecimal(): seconds = 3600 * int(h) + 60 * int(m) + int(s) - case [m, s] if m.isdecimal() and s.isdecimal(): + case [m, s, *args] if m.isdecimal() and s.isdecimal(): seconds = 60 * int(m) + int(s) - case [s] if s.isdecimal(): + case [s, *args] if s.isdecimal(): seconds = int(s) case _: - raise Explicit('misformatted') + raise Explicit('misformatted, expected time') + match args: + case ['at', spos] if spos.isdecimal(): + pos = int(spos) + case []: + pos = 0 + case _: + raise Explicit('misformatted, expected position') + assert_admin(ctx.member) queue = await mainservice.context(ctx, create=False, force_play=False).queue() - queue.queue[0].set_seconds(seconds) + queue.queue[pos].set_seconds(seconds) @at('effects') async def effects_(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `effects - effects` - `effects + preset` - ''', 'help' +`effects - effects` +`effects + preset` +''', 'help' ) match args: case ['-', effects]: @@ -121,9 +129,9 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: async def default(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `default - effects` - `default + preset` - `default none` +`default - effects` +`default + preset` +`default none` ''', 'help' ) assert ctx.guild is not None @@ -214,49 +222,53 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: async def queue_(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `queue` - `queue clear` - `queue resume` - `queue pause` +`queue` +`queue clear` +`queue resume` +`queue pause` ''', 'help' ) assert ctx.member is not None - limit = 100 - match args: - case [lstr, *args] if lstr.isdecimal(): - limit = int(lstr) - case [*args]: - pass match args: case []: - await ctx.long( - ( - await ( - await mainservice.context(ctx, create=True, force_play=False).queue() - ).format(limit) - ).strip() or 'no queue' - ) + limit = 24 + case [lstr] if lstr.isdecimal(): + limit = int(lstr) + case ['tail', lstr] if lstr.isdecimal(): + limit = -int(lstr) + if limit >= 0: + raise Explicit('limit of at least `1` required') case ['clear']: (await mainservice.context(ctx, create=False, force_play=False).queue()).clear(ctx.member) await ctx.reply('done') + return case ['resume']: async with mainservice.lock_for(ctx.guild): await mainservice.context(ctx, create=True, force_play=True).vc() await ctx.reply('done') + return case ['pause']: async with mainservice.lock_for(ctx.guild): vc = await mainservice.context(ctx, create=True, force_play=False).vc() vc.pause() await ctx.reply('done') + return case _: raise Explicit('misformatted') + await ctx.long( + ( + await ( + await mainservice.context(ctx, create=True, force_play=False).queue() + ).format(limit) + ).strip() or 'no queue' + ) @at('swap') async def swap(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `swap a b` - ''', 'help' +`swap a b` +''', 'help' ) assert ctx.member is not None match args: @@ -270,8 +282,8 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: async def move(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `move a b` - ''', 'help' +`move a b` +''', 'help' ) assert ctx.member is not None match args: @@ -285,14 +297,17 @@ def get_of(mainservice: MainService) -> Callable[[str], command_type]: async def volume_(ctx: Context, args: list[str]) -> None: await catch( ctx, args, ''' - `volume volume` - ''', 'help' +`volume volume` +''', 'help' ) assert ctx.member is not None match args: - case [volume]: - volume = float(volume) + case [svolume]: + volume = float(svolume) await (await mainservice.context(ctx, create=True, force_play=False).main()).set(volume, ctx.member) + case []: + volume = (await mainservice.context(ctx, create=True, force_play=False).main()).get() + await ctx.reply(f'volume is {volume}') case _: raise Explicit('misformatted') diff --git a/v6d3music/core/mainaudio.py b/v6d3music/core/mainaudio.py index 441601e..096dfe0 100644 --- a/v6d3music/core/mainaudio.py +++ b/v6d3music/core/mainaudio.py @@ -10,10 +10,10 @@ __all__ = ('MainAudio',) class MainAudio(discord.PCMVolumeTransformer): - def __init__(self, db: DbConnection, queue: QueueAudio, volume: float): + def __init__(self, db: DbConnection, queue: QueueAudio): self.db = db self.queue = queue - super().__init__(self.queue, volume=volume) + super().__init__(self.queue, volume=self.get()) async def set(self, volume: float, member: discord.Member): assert_admin(member) @@ -24,6 +24,9 @@ class MainAudio(discord.PCMVolumeTransformer): self.volume = volume await self.db.set(member.guild.id, volume) + def get(self) -> float: + return self.db.get(self.queue.guild.id, 0.2) + @classmethod async def create(cls, servicing: YTAServicing, db: DbConnection, queues: DbConnection, guild: discord.Guild) -> 'MainAudio': - return cls(db, await QueueAudio.create(servicing, queues, guild), volume=db.get(guild.id, 0.2)) + return cls(db, await QueueAudio.create(servicing, queues, guild)) diff --git a/v6d3music/core/mainservice.py b/v6d3music/core/mainservice.py index 8031731..1bac940 100644 --- a/v6d3music/core/mainservice.py +++ b/v6d3music/core/mainservice.py @@ -4,10 +4,19 @@ from contextlib import AsyncExitStack from typing import AsyncIterable, TypeVar import discord +from v6d3musicbase.event import * +from v6d3musicbase.responsetype import * +from v6d3musicbase.targets import * + +import v6d3music.processing.pool +from ptvp35 import * +from v6d2ctx.context import * +from v6d2ctx.lock_for import * from v6d3music.config import myroot from v6d3music.core.caching import * from v6d3music.core.default_effects import * from v6d3music.core.mainaudio import * +from v6d3music.core.monitoring import * from v6d3music.core.queueaudio import * from v6d3music.core.ystate import * from v6d3music.core.ytaservicing import * @@ -15,22 +24,50 @@ from v6d3music.core.ytaudio import * from v6d3music.processing.pool import * from v6d3music.utils.argctx import * -from ptvp35 import * -from v6d2ctx.context import * -from v6d2ctx.lock_for import * - -__all__ = ('MainService', 'MainDescriptor', 'MainContext') +__all__ = ('MainService', 'MainMode', 'MainContext', 'MainEvent') T = TypeVar('T') +class MainEvent(Event): + pass + + +class _PMEvent(MainEvent): + def __init__(self, event: PoolEvent, /) -> None: + self.event = event + + def json(self) -> ResponseType: + return {'pool': self.event.json()} + + +class _PMSendable(SendableEvents[PoolEvent]): + def __init__(self, sendable: SendableEvents[MainEvent], /) -> None: + self.sendable = sendable + + def send(self, event: PoolEvent, /) -> None: + return self.sendable.send(_PMEvent(event)) + + class MainService: - def __init__(self, defaulteffects: DefaultEffects, client: discord.Client) -> None: + def __init__( + self, + targets: Targets, + defaulteffects: DefaultEffects, + client: discord.Client, + events: SendableEvents[MainEvent], + ) -> None: + self.targets = targets self.defaulteffects = defaulteffects self.client = client self.mains: dict[discord.Guild, MainAudio] = {} self.restore_lock = asyncio.Lock() + self.__events: SendableEvents[MainEvent] = events + self.__pool_events: SendableEvents[PoolEvent] = _PMSendable(self.__events) + + def register_instrumentation(self): + self.targets.register_type(v6d3music.processing.pool.UnitJob, 'run', Async) @staticmethod async def raw_vc_for_member(member: discord.Member) -> discord.VoiceClient: @@ -58,11 +95,11 @@ class MainService: raise Explicit('not in a guild') return await self.raw_vc_for_member(ctx.member) - def descriptor(self, *, create: bool, force_play: bool) -> 'MainDescriptor': - return MainDescriptor(self, create=create, force_play=force_play) + def mode(self, *, create: bool, force_play: bool) -> 'MainMode': + return MainMode(self, create=create, force_play=force_play) def context(self, ctx: Context, *, create: bool, force_play: bool) -> 'MainContext': - return self.descriptor(create=create, force_play=force_play).context(ctx) + return self.mode(create=create, force_play=force_play).context(ctx) async def create(self, guild: discord.Guild) -> MainAudio: return await MainAudio.create(self.__servicing, self.__volumes, self.__queues, guild) @@ -73,10 +110,13 @@ class MainService: self.__volumes = await es.enter_async_context(DbFactory(myroot / 'volume.db', kvfactory=KVJson())) self.__queues = await es.enter_async_context(DbFactory(myroot / 'queue.db', kvfactory=KVJson())) self.__caching = await es.enter_async_context(Caching()) - self.__pool = await es.enter_async_context(Pool(5)) + self.__pool = await es.enter_async_context(Pool(5, self.__pool_events)) self.__servicing = YTAServicing(self.__caching, self.__pool) self.__vcs_restored: asyncio.Future[None] = asyncio.Future() self.__save_task = asyncio.create_task(self.save_daemon()) + self.monitoring = await es.enter_async_context(Monitoring()) + self.pmonitoring = es.enter_context(PersistentMonitoring(self.monitoring)) + self.register_instrumentation() self.__es = es.pop_all() return self @@ -153,7 +193,7 @@ class MainService: vp: discord.VoiceProtocol = await channel.connect() assert isinstance(vp, discord.VoiceClient) vc = vp - await self.descriptor(create=True, force_play=True).main_for_raw_vc(vc) + await self.mode(create=True, force_play=True).main_for_raw_vc(vc) if vc_is_paused: vc.pause() @@ -193,8 +233,11 @@ class MainService: async for audio in YState(self.__servicing, self.__pool, ctx, argctx.sources).iterate(): yield audio + def pool_json(self) -> ResponseType: + return self.__pool.json() -class MainDescriptor: + +class MainMode: def __init__(self, service: MainService, *, create: bool, force_play: bool) -> None: self.mainservice = service self.mains = service.mains @@ -220,14 +263,14 @@ class MainDescriptor: class MainContext: - def __init__(self, descriptor: MainDescriptor, ctx: Context) -> None: - self.mainservice = descriptor.mainservice - self.descriptor = descriptor + def __init__(self, mode: MainMode, ctx: Context) -> None: + self.mainservice = mode.mainservice + self.mode = mode self.ctx = ctx async def vc_main(self) -> tuple[discord.VoiceClient, MainAudio]: vc = await self.mainservice.raw_vc_for(self.ctx) - return vc, await self.descriptor.main_for_raw_vc(vc) + return vc, await self.mode.main_for_raw_vc(vc) async def vc(self) -> discord.VoiceClient: vc, _ = await self.vc_main() diff --git a/v6d3music/core/monitoring.py b/v6d3music/core/monitoring.py new file mode 100644 index 0000000..7a755e3 --- /dev/null +++ b/v6d3music/core/monitoring.py @@ -0,0 +1,117 @@ +__all__ = ('Monitoring', 'PersistentMonitoring') + +import asyncio +from contextlib import AsyncExitStack, ExitStack +from typing import Any, Callable, Generic, TypeVar + +from rainbowadn.instrument import Instrumentation + +T = TypeVar('T', bound=Instrumentation, covariant=True) + + +class Provider(Generic[T]): + def __init__(self, provider: Callable[[], T], /) -> None: + self.provider = provider + self.__count = 0 + self.__empty = asyncio.Event() + self.__empty.set() + self.__closed = False + + def __enter__(self) -> T: + if self.__closed: + raise RuntimeError('the provider is closed') + if self.__count < 0: + raise RuntimeError + if self.__count == 0: + self.__instrumentation = self.provider().__enter__() + self.__empty.clear() + self.__count += 1 + return self.__instrumentation + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.__count <= 0: + raise RuntimeError + self.__count -= 1 + if self.__count == 0: + self.__empty.set() + try: + self.__instrumentation.__exit__(exc_type, exc_val, exc_tb) + except: + self.__closed = True + raise + finally: + del self.__instrumentation + + async def stop(self) -> None: + while self.__count: + await self.__empty.wait() + self.__closed = True + + +class ProviderManager(Generic[T]): + def __init__(self, provider: Callable[[], T], /) -> None: + self.provider = Provider(provider) + + async def __aenter__(self) -> Provider: + return self.provider + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.provider.stop() + + +class Monitoring: + async def get(self, provider: Callable[[], T]) -> Provider[T]: + if provider not in self.__providers: + self.__providers[provider] = asyncio.create_task( + self.__es.enter_async_context(ProviderManager(provider)) + ) + return await self.__providers[provider] + + async def __aenter__(self) -> 'Monitoring': + async with AsyncExitStack() as es: + self.__providers: dict[ + Callable[[], Instrumentation], + asyncio.Future[Provider] + ] = {} + self.__es = es.pop_all() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + async with self.__es: + del self.__es + + +class PersistentMonitoring: + def __init__(self, monitoring: Monitoring) -> None: + self.__monitoring = monitoring + + async def _get(self, provider: Callable[[], T]) -> T: + return self.__es.enter_context(await self.__monitoring.get(provider)) + + async def get(self, provider: Callable[[], T]) -> T: + if provider not in self.__instrumentations: + self.__instrumentations[provider] = asyncio.create_task( + self._get(provider) + ) + return await self.__instrumentations[provider] + + def __enter__(self) -> 'PersistentMonitoring': + self.__instrumentations: dict[ + Callable[[], Instrumentation], + asyncio.Future + ] = {} + self.__es = ExitStack() + return self + + def reset(self) -> int: + with self.__es: + self.__es = ExitStack() + length = len(self.__instrumentations) + self.__instrumentations.clear() + return length + raise RuntimeError + + def __exit__(self, exc_type, exc_val, exc_tb): + with self.__es: + del self.__instrumentations + del self.__es diff --git a/v6d3music/core/queueaudio.py b/v6d3music/core/queueaudio.py index f1f16b3..a562a82 100644 --- a/v6d3music/core/queueaudio.py +++ b/v6d3music/core/queueaudio.py @@ -130,22 +130,34 @@ class QueueAudio(discord.AudioSource): self.update_sources() async def format(self, limit=100) -> str: - if limit > 100: + if limit > 100 or limit < -100: raise Explicit('queue limit is too large') stream = StringIO() - for i, audio in enumerate(lst := list(self.queue)): - if i >= limit: - stream.write(f'cutting queue at {limit} results, {len(lst) - limit} remaining.\n') - break + lst = list(self.queue) + llst = len(lst) + + def write(): stream.write(f'`[{i}]` `{audio.source_timecode()} / {audio.duration()}` {audio.description}\n') + + if limit >= 0: + for i, audio in enumerate(lst): + if i >= limit: + stream.write(f'cutting queue at {limit} results, {llst - limit} remaining.\n') + break + write() + else: + for i_, audio in enumerate(lst[limit:]): + i = llst + limit + i_ + write() return stream.getvalue() def cleanup(self): - for audio in self.queue: - try: - audio.cleanup() - except ValueError: - pass + pass + # for audio in self.queue: + # try: + # audio.cleanup() + # except ValueError: + # pass async def pubjson(self, member: discord.Member, limit: int) -> list: import random diff --git a/v6d3music/core/real_url.py b/v6d3music/core/real_url.py index ff712a9..a72874d 100644 --- a/v6d3music/core/real_url.py +++ b/v6d3music/core/real_url.py @@ -32,7 +32,7 @@ async def _resolve_url(url: str, tor: bool) -> str: async def real_url(caching: Caching, url: str, override: bool, tor: bool) -> str: if adaas_available and not tor: - return await RemoteCache().real_url(url, override, tor) + return await RemoteCache().real_url(url, override, tor, True) hurl: str = bytes_hash(url.encode()) if not override: curl: str | None = caching.get(hurl) diff --git a/v6d3music/core/ystate.py b/v6d3music/core/ystate.py index c5b4773..cd1d70b 100644 --- a/v6d3music/core/ystate.py +++ b/v6d3music/core/ystate.py @@ -3,14 +3,15 @@ from collections import deque from contextlib import AsyncExitStack from typing import AsyncIterable, Iterable +from v6d3musicbase.responsetype import * + +from v6d2ctx.context import * from v6d3music.core.create_ytaudio import * from v6d3music.core.ytaservicing import * from v6d3music.core.ytaudio import * from v6d3music.processing.pool import * from v6d3music.utils.argctx import * -from v6d2ctx.context import * - __all__ = ('YState',) @@ -38,7 +39,7 @@ class YState: async def _start_workers(self) -> None: for _ in range(self.pool.workers()): - await self.es.enter_async_context(YJD(self).at(self.pool)) + await self.es.enter_async_context(YStream(self).at(self.pool)) async def _next_audio(self) -> YTAudio | None | _Stop: future = await self.results.get() @@ -82,9 +83,11 @@ class YState: return None -class YJD(JobDescriptor): +class YStream(JobUnit): def __init__(self, state: YState) -> None: self.state = state + self.__running = False + self.__details: dict[str, ResponseType] = {'status': 'stopped'} def _unpack_playlists(self) -> None: while self.state.playlists and self.state.playlists[0].done(): @@ -97,8 +100,9 @@ class YJD(JobDescriptor): for entry in playlist.result(): self.state.entries.append(entry) - async def run(self) -> JobDescriptor | None: + async def _run(self, context: JobContext, /) -> JobUnit | None: if self.state.empty_processing(): + self.__details = {'status': 'stopping'} if self.state.results.empty(): self.state.results.put_nowait(_Stop()) return None @@ -106,9 +110,21 @@ class YJD(JobDescriptor): entry = self.state.entries.popleft() audiotask: asyncio.Future[YTAudio | None] if isinstance(entry, BaseException): + self._set_details(context, {'status': 'breaking downstream audio creation'}) audiotask = asyncio.Future() audiotask.set_exception(entry) else: + self._set_details( + context, + { + 'status': 'creating audio', + 'info': cast_to_response(entry.info), + 'effects': entry.effects, + 'already_read': entry.already_read, + 'tor': entry.tor, + 'ignore': entry.ignore, + } + ) audiotask = asyncio.create_task(self.state.result(entry)) self.state.results.put_nowait(audiotask) try: @@ -117,9 +133,21 @@ class YJD(JobDescriptor): self.state.entries.clear() self.state.playlists.clear() self.state.sources.clear() + self._set_details(context, {'status': 'rescheduling self from entries'}) return self elif self.state.sources: source = self.state.sources.popleft() + self._set_details( + context, + { + 'status': 'parsing playlist', + 'url': source.url, + 'effects': source.effects, + 'already_read': source.already_read, + 'tor': source.tor, + 'ignore': source.ignore, + } + ) playlisttask = asyncio.create_task(self.state.playlist(source)) self.state.playlists.append(playlisttask) try: @@ -131,9 +159,27 @@ class YJD(JobDescriptor): self._unpack_playlists() rescheduled = self.state.descheduled self.state.descheduled = 0 + self._set_details(context, {'status': 'rescheduling others', 'rescheduling': rescheduled}) for _ in range(rescheduled): - await self.state.es.enter_async_context(YJD(self.state).at(self.state.pool)) + await self.state.es.enter_async_context(YStream(self.state).at(self.state.pool)) + self._set_details(context, {'status': 'rescheduling self from sources'}) return self else: + self._set_details(context, {'status': 'descheduling'}) self.state.descheduled += 1 return None + + def _set_details(self, context: JobContext, details: dict[str, ResponseType], /) -> None: + self.__details = details + context.events.send(JobStatusChanged(self)) + + async def run(self, context: JobContext, /) -> JobUnit | None: + try: + self.__running = True + return await self._run(context) + finally: + self.__running = False + self.__details = {'status': 'stopped'} + + def json(self) -> ResponseType: + return {'type': 'ystream', 'details': self.__details, 'running': self.__running} diff --git a/v6d3music/core/ytaudio.py b/v6d3music/core/ytaudio.py index 9a08d01..94c948d 100644 --- a/v6d3music/core/ytaudio.py +++ b/v6d3music/core/ytaudio.py @@ -8,6 +8,7 @@ from v6d2ctx.context import * from v6d3music.core.ffmpegnormalaudio import * from v6d3music.core.real_url import * from v6d3music.core.ytaservicing import * +from v6d3music.processing.abstractrunner import * from v6d3music.utils.fill import * from v6d3music.utils.sparq import * from v6d3music.utils.tor_prefix import * @@ -43,6 +44,7 @@ class YTAudio(discord.AudioSource): self.regenerating = False # self.set_source() self._durations: dict[str, str] = {} + self._duration_lock = asyncio.Lock() self.loop = asyncio.get_running_loop() self.stop_at: int | None = stop_at @@ -84,7 +86,7 @@ class YTAudio(discord.AudioSource): def schedule_duration_update(self): self.loop.call_soon_threadsafe(self._schedule_duration_update) - async def _update_duration(self): + async def _do_update_duration(self): url: str = self.url if url in self._durations: return @@ -108,6 +110,14 @@ class YTAudio(discord.AudioSource): assert ap.stdout is not None self._durations[url] = (await ap.stdout.read()).decode().strip().split('.')[0] + async def _update_duration(self): + async with self._duration_lock: + await self._do_update_duration() + + async def _update_duration_context(self, context: CoroContext): + context.events.send(CoroStatusChanged({'ytaudio': 'duration'})) + await self._update_duration() + async def update_duration(self): await self.servicing.runner.run(self._update_duration()) @@ -121,7 +131,7 @@ class YTAudio(discord.AudioSource): before_options = '' if 'https' in self.url: before_options += ( - '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 60 -rw_timeout 5000000 -copy_unknown' + '-reconnect 1 -reconnect_at_eof 0 -reconnect_streamed 1 -reconnect_delay_max 60 -copy_unknown' ) if self.already_read: before_options += ( diff --git a/v6d3music/html/main.css b/v6d3music/html/main.css index 26b5ce0..93c7531 100644 --- a/v6d3music/html/main.css +++ b/v6d3music/html/main.css @@ -44,3 +44,7 @@ body, width: 100%; background: #050505; } + +#homeroot { + padding: 1em; +} diff --git a/v6d3music/html/main.js b/v6d3music/html/main.js index 475a409..6b1772b 100644 --- a/v6d3music/html/main.js +++ b/v6d3music/html/main.js @@ -191,17 +191,14 @@ const aQueueWidget = async () => { return el; }; const pageHome = async () => { - return baseEl( - "div", + const el = document.createElement("div"); + el.append( baseEl("div", aLogin()), baseEl("div", await userAvatarImg()), baseEl("div", await userId()), baseEl("div", await userUsername()), baseEl("div", await aQueueWidget()) ); + el.id = "homeroot"; + return el; }; -aApi({ - type: "guilds", - operator: null, - catches: { "not an operator": null, "*": null }, -}).then(console.log); diff --git a/v6d3music/html/operator.css b/v6d3music/html/operator.css new file mode 100644 index 0000000..64b8b6f --- /dev/null +++ b/v6d3music/html/operator.css @@ -0,0 +1,22 @@ +#operatorroot { + height: 10em; + width: 100%; +} + +#operation { + width: 100%; +} + +#workerpool { + display: grid; + grid-template-columns: repeat(5, 1fr); + gap: 1em; + padding: 1em; + height: 5em; + overflow: hidden; +} + +.workerview { + background: #0f0f0f; + overflow: hidden; +} diff --git a/v6d3music/html/operator.html b/v6d3music/html/operator.html new file mode 100644 index 0000000..2ed4b38 --- /dev/null +++ b/v6d3music/html/operator.html @@ -0,0 +1,24 @@ + + + + + + +
+ +
+ +
+ + + + + diff --git a/v6d3music/html/operator.js b/v6d3music/html/operator.js new file mode 100644 index 0000000..2457b6c --- /dev/null +++ b/v6d3music/html/operator.js @@ -0,0 +1,69 @@ +aApi({ + type: "guilds", + operator: null, + catches: { "not an operator": null, "*": null }, +}).then(console.log); +aApi({ + type: "sleep", + operator: null, + duration: 1, + echo: {}, + time: null, + catches: { "not an operator": null, "*": null }, +}).then(console.log); +aApi({ + type: "*", + idkey: "target", + idbase: { + type: "*", + requests: { + Count: {}, + Concurrency: {}, + }, + }, + operator: null, + requests: { + "v6d3music.api.Api().api": {}, + "v6d3music.processing.pool.UnitJob.run": {}, + }, + catches: { "not an operator": null, "*": null }, + time: null, +}).then((value) => console.log(JSON.stringify(value, undefined, 2))); +aApi({ + type: "pool", + operator: null, + catches: { "not an operator": null, "*": null }, +}).then((value) => console.log(JSON.stringify(value, undefined, 2))); +const elJob = (job) => { + const jobview = document.createElement("div"); + jobview.classList.add("jobview"); + jobview.innerText = JSON.stringify(job); + return jobview; +}; +const elWorker = (worker) => { + const workerview = document.createElement("div"); + workerview.classList.add("workerview"); + workerview.append(elJob(worker.job)); + workerview.append(`qsize: ${worker.qsize}`); + return workerview; +}; +const elPool = async () => { + const pool = document.createElement("div"); + pool.id = "workerpool"; + const workers = await aApi({ + type: "pool", + operator: null, + catches: { "not an operator": null, "*": null }, + }); + if (workers === null || workers.error !== undefined) return null; + for (const worker of workers) { + pool.append(elWorker(worker)); + } + return pool; +}; +const pageOperator = async () => { + const operation = document.createElement("div"); + operation.id = "operation"; + operation.append(await elPool()); + return operation; +}; diff --git a/v6d3music/main.py b/v6d3music/main.py new file mode 100644 index 0000000..2e6deba --- /dev/null +++ b/v6d3music/main.py @@ -0,0 +1,176 @@ +import asyncio +import contextlib +import os +import sys +import time +from traceback import print_exc + +import discord +from v6d3musicbase.event import * +from v6d3musicbase.targets import * + +from ptvp35 import * +from rainbowadn.instrument import Instrumentation +from v6d1tokens.client import * +from v6d2ctx.handle_content import * +from v6d2ctx.pain import * +from v6d2ctx.serve import * +from v6d3music.api import * +from v6d3music.app import * +from v6d3music.commands import * +from v6d3music.config import prefix +from v6d3music.core.caching import * +from v6d3music.core.default_effects import * +from v6d3music.core.mainservice import * + +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) + + +class MusicClient(discord.Client): + pass + + +_client = MusicClient( + intents=discord.Intents( + members=True, + guilds=True, + bans=True, + emojis=True, + invites=True, + voice_states=True, + guild_messages=True, + reactions=True, + message_content=True, + ), + loop=loop, +) + + +banned_guilds = set(map(int, filter(bool, map(str.strip, os.getenv('banned_guilds', '').split(':'))))) + + +def guild_allowed(guild: discord.Guild | None) -> bool: + return guild is not None and guild.id not in banned_guilds + + +def message_allowed(message: discord.Message) -> bool: + return guild_allowed(message.guild) + + +def register_handlers(client: discord.Client, mainservice: MainService): + of = get_of(mainservice) + + @client.event + async def on_message(message: discord.Message) -> None: + if message_allowed(message): + try: + await handle_content(of, message, message.content, prefix, client) + except: + print_exc() + + @client.event + async def on_ready(): + print('ready') + await client.change_presence( + activity=discord.Game( + name='феноменально', + ) + ) + await mainservice.restore() + + +class UpgradeABMInit(Instrumentation): + def __init__(self): + super().__init__(ABlockMonitor, '__init__') + + def instrument(self, method, abm, *, threshold=0.0, delta=10.0, interval=0.0): + print('created upgraded') + method(abm, threshold=threshold, delta=delta, interval=interval) + abm.threshold = threshold + + +class UpgradeABMTask(Instrumentation): + def __init__(self): + super().__init__(ABlockMonitor, '_monitor') + + async def instrument(self, _, abm): + print('started upgraded') + while True: + delta = abm.delta + t = time.time() + await asyncio.sleep(delta) + spent = time.time() - t + delay = spent - delta + if delay > abm.threshold: + abm.threshold = delay + print( + f'upgraded block monitor reached new peak delay {delay:.4f}') + interval = abm.interval + if interval > 0: + await asyncio.sleep(interval) + + +def _upgrade_abm() -> contextlib.ExitStack: + with contextlib.ExitStack() as es: + es.enter_context(UpgradeABMInit()) + es.enter_context(UpgradeABMTask()) + return es.pop_all() + raise RuntimeError + + +class PathPrint(Instrumentation): + def __init__(self, methodname: str, pref: str): + super().__init__(DbConnection, methodname) + self.pref = pref + + async def instrument(self, method, db: DbConnection, *args, **kwargs): + result = await method(db, *args, **kwargs) + try: + print(self.pref, db._DbConnection__path) # type: ignore + except Exception: + from traceback import print_exc + print_exc() + return result + + +def _db_ee() -> contextlib.ExitStack: + with contextlib.ExitStack() as es: + es.enter_context(PathPrint('_initialize', 'open :')) + es.enter_context(PathPrint('aclose', 'close:')) + return es.pop_all() + raise RuntimeError + + +async def amain(client: discord.Client): + roles = {key: value for key, value in os.environ.items() if key.startswith('roles')} + async with ( + client, + DefaultEffects() as defaulteffects, + MainService(Targets(), defaulteffects, client, Events()) as mainservice, + AppContext(Api(mainservice, roles)), + ABlockMonitor(delta=0.5) + ): + register_handlers(client, mainservice) + if 'guerilla' in sys.argv: + from pathlib import Path + tokenpath = Path('.token.txt') + if tokenpath.exists(): + token = tokenpath.read_text() + else: + token = input('token:') + tokenpath.write_text(token) + elif (token_ := os.getenv('trial_token')): + token = token_ + else: + token = await request_token('music', 'token') + await client.login(token) + if os.getenv('v6tor', None) is None: + print('no tor') + await client.connect() + print('exited') + + +def main() -> None: + with _upgrade_abm(), _db_ee(): + serve(amain(_client), _client, loop) diff --git a/v6d3music/processing/abstractrunner.py b/v6d3music/processing/abstractrunner.py index 7d5605f..448eef5 100644 --- a/v6d3music/processing/abstractrunner.py +++ b/v6d3music/processing/abstractrunner.py @@ -1,13 +1,36 @@ -from typing import Any, Coroutine, TypeVar +__all__ = ('AbstractRunner', 'CoroEvent', 'CoroContext', 'CoroStatusChanged') + from abc import ABC, abstractmethod +from typing import Any, Callable, Coroutine, TypeVar + +from v6d3musicbase.event import * +from v6d3musicbase.responsetype import * T = TypeVar('T') -__all__ = ('AbstractRunner',) +class CoroEvent(Event): + pass + + +class CoroContext: + def __init__(self, events: SendableEvents[CoroEvent]) -> None: + self.events = events + + +class CoroStatusChanged(CoroEvent): + def __init__(self, status: ResponseType) -> None: + self.status = status + + def json(self) -> ResponseType: + return {'status': self.status} class AbstractRunner(ABC): @abstractmethod - async def run(self, coro: Coroutine[Any, Any, T]) -> T: + async def run(self, coro: Coroutine[Any, Any, T], /) -> T: + raise NotImplementedError + + @abstractmethod + async def runctx(self, ctxcoro: Callable[[CoroContext], Coroutine[Any, Any, T]], /) -> T: raise NotImplementedError diff --git a/v6d3music/processing/pool.py b/v6d3music/processing/pool.py index dd1c747..c70ac95 100644 --- a/v6d3music/processing/pool.py +++ b/v6d3music/processing/pool.py @@ -1,56 +1,115 @@ +__all__ = ('Job', 'Pool', 'JobUnit', 'JobContext', 'JobStatusChanged', 'PoolEvent') + import asyncio -from typing import Any, Coroutine, Generic, TypeVar, Union +from typing import Any, Callable, Coroutine, Generic, TypeVar, Union + +from v6d3musicbase.event import * +from v6d3musicbase.responsetype import * from .abstractrunner import * -__all__ = ('Job', 'Pool', 'JobDescriptor',) + +class JobEvent(Event): + pass + + +class JobContext: + def __init__(self, events: SendableEvents[JobEvent]) -> None: + self.events = events class Job: def __init__(self, future: asyncio.Future[None]) -> None: self.future = future - async def run(self) -> Union['Job', None]: + async def run(self, context: JobContext, /) -> Union['Job', None]: raise NotImplementedError + def json(self) -> ResponseType: + return {'type': 'unknown'} -class JobDescriptor: - async def run(self) -> Union['JobDescriptor', None]: + +class JobUnit: + async def run(self, context: JobContext, /) -> Union['JobUnit', None]: raise NotImplementedError def wrap(self) -> Job: - return DescriptorJob(asyncio.Future(), self) + return UnitJob(asyncio.Future(), self) def at(self, pool: 'Pool') -> 'JDC': return JDC(self, pool) + def json(self) -> ResponseType: + return {'type': 'unknown'} -class DescriptorJob(Job): - def __init__(self, future: asyncio.Future[None], descriptor: JobDescriptor) -> None: + +class JobStatusChanged(JobEvent): + def __init__(self, job: Job | JobUnit) -> None: + self.job = job + + def json(self) -> ResponseType: + return {'status': self.job.json()} + + +class UnitJob(Job): + def __init__(self, future: asyncio.Future[None], unit: JobUnit) -> None: super().__init__(future) - self.__descriptor = descriptor + self.__unit = unit - async def run(self) -> Job | None: - next_descriptor = await self.__descriptor.run() - if next_descriptor is None: + async def run(self, context: JobContext, /) -> Job | None: + next_unit = await self.__unit.run(context) + if next_unit is None: return None else: - return DescriptorJob(self.future, next_descriptor) + return UnitJob(self.future, next_unit) + + def json(self) -> ResponseType: + return self.__unit.json() + + +class WorkerEvent(Event): + pass + + +class _JWEvent(WorkerEvent): + def __init__(self, event: JobEvent, /) -> None: + self.event = event + + def json(self) -> ResponseType: + return {'job': self.event.json()} + + +class _JWSendable(SendableEvents[JobEvent]): + def __init__(self, sendable: SendableEvents[WorkerEvent]) -> None: + self.sendable = sendable + + def send(self, event: JobEvent, /) -> None: + return self.sendable.send(_JWEvent(event)) class Worker: - def __init__(self) -> None: + def __init__(self, events: SendableEvents[WorkerEvent], /) -> None: self.__queue: asyncio.Queue[Job | None] = asyncio.Queue() self.__working = False self.__busy = 0 + self.__job = None + self.__events = events + self.__job_events = _JWSendable(self.__events) - def _put_nowait(self, job: Job | None) -> None: + def _put_nowait(self, job: Job | None, /) -> None: self.__queue.put_nowait(job) self.__busy += 1 + async def _run(self, job: Job, /) -> Job | None: + try: + self.__job = job + return await job.run(JobContext(self.__job_events)) + finally: + self.__job = None + async def _handle(self, job: Job) -> None: try: - next_job = await job.run() + next_job = await self._run(job) except BaseException as e: job.future.set_exception(e) else: @@ -138,6 +197,18 @@ class Worker: def busy(self) -> int: return self.__busy + def _job_json(self) -> ResponseType: + if self.__job is None: + return None + else: + return self.__job.json() + + def json(self) -> ResponseType: + return { + 'job': self._job_json(), + 'qsize': self.__queue.qsize(), + } + class Working: def __init__(self, worker: Worker, task: asyncio.Future[None]) -> None: @@ -152,8 +223,8 @@ class Working: self.__worker.submit(job) @classmethod - def start(cls) -> 'Working': - worker = Worker() + def start(cls, events: SendableEvents[WorkerEvent], /) -> 'Working': + worker = Worker(events) task = worker.start() return cls(worker, task) @@ -163,30 +234,76 @@ class Working: def busy(self) -> int: return self.__worker.busy() + def json(self) -> ResponseType: + return self.__worker.json() + T = TypeVar('T') -class CoroJD(JobDescriptor, Generic[T]): - def __init__(self, coro: Coroutine[Any, Any, T]) -> None: +class CoroJD(JobUnit, Generic[T]): + def __init__(self, ctxcoro: Callable[[CoroContext], Coroutine[Any, Any, T]], /) -> None: self.future = asyncio.Future() - self.coro = coro + self.ctxcoro = ctxcoro + self.status: ResponseType = None - async def run(self) -> JobDescriptor | None: + async def run(self, context: JobContext, /) -> JobUnit | None: try: - self.future.set_result(await self.coro) + self.future.set_result(await self.ctxcoro(CoroContext(_CJSendable(context.events, self)))) except BaseException as e: self.future.set_exception(e) return None + def json(self) -> ResponseType: + return {'coroutine': self.status} + + +class _CJSendable(SendableEvents[CoroEvent]): + def __init__(self, sendable: SendableEvents[JobEvent], corojd: CoroJD) -> None: + self.sendable = sendable + self.corojd = corojd + + def send(self, event: CoroEvent, /) -> None: + match event: + case CoroStatusChanged() as csc: + self.corojd.status = csc.status + self.sendable.send(JobStatusChanged(self.corojd)) + case _: + pass + + +class PoolEvent(Event): + pass + + +class _WPEvent(PoolEvent): + def __init__(self, event: WorkerEvent, /) -> None: + self.event = event + + def json(self) -> ResponseType: + return {'worker': self.event.json()} + + +class _WPSendable(SendableEvents[WorkerEvent]): + def __init__(self, sendable: SendableEvents[PoolEvent], /) -> None: + self.sendable = sendable + + def send(self, event: WorkerEvent, /) -> None: + return self.sendable.send(_WPEvent(event)) + class Pool(AbstractRunner): - def __init__(self, workers: int) -> None: + def __init__(self, workers: int, events: SendableEvents[PoolEvent], /) -> None: if workers < 1: raise ValueError('non-positive number of workers') self.__workers = workers self.__working = False self.__open = False + self.__events: SendableEvents[PoolEvent] = events + self.__worker_events: SendableEvents[WorkerEvent] = _WPSendable(self.__events) + + def _start_worker(self) -> Working: + return Working.start(self.__worker_events) async def __aenter__(self) -> 'Pool': if self.__open: @@ -194,7 +311,7 @@ class Pool(AbstractRunner): if self.__working: raise RuntimeError('starting an already running pool') self.__working = True - self.__pool = set(Working.start() for _ in range(self.__workers)) + self.__pool: set[Working] = set(self._start_worker() for _ in range(self.__workers)) self.__open = True return self @@ -219,19 +336,25 @@ class Pool(AbstractRunner): def workers(self) -> int: return self.__workers - async def run(self, coro: Coroutine[Any, Any, T]) -> T: - job = CoroJD(coro) + async def run(self, coro: Coroutine[Any, Any, T], /) -> T: + return await self.runctx(lambda _: coro) + + async def runctx(self, ctxcoro: Callable[[CoroContext], Coroutine[Any, Any, T]], /) -> T: + job = CoroJD(ctxcoro) self.submit(job.wrap()) return await job.future + def json(self) -> ResponseType: + return [working.json() for working in self.__pool] + class JDC: - def __init__(self, descriptor: JobDescriptor, pool: Pool) -> None: - self.__descriptor = descriptor + def __init__(self, unit: JobUnit, pool: Pool) -> None: + self.__unit = unit self.__pool = pool async def __aenter__(self) -> 'JDC': - job = self.__descriptor.wrap() + job = self.__unit.wrap() self.__future = job.future self.__pool.submit(job) return self diff --git a/v6d3music/run-bot.py b/v6d3music/run-bot.py index 565ced8..f5d590c 100644 --- a/v6d3music/run-bot.py +++ b/v6d3music/run-bot.py @@ -1,172 +1,5 @@ -import asyncio -import contextlib -import os -import sys -import time -from traceback import print_exc - -import discord - -from ptvp35 import * -from rainbowadn.instrument import Instrumentation -from v6d1tokens.client import * -from v6d2ctx.handle_content import * -from v6d2ctx.pain import * -from v6d2ctx.serve import * -from v6d3music.app import * -from v6d3music.commands import * -from v6d3music.config import prefix -from v6d3music.core.caching import * -from v6d3music.core.default_effects import * -from v6d3music.core.mainservice import * - -loop = asyncio.new_event_loop() -asyncio.set_event_loop(loop) - - -class MusicClient(discord.Client): - pass - - -_client = MusicClient( - intents=discord.Intents( - members=True, - guilds=True, - bans=True, - emojis=True, - invites=True, - voice_states=True, - guild_messages=True, - reactions=True, - message_content=True, - ), - loop=loop, -) - - -banned_guilds = set(map(int, map(str.strip, os.getenv('banned_guilds', '').split(':')))) - - -def guild_allowed(guild: discord.Guild | None) -> bool: - return guild is not None and guild.id not in banned_guilds - - -def message_allowed(message: discord.Message) -> bool: - return guild_allowed(message.guild) - - -def register_handlers(client: discord.Client, mainservice: MainService): - of = get_of(mainservice) - - @client.event - async def on_message(message: discord.Message) -> None: - if message_allowed(message): - try: - await handle_content(of, message, message.content, prefix, client) - except: - print_exc() - - @client.event - async def on_ready(): - print('ready') - await client.change_presence( - activity=discord.Game( - name='феноменально', - ) - ) - await mainservice.restore() - - -class UpgradeABMInit(Instrumentation): - def __init__(self): - super().__init__(ABlockMonitor, '__init__') - - def instrument(self, method, abm, *, threshold=0.0, delta=10.0, interval=0.0): - print('created upgraded') - method(abm, threshold=threshold, delta=delta, interval=interval) - abm.threshold = threshold - - -class UpgradeABMTask(Instrumentation): - def __init__(self): - super().__init__(ABlockMonitor, '_monitor') - - async def instrument(self, _, abm): - print('started upgraded') - while True: - delta = abm.delta - t = time.time() - await asyncio.sleep(delta) - spent = time.time() - t - delay = spent - delta - if delay > abm.threshold: - abm.threshold = delay - print( - f'upgraded block monitor reached new peak delay {delay:.4f}') - interval = abm.interval - if interval > 0: - await asyncio.sleep(interval) - - -def _upgrade_abm() -> contextlib.ExitStack: - with contextlib.ExitStack() as es: - es.enter_context(UpgradeABMInit()) - es.enter_context(UpgradeABMTask()) - return es.pop_all() - raise RuntimeError - - -class PathPrint(Instrumentation): - def __init__(self, methodname: str, pref: str): - super().__init__(DbConnection, methodname) - self.pref = pref - - async def instrument(self, method, db: DbConnection, *args, **kwargs): - result = await method(db, *args, **kwargs) - try: - print(self.pref, db._DbConnection__path) # type: ignore - except Exception: - from traceback import print_exc - print_exc() - return result - - -def _db_ee() -> contextlib.ExitStack: - with contextlib.ExitStack() as es: - es.enter_context(PathPrint('_initialize', 'open :')) - es.enter_context(PathPrint('aclose', 'close:')) - return es.pop_all() - raise RuntimeError - - -async def main(client: discord.Client): - async with ( - client, - DefaultEffects() as defaulteffects, - MainService(defaulteffects, client) as mainservice, - AppContext(mainservice), - ABlockMonitor(delta=0.5) - ): - register_handlers(client, mainservice) - if 'guerilla' in sys.argv: - from pathlib import Path - tokenpath = Path('.token.txt') - if tokenpath.exists(): - token = tokenpath.read_text() - else: - token = input('token:') - tokenpath.write_text(token) - elif (token_ := os.getenv('trial_token')): - token = token_ - else: - token = await request_token('music', 'token') - await client.login(token) - if os.getenv('v6tor', None) is None: - print('no tor') - await client.connect() - print('exited') +from .main import main if __name__ == '__main__': - with _upgrade_abm(), _db_ee(): - serve(main(_client), _client, loop) + main() diff --git a/v6d3musicbase/event.py b/v6d3musicbase/event.py new file mode 100644 index 0000000..7f8b4fb --- /dev/null +++ b/v6d3musicbase/event.py @@ -0,0 +1,60 @@ +__all__ = ('Event', 'SendableEvents', 'ReceivableEvents', 'Events', 'Receiver') + +import asyncio +from typing import Callable, Generic, TypeVar + +from typing_extensions import Self + +from .responsetype import ResponseType + + +class Event: + def json(self) -> ResponseType: + raise NotImplementedError + + +T = TypeVar('T', bound=Event) +T_co = TypeVar('T_co', bound=Event, covariant=True) +T_contra = TypeVar('T_contra', bound=Event, contravariant=True) + + +class Receiver(Generic[T_contra]): + def __init__(self, receive: Callable[[T_contra], None], receivers: set[Self], /) -> None: + self.__receive = receive + self.__receivers = receivers + self.__receiving = False + + def __enter__(self) -> None: + self.__receivers.add(self) + self.__receiving = True + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__receiving = False + self.__receivers.remove(self) + + def receive(self, event: T_contra, /) -> None: + if self.__receiving: + self.__receive(event) + + +class SendableEvents(Generic[T_contra]): + def send(self, event: T_contra, /) -> None: + raise NotImplementedError + + +class ReceivableEvents(Generic[T_co]): + def receive(self, receive: Callable[[T_co], None], /) -> Receiver[T_co]: + raise NotImplementedError + + +class Events(Generic[T], SendableEvents[T], ReceivableEvents[T]): + def __init__(self) -> None: + self.__receivers: set[Receiver[T]] = set() + self.__loop = asyncio.get_running_loop() + + def send(self, event: T, /) -> None: + for receiver in self.__receivers: + self.__loop.call_soon(receiver.receive, event) + + def receive(self, receive: Callable[[T], None], /) -> Receiver[T]: + return Receiver(receive, self.__receivers) diff --git a/v6d3musicbase/responsetype.py b/v6d3musicbase/responsetype.py new file mode 100644 index 0000000..2181c0d --- /dev/null +++ b/v6d3musicbase/responsetype.py @@ -0,0 +1,17 @@ +__all__ = ('ResponseType', 'cast_to_response') + +from typing import Any, TypeAlias + +ResponseType: TypeAlias = list['ResponseType'] | dict[str, 'ResponseType'] | float | int | bool | str | None + + +def cast_to_response(target: Any) -> ResponseType: + match target: + case str() | int() | float() | bool() | None: + return target + case list() | tuple(): + return list(map(cast_to_response, target)) + case dict(): + return {str(key): cast_to_response(value) for key, value in target.items()} + case _: + return str(target) diff --git a/v6d3musicbase/targets.py b/v6d3musicbase/targets.py new file mode 100644 index 0000000..eb58130 --- /dev/null +++ b/v6d3musicbase/targets.py @@ -0,0 +1,76 @@ +__all__ = ('Targets', 'JsonLike', 'Async') + +import abc +from typing import Any, Callable, Generic, TypeVar + +from rainbowadn.instrument import Instrumentation + +from .responsetype import * + + +def qualname(t: type) -> str: + return f'{t.__module__}.{t.__qualname__}' + + +T = TypeVar('T') + + +class Flagful(Generic[T]): + def __init__(self, value: T, flags: set[object]) -> None: + self.value = value + self.flags = flags + + +class Targets: + def __init__(self) -> None: + self.targets: dict[str, Flagful[tuple[Any, str]]] = {} + self.instrumentations: dict[str, Flagful[Callable[[Any, str], Instrumentation]]] = {} + self.factories: dict[tuple[str, str], Callable[[], Instrumentation]] = {} + + def register_target(self, targetname: str, target: Any, methodname: str, /, *flags: object) -> None: + self.targets[targetname] = Flagful((target, methodname), set(flags)) + print(f'registered target: {targetname}') + + def register_type(self, target: type, methodname: str, /, *flags: object) -> None: + self.register_target(f'{qualname(target)}.{methodname}', target, methodname, *flags) + + def register_instance(self, target: object, methodname: str, /, *flags: object) -> None: + self.register_target(f'{qualname(target.__class__)}().{methodname}', target, methodname, *flags) + + def register_instrumentation( + self, + instrumentationname: str, + instrumentation_factory: Callable[[Any, str], Instrumentation], + /, + *flags: object, + ) -> None: + self.instrumentations[instrumentationname] = Flagful(instrumentation_factory, set(flags)) + print(f'registered instrumentation: {instrumentationname}') + + def get_factory( + self, + targetname: str, + target: Any, + methodname: str, + instrumentationname: str, + instrumentation_factory: Callable[[Any, str], Instrumentation], + / + ) -> Callable[[], Instrumentation]: + if (targetname, instrumentationname) not in self.factories: + flags_required = self.instrumentations[instrumentationname].flags + flags_present = self.targets[targetname].flags + if not flags_required.issubset(flags_present): + raise KeyError('target lacks flags required by instrumentation') + self.factories[targetname, instrumentationname] = ( + lambda: instrumentation_factory(target, methodname) + ) + return self.factories[targetname, instrumentationname] + + +class JsonLike(abc.ABC): + @abc.abstractmethod + def json(self) -> ResponseType: + raise NotImplementedError + + +Async = object()