asyncio.create_subprocess_exec

This commit is contained in:
AF 2022-11-10 12:40:27 +00:00
parent 744020528f
commit 5f2632a58a
6 changed files with 52 additions and 62 deletions

View File

@ -1,8 +1,6 @@
import asyncio import asyncio
import subprocess
from ptvp35 import Db, KVJson from ptvp35 import Db, KVJson
from v6d2ctx.context import Benchmark
from v6d2ctx.lock_for import lock_for from v6d2ctx.lock_for import lock_for
from v6d3music.config import myroot from v6d3music.config import myroot
@ -33,15 +31,12 @@ async def cache_url(hurl: str, rurl: str, override: bool, tor: bool) -> None:
'-y', '-i', rurl, '-b:a', '128k', str(tmp_path) '-y', '-i', rurl, '-b:a', '128k', str(tmp_path)
] ]
) )
p = subprocess.Popen( ap = await asyncio.create_subprocess_exec(*args)
args, code = await ap.wait()
)
loop = asyncio.get_running_loop()
with Benchmark('CCH'):
code = await loop.run_in_executor(None, p.wait)
if code: if code:
print(f'caching {hurl} failed with {code}') print(f'caching {hurl} failed with {code}')
return return
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, tmp_path.rename, path) await loop.run_in_executor(None, tmp_path.rename, path)
await cache_db.set(f'url:{hurl}', str(path)) await cache_db.set(f'url:{hurl}', str(path))
print('cached', hurl) print('cached', hurl)

View File

@ -99,7 +99,13 @@ class QueueAudio(discord.AudioSource):
def clear(self, member: discord.Member) -> None: def clear(self, member: discord.Member) -> None:
assert_admin(member) assert_admin(member)
self.cleanup() to_clean = list(self.queue)
self.queue.clear()
for audio in to_clean:
try:
audio.cleanup()
except ValueError:
pass
def swap(self, member: discord.Member, a: int, b: int) -> None: def swap(self, member: discord.Member, a: int, b: int) -> None:
assert_admin(member) assert_admin(member)

View File

@ -1,11 +1,8 @@
import asyncio import asyncio
import os import os
import subprocess
from typing import Optional from typing import Optional
from adaas.cachedb import RemoteCache from adaas.cachedb import RemoteCache
from v6d2ctx.context import Benchmark
from v6d3music.core.cache_url import cache_db, cache_url from v6d3music.core.cache_url import cache_db, cache_url
from v6d3music.utils.bytes_hash import bytes_hash from v6d3music.utils.bytes_hash import bytes_hash
from v6d3music.utils.tor_prefix import tor_prefix from v6d3music.utils.tor_prefix import tor_prefix
@ -16,6 +13,32 @@ if adaas_available:
print('running real_url through adaas') print('running real_url through adaas')
_tasks = set()
def _schedule_cache(hurl: str, rurl: str, override: bool, tor: bool):
task = asyncio.create_task(cache_url(hurl, rurl, override, tor))
_tasks.add(task)
task.add_done_callback(_tasks.discard)
async def _resolve_url(url: str, tor: bool) -> str:
args = []
if tor:
args.extend(tor_prefix())
args.extend(
[
'youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url
]
)
ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
code = await ap.wait()
if code:
raise RuntimeError(code)
assert ap.stdout is not None
return (await ap.stdout.readline()).decode()[:-1]
async def real_url(url: str, override: bool, tor: bool) -> str: async def real_url(url: str, override: bool, tor: bool) -> str:
if adaas_available and not tor: if adaas_available and not tor:
return await RemoteCache().real_url(url, override, tor) return await RemoteCache().real_url(url, override, tor)
@ -25,24 +48,6 @@ async def real_url(url: str, override: bool, tor: bool) -> str:
if curl is not None: if curl is not None:
print('using cached', hurl) print('using cached', hurl)
return curl return curl
args = [] rurl: str = await _resolve_url(url, tor)
if tor: _schedule_cache(hurl, rurl, override, tor)
args.extend(tor_prefix())
args.extend(
[
'youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url
]
)
p = subprocess.Popen(
args,
stdout=subprocess.PIPE
)
loop = asyncio.get_running_loop()
with Benchmark('URL'):
code = await loop.run_in_executor(None, p.wait)
if code:
raise RuntimeError(code)
assert p.stdout is not None
rurl: str = p.stdout.readline().decode()[:-1]
loop.create_task(cache_url(hurl, rurl, override, tor))
return rurl return rurl

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
import random import random
import subprocess
from typing import Optional from typing import Optional
import discord import discord
@ -89,17 +88,13 @@ class YTAudio(discord.AudioSource):
'-of', 'default=noprint_wrappers=1:nokey=1', '-of', 'default=noprint_wrappers=1:nokey=1',
'-sexagesimal' '-sexagesimal'
] ]
p = subprocess.Popen( ap = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
args, code = await ap.wait()
stdout=subprocess.PIPE
)
with Benchmark('FFP'):
code = await self.loop.run_in_executor(None, p.wait)
if code: if code:
pass pass
else: else:
assert p.stdout is not None assert ap.stdout is not None
self._durations[url] = p.stdout.read().decode().strip().split('.')[0] self._durations[url] = (await ap.stdout.read()).decode().strip().split('.')[0]
def duration(self) -> str: def duration(self) -> str:
duration = self._durations.get(self.url) duration = self._durations.get(self.url)

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
import os import os
import subprocess
import sys import sys
import traceback import traceback
@ -165,10 +164,7 @@ async def main():
await client.login(token) await client.login(token)
loop.create_task(setup_tasks()) loop.create_task(setup_tasks())
if os.getenv('v6tor', None) is None: if os.getenv('v6tor', None) is None:
try: print('no tor')
subprocess.Popen('tor')
except FileNotFoundError:
print('no tor')
await client.connect() await client.connect()
print('exited') print('exited')

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
import json import json
import subprocess
from v6d3music.utils.tor_prefix import tor_prefix from v6d3music.utils.tor_prefix import tor_prefix
@ -8,20 +7,14 @@ from v6d3music.utils.tor_prefix import tor_prefix
async def tor_extract(params: dict, url: str, **kwargs): async def tor_extract(params: dict, url: str, **kwargs):
print(f'tor extracting {url}') print(f'tor extracting {url}')
args = [*tor_prefix(), 'python', '-m', 'v6d3music.run-extract'] args = [*tor_prefix(), 'python', '-m', 'v6d3music.run-extract']
p = subprocess.Popen( ap = await asyncio.create_subprocess_exec(*args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
args, assert ap.stdin is not None
stdin=subprocess.PIPE, ap.stdin.write(f'{json.dumps(params)}\n'.encode())
stdout=subprocess.PIPE, ap.stdin.write(f'{json.dumps(url)}\n'.encode())
text=True ap.stdin.write(f'{json.dumps(kwargs)}\n'.encode())
) ap.stdin.write_eof()
assert p.stdin is not None code = await ap.wait()
p.stdin.write(f'{json.dumps(params)}\n')
p.stdin.write(f'{json.dumps(url)}\n')
p.stdin.write(f'{json.dumps(kwargs)}\n')
p.stdin.flush()
p.stdin.close()
code = await asyncio.get_running_loop().run_in_executor(None, p.wait)
if code: if code:
raise RuntimeError(code) raise RuntimeError(code)
assert p.stdout is not None assert ap.stdout is not None
return json.loads(p.stdout.read()) return json.loads(await ap.stdout.read())