reformat
This commit is contained in:
parent
9297358dff
commit
5277e2cb85
24
setup.py
24
setup.py
@ -1,21 +1,21 @@
|
|||||||
from setuptools import setup
|
from setuptools import setup
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='v6d0auth',
|
name="v6d0auth",
|
||||||
version='',
|
version="",
|
||||||
packages=['v6d0auth'],
|
packages=["v6d0auth"],
|
||||||
url='',
|
url="",
|
||||||
license='',
|
license="",
|
||||||
author='PARRRATE TNV',
|
author="PARRRATE TNV",
|
||||||
author_email='',
|
author_email="",
|
||||||
description='',
|
description="",
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'aiohttp',
|
"aiohttp",
|
||||||
'PyNaCl~=1.4.0',
|
"PyNaCl~=1.4.0",
|
||||||
],
|
],
|
||||||
extras_require={
|
extras_require={
|
||||||
'full': [
|
"full": [
|
||||||
'ptvp35 @ git+https://gitea.parrrate.ru/PTV/ptvp35.git@e760fca39e2070b9959aeb95b53e59e895f1ad57',
|
"ptvp35 @ git+https://gitea.parrrate.ru/PTV/ptvp35.git@e760fca39e2070b9959aeb95b53e59e895f1ad57",
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -10,7 +10,7 @@ from v6d0auth import certs
|
|||||||
from v6d0auth.appfactory import *
|
from v6d0auth.appfactory import *
|
||||||
from v6d0auth.cdb import *
|
from v6d0auth.cdb import *
|
||||||
|
|
||||||
__all__ = ('V6D0AuthAppFactory',)
|
__all__ = ("V6D0AuthAppFactory",)
|
||||||
|
|
||||||
|
|
||||||
class V6D0AuthAppFactory(AppFactory):
|
class V6D0AuthAppFactory(AppFactory):
|
||||||
@ -21,26 +21,26 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
print(certs.vkey.encode().hex())
|
print(certs.vkey.encode().hex())
|
||||||
self.cdb.start()
|
self.cdb.start()
|
||||||
|
|
||||||
@routes.get('/')
|
@routes.get("/")
|
||||||
async def home(_request: web.Request):
|
async def home(_request: web.Request):
|
||||||
return web.Response(body='v6d0auth\n')
|
return web.Response(body="v6d0auth\n")
|
||||||
|
|
||||||
async def ws_approve(ws: web.WebSocketResponse):
|
async def ws_approve(ws: web.WebSocketResponse):
|
||||||
nonce = random(16)
|
nonce = random(16)
|
||||||
await ws.send_bytes(nonce)
|
await ws.send_bytes(nonce)
|
||||||
hhandle, hnonce = json.loads(certs.verify(await ws.receive_bytes()))
|
hhandle, hnonce = json.loads(certs.verify(await ws.receive_bytes()))
|
||||||
assert hnonce == nonce.hex()
|
assert hnonce == nonce.hex()
|
||||||
if hhandle == 'all':
|
if hhandle == "all":
|
||||||
print('approving all')
|
print("approving all")
|
||||||
for request in self.cdb.handle_mapping.values():
|
for request in self.cdb.handle_mapping.values():
|
||||||
request.approve()
|
request.approve()
|
||||||
print('approved all')
|
print("approved all")
|
||||||
else:
|
else:
|
||||||
approved = self.cdb.approve(bytes.fromhex(hhandle))
|
approved = self.cdb.approve(bytes.fromhex(hhandle))
|
||||||
await ws.send_bytes(approved)
|
await ws.send_bytes(approved)
|
||||||
await ws.close()
|
await ws.close()
|
||||||
|
|
||||||
@routes.get('/approve')
|
@routes.get("/approve")
|
||||||
async def approve(request: web.Request):
|
async def approve(request: web.Request):
|
||||||
ws = web.WebSocketResponse()
|
ws = web.WebSocketResponse()
|
||||||
await ws.prepare(request)
|
await ws.prepare(request)
|
||||||
@ -51,7 +51,7 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
return VerifyKey(await request.read())
|
return VerifyKey(await request.read())
|
||||||
|
|
||||||
def role_for_request(request: web.Request) -> Optional[str]:
|
def role_for_request(request: web.Request) -> Optional[str]:
|
||||||
return request.headers.get('v6role')
|
return request.headers.get("v6role")
|
||||||
|
|
||||||
def pushed_for_role(requester: VerifyKey, role: Optional[str]) -> AbstractRequest:
|
def pushed_for_role(requester: VerifyKey, role: Optional[str]) -> AbstractRequest:
|
||||||
if role is None:
|
if role is None:
|
||||||
@ -62,7 +62,7 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
async def pushed_for_request(request: web.Request) -> AbstractRequest:
|
async def pushed_for_request(request: web.Request) -> AbstractRequest:
|
||||||
return pushed_for_role(await requester_for_request(request), role_for_request(request))
|
return pushed_for_role(await requester_for_request(request), role_for_request(request))
|
||||||
|
|
||||||
@routes.post('/push')
|
@routes.post("/push")
|
||||||
async def push(request: web.Request):
|
async def push(request: web.Request):
|
||||||
pushed = await pushed_for_request(request)
|
pushed = await pushed_for_request(request)
|
||||||
timeout = pushed.timeout
|
timeout = pushed.timeout
|
||||||
@ -77,7 +77,7 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
async def pulled_for_request(request: web.Request) -> Optional[bytes]:
|
async def pulled_for_request(request: web.Request) -> Optional[bytes]:
|
||||||
return pulled_for_role(await requester_for_request(request), role_for_request(request))
|
return pulled_for_role(await requester_for_request(request), role_for_request(request))
|
||||||
|
|
||||||
@routes.post('/pull')
|
@routes.post("/pull")
|
||||||
async def pull(request: web.Request):
|
async def pull(request: web.Request):
|
||||||
try:
|
try:
|
||||||
pulled = await pulled_for_request(request)
|
pulled = await pulled_for_request(request)
|
||||||
@ -86,13 +86,13 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
else:
|
else:
|
||||||
return web.Response(body=pulled)
|
return web.Response(body=pulled)
|
||||||
|
|
||||||
@routes.post('/has_role')
|
@routes.post("/has_role")
|
||||||
async def has_role(request: web.Request):
|
async def has_role(request: web.Request):
|
||||||
role = role_for_request(request)
|
role = role_for_request(request)
|
||||||
if role is None:
|
if role is None:
|
||||||
raise web.HTTPBadRequest
|
raise web.HTTPBadRequest
|
||||||
return web.Response(
|
return web.Response(
|
||||||
body=(b'1' if self.cdb.has_role(Role(await requester_for_request(request), role)) else b'')
|
body=(b"1" if self.cdb.has_role(Role(await requester_for_request(request), role)) else b"")
|
||||||
)
|
)
|
||||||
|
|
||||||
async def ws_remove(ws: web.WebSocketResponse):
|
async def ws_remove(ws: web.WebSocketResponse):
|
||||||
@ -101,10 +101,10 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
[hrequester, role], hnonce = json.loads(certs.verify(await ws.receive_bytes()))
|
[hrequester, role], hnonce = json.loads(certs.verify(await ws.receive_bytes()))
|
||||||
assert hnonce == nonce.hex()
|
assert hnonce == nonce.hex()
|
||||||
self.cdb.remove_role(Role(VerifyKey(bytes.fromhex(hrequester)), role))
|
self.cdb.remove_role(Role(VerifyKey(bytes.fromhex(hrequester)), role))
|
||||||
await ws.send_bytes(b'0')
|
await ws.send_bytes(b"0")
|
||||||
await ws.close()
|
await ws.close()
|
||||||
|
|
||||||
@routes.get('/remove_role')
|
@routes.get("/remove_role")
|
||||||
async def remove_role(request: web.Request):
|
async def remove_role(request: web.Request):
|
||||||
ws = web.WebSocketResponse()
|
ws = web.WebSocketResponse()
|
||||||
await ws.prepare(request)
|
await ws.prepare(request)
|
||||||
@ -147,7 +147,7 @@ class V6D0AuthAppFactory(AppFactory):
|
|||||||
else:
|
else:
|
||||||
await srq_process(ws, srq)
|
await srq_process(ws, srq)
|
||||||
|
|
||||||
@routes.get('/pullws')
|
@routes.get("/pullws")
|
||||||
async def pullws(request: web.Request):
|
async def pullws(request: web.Request):
|
||||||
ws = web.WebSocketResponse()
|
ws = web.WebSocketResponse()
|
||||||
await ws.prepare(request)
|
await ws.prepare(request)
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
|
|
||||||
__all__ = ('AppFactory',)
|
__all__ = ("AppFactory",)
|
||||||
|
|
||||||
|
|
||||||
class AppFactory:
|
class AppFactory:
|
||||||
|
@ -13,7 +13,11 @@ from ptvp35 import Db, KVJson
|
|||||||
from v6d0auth import certs
|
from v6d0auth import certs
|
||||||
from v6d0auth.config import myroot
|
from v6d0auth.config import myroot
|
||||||
|
|
||||||
__all__ = ('CDB', 'Role', 'AbstractRequest',)
|
__all__ = (
|
||||||
|
"CDB",
|
||||||
|
"Role",
|
||||||
|
"AbstractRequest",
|
||||||
|
)
|
||||||
|
|
||||||
TIMEOUT = 300
|
TIMEOUT = 300
|
||||||
|
|
||||||
@ -60,15 +64,15 @@ class AbstractRequest:
|
|||||||
return approved
|
return approved
|
||||||
self._approved = self._approve()
|
self._approved = self._approve()
|
||||||
self.future.set_result(self._approved)
|
self.future.set_result(self._approved)
|
||||||
print('validating', self.handle.hex())
|
print("validating", self.handle.hex())
|
||||||
self.validate()
|
self.validate()
|
||||||
print('approved', self.handle.hex())
|
print("approved", self.handle.hex())
|
||||||
return self._approved
|
return self._approved
|
||||||
|
|
||||||
def repair(self):
|
def repair(self):
|
||||||
if self.future.done():
|
if self.future.done():
|
||||||
self.future: asyncio.Future[bytes] = self._loop.create_future()
|
self.future: asyncio.Future[bytes] = self._loop.create_future()
|
||||||
print('repaired', self.handle.hex(), self.display())
|
print("repaired", self.handle.hex(), self.display())
|
||||||
|
|
||||||
def force_repair(self):
|
def force_repair(self):
|
||||||
if not self.future.done():
|
if not self.future.done():
|
||||||
@ -131,7 +135,7 @@ class Role(Hashable):
|
|||||||
return NotImplemented
|
return NotImplemented
|
||||||
|
|
||||||
def display(self):
|
def display(self):
|
||||||
return f'{self._requester.encode().hex()}@{self._role}'
|
return f"{self._requester.encode().hex()}@{self._role}"
|
||||||
|
|
||||||
|
|
||||||
class RoleRequest(AbstractRequest):
|
class RoleRequest(AbstractRequest):
|
||||||
@ -142,7 +146,7 @@ class RoleRequest(AbstractRequest):
|
|||||||
|
|
||||||
def _approve(self) -> bytes:
|
def _approve(self) -> bytes:
|
||||||
self._rdb.set_nowait(self._role.key(), True)
|
self._rdb.set_nowait(self._role.key(), True)
|
||||||
return b'1'
|
return b"1"
|
||||||
|
|
||||||
def _validate(self) -> None:
|
def _validate(self) -> None:
|
||||||
assert self._rdb.get(self._role.key(), False)
|
assert self._rdb.get(self._role.key(), False)
|
||||||
@ -151,7 +155,7 @@ class RoleRequest(AbstractRequest):
|
|||||||
return self._role.display()
|
return self._role.display()
|
||||||
|
|
||||||
|
|
||||||
_rdbfile = myroot / 'roles.db'
|
_rdbfile = myroot / "roles.db"
|
||||||
|
|
||||||
|
|
||||||
class CDB:
|
class CDB:
|
||||||
@ -171,14 +175,14 @@ class CDB:
|
|||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
if self.heap:
|
if self.heap:
|
||||||
print('cleaning up')
|
print("cleaning up")
|
||||||
for request in self._cleanup():
|
for request in self._cleanup():
|
||||||
print('cleaned', request.handle.hex())
|
print("cleaned", request.handle.hex())
|
||||||
|
|
||||||
def push_abstract(self, request: AbstractRequest):
|
def push_abstract(self, request: AbstractRequest):
|
||||||
heapq.heappush(self.heap, request)
|
heapq.heappush(self.heap, request)
|
||||||
self.handle_mapping[request.handle] = request
|
self.handle_mapping[request.handle] = request
|
||||||
print('requested', request.handle.hex(), request.display())
|
print("requested", request.handle.hex(), request.display())
|
||||||
|
|
||||||
def push_requester(self, requester: VerifyKey) -> SignatureRequest:
|
def push_requester(self, requester: VerifyKey) -> SignatureRequest:
|
||||||
if requester in self.requester_mapping:
|
if requester in self.requester_mapping:
|
||||||
|
@ -5,9 +5,16 @@ from nacl.signing import SigningKey, VerifyKey, SignedMessage
|
|||||||
|
|
||||||
from v6d0auth.config import myroot, cakey
|
from v6d0auth.config import myroot, cakey
|
||||||
|
|
||||||
__all__ = ('vkey', 'pkey', 'sign', 'averify', 'receive', 'encrypt_self',)
|
__all__ = (
|
||||||
|
"vkey",
|
||||||
|
"pkey",
|
||||||
|
"sign",
|
||||||
|
"averify",
|
||||||
|
"receive",
|
||||||
|
"encrypt_self",
|
||||||
|
)
|
||||||
|
|
||||||
_keyfile = myroot / '.key'
|
_keyfile = myroot / ".key"
|
||||||
if _keyfile.exists():
|
if _keyfile.exists():
|
||||||
_skey = SigningKey(_keyfile.read_bytes())
|
_skey = SigningKey(_keyfile.read_bytes())
|
||||||
else:
|
else:
|
||||||
|
@ -5,15 +5,21 @@ from nacl.signing import VerifyKey
|
|||||||
from v6d0auth import certs
|
from v6d0auth import certs
|
||||||
from v6d0auth.config import caurl, myroot
|
from v6d0auth.config import caurl, myroot
|
||||||
|
|
||||||
__all__ = ('request_signature', 'mycert', 'has_role', 'request_role', 'with_role',)
|
__all__ = (
|
||||||
|
"request_signature",
|
||||||
|
"mycert",
|
||||||
|
"has_role",
|
||||||
|
"request_role",
|
||||||
|
"with_role",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def request_signature() -> bytes:
|
async def request_signature() -> bytes:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.post(f'{caurl}/push', data=certs.vkey.encode()) as response:
|
async with session.post(f"{caurl}/push", data=certs.vkey.encode()) as response:
|
||||||
if response.status not in [200, 429]:
|
if response.status not in [200, 429]:
|
||||||
raise RuntimeError(response.status)
|
raise RuntimeError(response.status)
|
||||||
async with session.ws_connect(f'{caurl}/pullws') as ws:
|
async with session.ws_connect(f"{caurl}/pullws") as ws:
|
||||||
await ws.send_bytes(certs.vkey.encode())
|
await ws.send_bytes(certs.vkey.encode())
|
||||||
try:
|
try:
|
||||||
return await ws.receive_bytes()
|
return await ws.receive_bytes()
|
||||||
@ -21,7 +27,7 @@ async def request_signature() -> bytes:
|
|||||||
raise RuntimeError("signature request failed") from e
|
raise RuntimeError("signature request failed") from e
|
||||||
|
|
||||||
|
|
||||||
_certfile = myroot / 'cert'
|
_certfile = myroot / "cert"
|
||||||
|
|
||||||
|
|
||||||
async def mycert() -> bytes:
|
async def mycert() -> bytes:
|
||||||
@ -36,16 +42,16 @@ async def mycert() -> bytes:
|
|||||||
|
|
||||||
async def has_role(vkey: VerifyKey, role: str):
|
async def has_role(vkey: VerifyKey, role: str):
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.post(f'{caurl}/has_role', data=vkey.encode(), headers={'v6role': role}) as response:
|
async with session.post(f"{caurl}/has_role", data=vkey.encode(), headers={"v6role": role}) as response:
|
||||||
return (await response.read()) == b'1'
|
return (await response.read()) == b"1"
|
||||||
|
|
||||||
|
|
||||||
async def request_role(role: str) -> bytes:
|
async def request_role(role: str) -> bytes:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.post(f'{caurl}/push', data=certs.vkey.encode(), headers={'v6role': role}) as response:
|
async with session.post(f"{caurl}/push", data=certs.vkey.encode(), headers={"v6role": role}) as response:
|
||||||
if response.status not in [200, 429]:
|
if response.status not in [200, 429]:
|
||||||
raise RuntimeError(response.status)
|
raise RuntimeError(response.status)
|
||||||
async with session.ws_connect(f'{caurl}/pullws', headers={'v6role': role}) as ws:
|
async with session.ws_connect(f"{caurl}/pullws", headers={"v6role": role}) as ws:
|
||||||
await ws.send_bytes(certs.vkey.encode())
|
await ws.send_bytes(certs.vkey.encode())
|
||||||
try:
|
try:
|
||||||
return await ws.receive_bytes()
|
return await ws.receive_bytes()
|
||||||
|
@ -1,13 +1,20 @@
|
|||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
__all__ = ('root', 'myroot', 'host', 'port', 'cakey', 'caurl',)
|
__all__ = (
|
||||||
|
"root",
|
||||||
|
"myroot",
|
||||||
|
"host",
|
||||||
|
"port",
|
||||||
|
"cakey",
|
||||||
|
"caurl",
|
||||||
|
)
|
||||||
|
|
||||||
root = Path(os.getenv('v6root', './data'))
|
root = Path(os.getenv("v6root", "./data"))
|
||||||
assert root.exists()
|
assert root.exists()
|
||||||
myroot = root / 'v6d0auth'
|
myroot = root / "v6d0auth"
|
||||||
myroot.mkdir(exist_ok=True)
|
myroot.mkdir(exist_ok=True)
|
||||||
host = os.getenv('v6host', '127.0.0.1')
|
host = os.getenv("v6host", "127.0.0.1")
|
||||||
port = int(os.getenv('v6port', '5003'))
|
port = int(os.getenv("v6port", "5003"))
|
||||||
cakey = bytes.fromhex(os.getenv('v6ca', ''))
|
cakey = bytes.fromhex(os.getenv("v6ca", ""))
|
||||||
caurl = os.getenv('v6caurl', f'http://127.0.0.1:{port}')
|
caurl = os.getenv("v6caurl", f"http://127.0.0.1:{port}")
|
||||||
|
@ -8,8 +8,8 @@ from v6d0auth import certs
|
|||||||
from v6d0auth.config import host, port
|
from v6d0auth.config import host, port
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('requester', type=str)
|
parser.add_argument("requester", type=str)
|
||||||
parser.add_argument('role', type=str)
|
parser.add_argument("role", type=str)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
@ -17,12 +17,12 @@ async def main():
|
|||||||
role = args.role
|
role = args.role
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
# noinspection HttpUrlsUsage
|
# noinspection HttpUrlsUsage
|
||||||
async with session.ws_connect(f'http://{host}:{port}/remove_role') as ws:
|
async with session.ws_connect(f"http://{host}:{port}/remove_role") as ws:
|
||||||
nonce = await ws.receive_bytes()
|
nonce = await ws.receive_bytes()
|
||||||
await ws.send_bytes(certs.sign(json.dumps([[requester.hex(), role], nonce.hex()]).encode()))
|
await ws.send_bytes(certs.sign(json.dumps([[requester.hex(), role], nonce.hex()]).encode()))
|
||||||
print((await ws.receive_bytes()).hex())
|
print((await ws.receive_bytes()).hex())
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
@ -11,7 +11,7 @@ async def main():
|
|||||||
await run_app(V6D0AuthAppFactory(cdb).app())
|
await run_app(V6D0AuthAppFactory(cdb).app())
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
@ -5,7 +5,10 @@ from aiohttp import web
|
|||||||
|
|
||||||
from v6d0auth.config import host, port
|
from v6d0auth.config import host, port
|
||||||
|
|
||||||
__all__ = ('start_app', 'run_app',)
|
__all__ = (
|
||||||
|
"start_app",
|
||||||
|
"run_app",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def start_app(app: web.Application, keepalive_timeout=75.0):
|
async def start_app(app: web.Application, keepalive_timeout=75.0):
|
||||||
@ -17,10 +20,7 @@ async def start_app(app: web.Application, keepalive_timeout=75.0):
|
|||||||
site = web.TCPSite(runner, host=host, port=port)
|
site = web.TCPSite(runner, host=host, port=port)
|
||||||
await site.start()
|
await site.start()
|
||||||
names = sorted(str(s.name) for s in runner.sites)
|
names = sorted(str(s.name) for s in runner.sites)
|
||||||
print(
|
print("======== Running on {} ========\n" "(Press CTRL+C to quit)".format(", ".join(names)))
|
||||||
"======== Running on {} ========\n"
|
|
||||||
"(Press CTRL+C to quit)".format(", ".join(names))
|
|
||||||
)
|
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
@ -9,20 +9,22 @@ from v6d0auth import certs
|
|||||||
from v6d0auth.config import host, port
|
from v6d0auth.config import host, port
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('handle', type=str)
|
parser.add_argument("handle", type=str)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
handle: bytes | Literal['all'] = 'all' if args.handle == 'all' else bytes.fromhex(args.handle)
|
handle: bytes | Literal["all"] = "all" if args.handle == "all" else bytes.fromhex(args.handle)
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
# noinspection HttpUrlsUsage
|
# noinspection HttpUrlsUsage
|
||||||
async with session.ws_connect(f'http://{host}:{port}/approve') as ws:
|
async with session.ws_connect(f"http://{host}:{port}/approve") as ws:
|
||||||
nonce = await ws.receive_bytes()
|
nonce = await ws.receive_bytes()
|
||||||
await ws.send_bytes(certs.sign(json.dumps(['all' if handle == 'all' else handle.hex(), nonce.hex()]).encode()))
|
await ws.send_bytes(
|
||||||
if handle != 'all':
|
certs.sign(json.dumps(["all" if handle == "all" else handle.hex(), nonce.hex()]).encode())
|
||||||
|
)
|
||||||
|
if handle != "all":
|
||||||
print((await ws.receive_bytes()).hex())
|
print((await ws.receive_bytes()).hex())
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
@ -9,11 +9,11 @@ from v6d0auth.client import *
|
|||||||
async def main():
|
async def main():
|
||||||
print(certs.vkey.encode().hex())
|
print(certs.vkey.encode().hex())
|
||||||
print((await request_signature()).hex())
|
print((await request_signature()).hex())
|
||||||
call([executable, '-m', 'v6d0auth.remove-role', certs.vkey.encode().hex(), 'test'])
|
call([executable, "-m", "v6d0auth.remove-role", certs.vkey.encode().hex(), "test"])
|
||||||
print(await has_role(certs.vkey, 'test'))
|
print(await has_role(certs.vkey, "test"))
|
||||||
print(await request_role('test'))
|
print(await request_role("test"))
|
||||||
print(await has_role(certs.vkey, 'test'))
|
print(await has_role(certs.vkey, "test"))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
Loading…
Reference in New Issue
Block a user