From 5f2632a58a89b77e52e47d1db5040ec23d287063 Mon Sep 17 00:00:00 2001 From: timofey Date: Thu, 10 Nov 2022 12:40:27 +0000 Subject: [PATCH] asyncio.create_subprocess_exec --- v6d3music/core/cache_url.py | 11 ++------ v6d3music/core/queueaudio.py | 8 +++++- v6d3music/core/real_url.py | 51 +++++++++++++++++++--------------- v6d3music/core/ytaudio.py | 13 +++------ v6d3music/run-bot.py | 6 +--- v6d3music/utils/tor_extract.py | 25 ++++++----------- 6 files changed, 52 insertions(+), 62 deletions(-) diff --git a/v6d3music/core/cache_url.py b/v6d3music/core/cache_url.py index 3dd4432..c1763cf 100644 --- a/v6d3music/core/cache_url.py +++ b/v6d3music/core/cache_url.py @@ -1,8 +1,6 @@ import asyncio -import subprocess from ptvp35 import Db, KVJson -from v6d2ctx.context import Benchmark from v6d2ctx.lock_for import lock_for from v6d3music.config import myroot @@ -33,15 +31,12 @@ async def cache_url(hurl: str, rurl: str, override: bool, tor: bool) -> None: '-y', '-i', rurl, '-b:a', '128k', str(tmp_path) ] ) - p = subprocess.Popen( - args, - ) - loop = asyncio.get_running_loop() - with Benchmark('CCH'): - code = await loop.run_in_executor(None, p.wait) + ap = await asyncio.create_subprocess_exec(*args) + code = await ap.wait() if code: print(f'caching {hurl} failed with {code}') return + loop = asyncio.get_running_loop() await loop.run_in_executor(None, tmp_path.rename, path) await cache_db.set(f'url:{hurl}', str(path)) print('cached', hurl) diff --git a/v6d3music/core/queueaudio.py b/v6d3music/core/queueaudio.py index 8e6a4f4..e9314f4 100644 --- a/v6d3music/core/queueaudio.py +++ b/v6d3music/core/queueaudio.py @@ -99,7 +99,13 @@ class QueueAudio(discord.AudioSource): def clear(self, member: discord.Member) -> None: assert_admin(member) - self.cleanup() + to_clean = list(self.queue) + self.queue.clear() + for audio in to_clean: + try: + audio.cleanup() + except ValueError: + pass def swap(self, member: discord.Member, a: int, b: int) -> None: assert_admin(member) diff --git a/v6d3music/core/real_url.py b/v6d3music/core/real_url.py index 713fc4b..8a09e28 100644 --- a/v6d3music/core/real_url.py +++ b/v6d3music/core/real_url.py @@ -1,11 +1,8 @@ import asyncio import os -import subprocess from typing import Optional from adaas.cachedb import RemoteCache -from v6d2ctx.context import Benchmark - from v6d3music.core.cache_url import cache_db, cache_url from v6d3music.utils.bytes_hash import bytes_hash from v6d3music.utils.tor_prefix import tor_prefix @@ -16,6 +13,32 @@ if adaas_available: print('running real_url through adaas') +_tasks = set() + + +def _schedule_cache(hurl: str, rurl: str, override: bool, tor: bool): + task = asyncio.create_task(cache_url(hurl, rurl, override, tor)) + _tasks.add(task) + task.add_done_callback(_tasks.discard) + + +async def _resolve_url(url: str, tor: bool) -> str: + args = [] + if tor: + args.extend(tor_prefix()) + args.extend( + [ + 'youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url + ] + ) + ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE) + code = await ap.wait() + if code: + raise RuntimeError(code) + assert ap.stdout is not None + return (await ap.stdout.readline()).decode()[:-1] + + async def real_url(url: str, override: bool, tor: bool) -> str: if adaas_available and not tor: return await RemoteCache().real_url(url, override, tor) @@ -25,24 +48,6 @@ async def real_url(url: str, override: bool, tor: bool) -> str: if curl is not None: print('using cached', hurl) return curl - args = [] - if tor: - args.extend(tor_prefix()) - args.extend( - [ - 'youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url - ] - ) - p = subprocess.Popen( - args, - stdout=subprocess.PIPE - ) - loop = asyncio.get_running_loop() - with Benchmark('URL'): - code = await loop.run_in_executor(None, p.wait) - if code: - raise RuntimeError(code) - assert p.stdout is not None - rurl: str = p.stdout.readline().decode()[:-1] - loop.create_task(cache_url(hurl, rurl, override, tor)) + rurl: str = await _resolve_url(url, tor) + _schedule_cache(hurl, rurl, override, tor) return rurl diff --git a/v6d3music/core/ytaudio.py b/v6d3music/core/ytaudio.py index 8ebec4d..4643e49 100644 --- a/v6d3music/core/ytaudio.py +++ b/v6d3music/core/ytaudio.py @@ -1,6 +1,5 @@ import asyncio import random -import subprocess from typing import Optional import discord @@ -89,17 +88,13 @@ class YTAudio(discord.AudioSource): '-of', 'default=noprint_wrappers=1:nokey=1', '-sexagesimal' ] - p = subprocess.Popen( - args, - stdout=subprocess.PIPE - ) - with Benchmark('FFP'): - code = await self.loop.run_in_executor(None, p.wait) + ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE) + code = await ap.wait() if code: pass else: - assert p.stdout is not None - self._durations[url] = p.stdout.read().decode().strip().split('.')[0] + assert ap.stdout is not None + self._durations[url] = (await ap.stdout.read()).decode().strip().split('.')[0] def duration(self) -> str: duration = self._durations.get(self.url) diff --git a/v6d3music/run-bot.py b/v6d3music/run-bot.py index 3ecf7b2..a306afd 100644 --- a/v6d3music/run-bot.py +++ b/v6d3music/run-bot.py @@ -1,6 +1,5 @@ import asyncio import os -import subprocess import sys import traceback @@ -165,10 +164,7 @@ async def main(): await client.login(token) loop.create_task(setup_tasks()) if os.getenv('v6tor', None) is None: - try: - subprocess.Popen('tor') - except FileNotFoundError: - print('no tor') + print('no tor') await client.connect() print('exited') diff --git a/v6d3music/utils/tor_extract.py b/v6d3music/utils/tor_extract.py index 2d71600..26487de 100644 --- a/v6d3music/utils/tor_extract.py +++ b/v6d3music/utils/tor_extract.py @@ -1,6 +1,5 @@ import asyncio import json -import subprocess from v6d3music.utils.tor_prefix import tor_prefix @@ -8,20 +7,14 @@ from v6d3music.utils.tor_prefix import tor_prefix async def tor_extract(params: dict, url: str, **kwargs): print(f'tor extracting {url}') args = [*tor_prefix(), 'python', '-m', 'v6d3music.run-extract'] - p = subprocess.Popen( - args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - text=True - ) - assert p.stdin is not None - p.stdin.write(f'{json.dumps(params)}\n') - p.stdin.write(f'{json.dumps(url)}\n') - p.stdin.write(f'{json.dumps(kwargs)}\n') - p.stdin.flush() - p.stdin.close() - code = await asyncio.get_running_loop().run_in_executor(None, p.wait) + ap = await asyncio.create_subprocess_exec(*args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + assert ap.stdin is not None + ap.stdin.write(f'{json.dumps(params)}\n'.encode()) + ap.stdin.write(f'{json.dumps(url)}\n'.encode()) + ap.stdin.write(f'{json.dumps(kwargs)}\n'.encode()) + ap.stdin.write_eof() + code = await ap.wait() if code: raise RuntimeError(code) - assert p.stdout is not None - return json.loads(p.stdout.read()) + assert ap.stdout is not None + return json.loads(await ap.stdout.read())