v6d3music/v6d3music/core/real_url.py
2022-11-08 05:14:39 +00:00

49 lines
1.4 KiB
Python

import asyncio
import os
import subprocess
from typing import Optional
from adaas.cachedb import RemoteCache
from v6d2ctx.context import Benchmark
from v6d3music.core.cache_url import cache_db, cache_url
from v6d3music.utils.bytes_hash import bytes_hash
from v6d3music.utils.tor_prefix import tor_prefix
# The remote "adaas" resolver is considered available whenever the
# ``adaasurl`` environment variable is set to a non-empty value.
adaas_available = bool(os.environ.get('adaasurl'))

# Announce once, at import time, which resolution path will be used.
if adaas_available:
    print('running real_url through adaas')
# Strong references to in-flight cache-writing tasks. The event loop keeps
# only weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_cache_tasks: set = set()


async def real_url(url: str, override: bool, tor: bool) -> str:
    """Resolve *url* to the direct audio stream URL reported by youtube-dl.

    Resolution order:

    1. If ``adaas_available`` is set and *tor* is false, the request is
       delegated to ``RemoteCache().real_url`` and its result is returned.
    2. Unless *override* is true, ``cache_db`` is consulted under the key
       ``url:<hash of url>``; a hit is returned immediately.
    3. Otherwise ``youtube-dl --no-playlist -f bestaudio -g -- <url>`` is
       run (prefixed with ``tor_prefix()`` when *tor* is true) and the first
       line of its standard output is returned. ``cache_url`` is scheduled
       as a background task with the result.

    :param url: the URL to resolve.
    :param override: skip the local cache lookup; also forwarded to
        ``RemoteCache.real_url`` and ``cache_url``.
    :param tor: run youtube-dl behind the ``tor_prefix()`` command prefix and
        bypass the remote resolver.
    :return: the resolved URL, without a trailing line terminator.
    :raises RuntimeError: youtube-dl exited with a non-zero status; the exit
        code is the exception argument.
    """
    if adaas_available and not tor:
        return await RemoteCache().real_url(url, override, tor)
    hurl: str = bytes_hash(url.encode())
    if not override:
        curl: Optional[str] = cache_db.get(f'url:{hurl}', None)
        if curl is not None:
            print('using cached', hurl)
            return curl
    args = []
    if tor:
        args.extend(tor_prefix())
    # '--' ends option parsing so a URL starting with '-' is never treated
    # as a youtube-dl option.
    args.extend(
        [
            'youtube-dl', '--no-playlist', '-f', 'bestaudio', '-g', '--', url
        ]
    )
    p = subprocess.Popen(
        args,
        stdout=subprocess.PIPE
    )
    loop = asyncio.get_running_loop()
    with Benchmark('URL'):
        # communicate() drains stdout while waiting, so the child can never
        # block on a full pipe, and it closes the pipe afterwards. It is
        # blocking, hence the executor.
        output, _ = await loop.run_in_executor(None, p.communicate)
    if p.returncode:
        raise RuntimeError(p.returncode)
    # Keep the first output line only and strip just its line terminator, so
    # output without a trailing newline (or with CRLF) is handled correctly.
    rurl: str = output.partition(b'\n')[0].decode().rstrip('\r')
    # Fire-and-forget cache write; hold a reference until the task is done.
    task = loop.create_task(cache_url(hurl, rurl, override, tor))
    _cache_tasks.add(task)
    task.add_done_callback(_cache_tasks.discard)
    return rurl