role auth api

This commit is contained in:
AF 2021-12-22 19:44:41 +03:00
parent 2dca5338ec
commit 59e9ba94ad
6 changed files with 62 additions and 28 deletions

View File

@ -13,6 +13,6 @@ setup(
'aiohttp',
'PyNaCl~=1.4.0',
'ptvp35 @ git+https://gitea.ongoteam.net/PTV/ptvp35.git@25727aabd7afd69f66051c806190480302e67260',
'v6d0auth @ git+https://gitea.ongoteam.net/PTV/v6d0auth.git@20a8c7a4f67d193e193f3862f1019976343ad711'
'v6d0auth @ git+https://gitea.ongoteam.net/PTV/v6d0auth.git@b161c4c88c4cee531b6e3773cb61d2fceeaf661b'
]
)

View File

@ -2,14 +2,17 @@ import json
from aiohttp import web
from nacl.exceptions import BadSignatureError
from nacl.public import PublicKey
from nacl.signing import VerifyKey
from nacl.utils import random
from v6d0auth import certs
__all__ = ('V6D1TokensAppFactory',)
from v6d0auth.appfactory import AppFactory
from v6d0auth.client import has_role
from v6d1tokens.tdb import TDB
__all__ = ('V6D1TokensAppFactory',)
class V6D1TokensAppFactory(AppFactory):
def __init__(self, tdb: TDB):
@ -22,21 +25,43 @@ class V6D1TokensAppFactory(AppFactory):
async def home(_request: web.Request):
return web.Response(body='v6d1tokens\n')
@routes.post('/reg')
async def ws_reg(ws: web.WebSocketResponse):
nonce = random(16)
await ws.send_bytes(nonce)
[token_id, token], hnonce = json.loads(certs.verify(await ws.receive_bytes()))
assert hnonce == nonce.hex()
await self.tdb.reg(token_id, token)
await ws.send_bytes(b'1')
await ws.close()
@routes.get('/reg')
async def reg(request: web.Request):
try:
await self.tdb.reg(await request.read())
except BadSignatureError:
raise web.HTTPUnauthorized
except json.JSONDecodeError:
raise web.HTTPBadRequest
ws = web.WebSocketResponse()
await ws.prepare(request)
await ws_reg(ws)
return ws
def role_matches_token_id(role: str, token_id: str) -> bool:
return role.startswith(f'token:{token_id}::') or token_id.startswith(f'role:{role}::')
async def requester_for_request(request: web.Request, requester_cert: bytes, token_id: str) -> VerifyKey:
role = request.headers.get('v6role')
if role is None:
return VerifyKey(certs.averify(requester_cert))
else:
raise web.HTTPOk
requester = VerifyKey(requester_cert)
assert (await has_role(requester, role))
assert role_matches_token_id(role, token_id)
return requester
@routes.post('/get')
async def get(request: web.Request):
try:
token_encrypted = await self.tdb.get(await request.read())
token_id, requester_hcert = json.loads(await request.read())
requester_cert = bytes.fromhex(requester_hcert)
requester = await requester_for_request(request, requester_cert, token_id)
requesterpk: PublicKey = requester.to_curve25519_public_key()
token_encrypted = await self.tdb.get_encrypted(requesterpk, token_id)
except BadSignatureError:
raise web.HTTPUnauthorized
except (json.JSONDecodeError, AssertionError):

View File

@ -1,4 +1,5 @@
import json
from typing import Optional
import aiohttp
from v6d0auth import certs
@ -9,12 +10,20 @@ from v6d1tokens.config import taurl
__all__ = ('request_token',)
async def request_token(token_id: str) -> str:
async def request_token(token_id: str, role: Optional[str] = None) -> str:
async with aiohttp.ClientSession() as session:
async with session.post(
if role is None:
request = session.post(
f'{taurl}/get',
data=json.dumps([token_id, (await mycert()).hex()]).encode()
) as response:
)
else:
request = session.post(
f'{taurl}/get',
data=json.dumps([token_id, certs.vkey.encode().hex()]).encode(),
headers={'v6role': role}
)
async with request as response:
if response.status == 200:
return certs.receive(await response.read()).decode()
else:

View File

@ -7,11 +7,12 @@ from v6d0auth.config import host, port
async def main():
request = certs.encrypt_self(certs.sign(json.dumps([input('token_id:'), input('token:')]).encode()))
async with aiohttp.ClientSession() as session:
# noinspection HttpUrlsUsage
async with session.post(f'http://{host}:{port}/reg', data=request) as response:
print(response.status)
async with session.ws_connect(f'http://{host}:{port}/reg') as ws:
nonce = await ws.receive_bytes()
await ws.send_bytes(certs.sign(json.dumps([[input('token_id:'), input('token:')], nonce.hex()]).encode()))
print((await ws.receive_bytes()).hex())
if __name__ == '__main__':

View File

@ -1,10 +1,7 @@
import json
from typing import Optional
from nacl.public import PublicKey, SealedBox
from nacl.signing import VerifyKey
from ptvp35 import Db, KVJson
from v6d0auth import certs
from v6d1tokens.config import myroot
@ -20,9 +17,7 @@ class TDB:
def _get(self, token_id: str) -> Optional[str]:
return self.db.get(token_id, None)
async def get(self, request: bytes) -> bytes:
token_id, requester_cert = json.loads(request)
requester: PublicKey = VerifyKey(certs.averify(bytes.fromhex(requester_cert))).to_curve25519_public_key()
async def get_encrypted(self, requester: PublicKey, token_id: str) -> bytes:
token = self._get(token_id)
if token is None:
raise KeyError
@ -31,8 +26,6 @@ class TDB:
async def _reg(self, token_id: str, token: str) -> None:
await self.db.set(token_id, token)
async def reg(self, request: bytes) -> None:
request = certs.verify(certs.receive(request))
token_id, token = json.loads(request)
async def reg(self, token_id: str, token: str) -> None:
assert type(token_id) == type(token) == str
await self._reg(token_id, token)

View File

@ -1,10 +1,16 @@
import asyncio
from v6d0auth.client import with_role
from v6d1tokens.client import request_token
async def main():
print(await request_token('test'))
await with_role('t3st')
await with_role('token:te5t::tes7')
print(await request_token('role:t3st::7est', 't3st'))
print(await request_token('te5t', 'token:te5t::tes7'))
if __name__ == '__main__':