# v6d3music/v6d3music/app.py
#
# 235 lines
# 7.7 KiB
# Python

import asyncio
import urllib.parse
from pathlib import Path
from typing import Optional
import aiohttp
import discord
from aiohttp import web
from ptvp35 import Db, KVJson
from v6d0auth.appfactory import AppFactory
from v6d0auth.run_app import start_app
from v6d1tokens.client import request_token
from v6d3music.config import auth_redirect, myroot
from v6d3music.utils.bytes_hash import bytes_hash
# Key-value store mapping a session identifier to that session's data dict
# (the OAuth2 'code' and 'token' entries written by the /auth/ handler).
# Backed by the file `session.db` under `myroot`.
session_db = Db(myroot / 'session.db', kvrequest_type=KVJson)
class MusicAppFactory(AppFactory):
htmlroot = Path(__file__).parent / 'html'
def __init__(
self,
secret: str,
client: discord.Client
):
self.secret = secret
self.redirect = auth_redirect
self.loop = asyncio.get_running_loop()
self.client = client
def auth_link(self):
if self.client.user is None:
return ''
else:
return f'https://discord.com/api/oauth2/authorize?client_id={self.client.user.id}' \
f'&redirect_uri={urllib.parse.quote(self.redirect)}&response_type=code&scope=identify'
def _file(self, file: str):
with open(self.htmlroot / file) as f:
return f.read()
async def file(self, file: str):
return await self.loop.run_in_executor(
None,
self._file,
file
)
async def html_resp(self, file: str):
text = await self.file(f'{file}.html')
text = text.replace(
'$$DISCORD_AUTH$$',
self.auth_link()
)
return web.Response(
text=text,
content_type='text/html'
)
async def code_token(self, code: str) -> dict:
assert self.client.user is not None
data = {
'client_id': str(self.client.user.id),
'client_secret': self.secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.redirect
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
async with aiohttp.ClientSession() as session:
async with session.post('https://discord.com/api/oauth2/token', data=data, headers=headers) as response:
return await response.json()
@classmethod
async def session_client(cls, data: dict) -> Optional[dict]:
match data:
case {'token': {'access_token': str() as access_token}}:
pass
case _:
return None
headers = {
'Authorization': f'Bearer {access_token}'
}
async with aiohttp.ClientSession() as session:
async with session.get('https://discord.com/api/oauth2/@me', headers=headers) as response:
return await response.json()
@classmethod
def client_status(cls, sclient: dict) -> dict:
user = cls.client_user(sclient)
return {
'expires': sclient.get('expires'),
'user': (None if user is None else cls.user_status(user)),
}
@classmethod
def user_status(cls, user: dict) -> dict:
return {
'avatar': cls.user_avatar_url(user),
'id': cls.user_id(user),
'username': cls.user_username_full(user)
}
@classmethod
def user_username_full(cls, user: dict) -> Optional[str]:
match user:
case {'username': str() as username, 'discriminator': str() as discriminator}:
return f'{username}#{discriminator}'
case _:
return None
@classmethod
def client_user(cls, sclient: dict) -> Optional[dict]:
return sclient.get('user')
@classmethod
def user_id(cls, user: dict) -> Optional[str | int]:
return user.get('id')
@classmethod
def user_avatar(cls, user: dict) -> Optional[str]:
return user.get('avatar')
@classmethod
def user_avatar_url(cls, user: dict) -> Optional[str]:
cid = cls.user_id(user)
if cid is None:
return None
avatar = cls.user_avatar(user)
if avatar is None:
return None
return f'https://cdn.discordapp.com/avatars/{cid}/{avatar}.png'
async def session_status(self, session: str) -> dict:
data = self.session_data(session)
sclient = await self.session_client(data)
return {
'code_set': data.get('code') is not None,
'token_set': data.get('token') is not None,
'client': (None if sclient is None else self.client_status(sclient))
}
async def session_queue(self, session: str):
data = self.session_data(session)
sclient = await self.session_client(data)
if sclient is None:
return None
user = self.client_user(sclient)
if user is None:
return None
cid = self.user_id(user)
return cid
@classmethod
def session_data(cls, session: str) -> dict:
data = session_db.get(session, {})
if not isinstance(data, dict):
data = {}
return data
def define_routes(self, routes: web.RouteTableDef) -> None:
@routes.get('/')
async def home(_request: web.Request) -> web.Response:
return await self.html_resp('home')
@routes.get('/login/')
async def login(_request: web.Request) -> web.Response:
return await self.html_resp('login')
@routes.get('/auth/')
async def auth(request: web.Request) -> web.Response:
if 'session' in request.query:
print(request.query.get('code'))
response = web.HTTPFound('/')
session = str(request.query.get('session'))
s_state = str(request.query.get('state'))
code = str(request.query.get('code'))
if bytes_hash(session.encode()) != s_state:
print(session)
print(bytes_hash(session.encode()), s_state)
raise web.HTTPBadRequest
data = self.session_data(session)
data['code'] = code
data['token'] = await self.code_token(code)
await session_db.set(session, data)
return response
else:
return await self.html_resp('auth')
@routes.get('/state/')
async def get_state(request: web.Request) -> web.Response:
session = str(request.query.get('session'))
return web.json_response(
data=f'{bytes_hash(session.encode())}'
)
@routes.get('/status/')
async def status(request: web.Request) -> web.Response:
session = str(request.query.get('session'))
return web.json_response(
data=await self.session_status(session)
)
@routes.get('/queue/')
async def api_queue(request: web.Request) -> web.Response:
session = str(request.query.get('session'))
return web.json_response(
data=await self.session_queue(session)
)
@routes.get('/main.js')
async def mainjs(_request: web.Request) -> web.Response:
return web.Response(
text=await self.file('main.js')
)
@routes.get('/main.css')
async def maincss(_request: web.Request) -> web.Response:
return web.Response(
text=await self.file('main.css')
)
@classmethod
async def start(cls, client: discord.Client):
try:
factory = cls(await request_token('music-client', 'token'), client)
except aiohttp.ClientConnectorError:
print('no web app (likely due to no token)')
else:
await start_app(factory.app())