From a2c49d340de126866ef6a1fbd8d20009a1743521 Mon Sep 17 00:00:00 2001 From: timotheyca Date: Wed, 12 Aug 2020 20:55:39 +0300 Subject: [PATCH] add NNN --- config.json | 6 + setup.py | 2 +- v25/storage/dbstorage.py | 247 +++++++++++++++++++++----------- v25/storage/secstorage.py | 10 +- v25/storage/storage.py | 10 ++ v25/web/client/remotestorage.py | 9 +- v25/web/server/api.py | 12 +- 7 files changed, 202 insertions(+), 94 deletions(-) diff --git a/config.json b/config.json index c1283ec..e83df29 100644 --- a/config.json +++ b/config.json @@ -8,6 +8,12 @@ "oNNNAvX5nsJEQGf33xulhh27cpECgQtJT3jzu2VyNKY=": { "allowed": null, "contacts": null + }, + "Roedh5GosuYKfytFpOnWaxgfJ2JfZlSudy0QHPmwPCg=": { + "allowed": null + }, + "Fm-Ehb5Ue5Mg1alT17DyveoOI3u1oDmCTeKN5J0ntqs=": { + "allowed": null } } } diff --git a/setup.py b/setup.py index 8960c25..462adfc 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='v25', - version='', + version='0.0.1-a0', packages=['v25', 'v25.web', 'v25.web.client', 'v25.web.server', 'v25.storage', 'v25.messaging'], url='', license='', diff --git a/v25/storage/dbstorage.py b/v25/storage/dbstorage.py index 301f2a8..2956de9 100644 --- a/v25/storage/dbstorage.py +++ b/v25/storage/dbstorage.py @@ -1,9 +1,10 @@ import json +from time import time from typing import Tuple, Optional, Iterable from nacl.bindings import crypto_sign_PUBLICKEYBYTES from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \ - Integer, UniqueConstraint + Integer, UniqueConstraint, Index from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, Query @@ -11,7 +12,7 @@ from v25.messaging.encoding import NONCE_SIZE, Encoding from v25.messaging.flags import Flags from v25.messaging.message import Message from v25.messaging.subject import Subject -from v25.storage.storage import AbstractStorage +from v25.storage.storage import EventStorage, EVENT_PUSH, EVENT_EDIT, EVENT_DELETE Base = declarative_base() @@ -48,6 +49,10 @@ class Msg(Base): (MsgSgn.sf, MsgSgn.st, MsgSgn.idn,), ), UniqueConstraint(sf, st, idn, ), + Index("ix_msgs_full_en", sf, st, idn, en), + Index("ix_msgs_full_in", sf, st, idn, ), + Index("ix_msgs_full_ts", sf, st, ts, ), + Index("ix_msgs_full", sf, st, ), ) def sgn(self): @@ -72,7 +77,33 @@ class Msg(Base): self.en, None, self.ec, self.flags) -class DBStorage(AbstractStorage): +class MsgEvent(Base): + __tablename__ = 'msgevents' + + oid = Column(Integer, autoincrement=True, primary_key=True) + sf = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), index=True) + st = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), index=True) + idn = Column(LargeBinary(NONCE_SIZE), index=True) + ts = Column(REAL, nullable=False, index=True) + en = Column(LargeBinary(NONCE_SIZE), nullable=False, index=True) + + __table_args__ = (ForeignKeyConstraint((sf, st, idn,), + (MsgSgn.sf, MsgSgn.st, MsgSgn.idn,), + ), + UniqueConstraint(sf, st, en, ), + Index("ix_msgevents_full_en", sf, st, en, ), + Index("ix_msgevents_full_ts", sf, st, ts, ), + Index("ix_msgevents_full", sf, st, ), + ) + + @classmethod + def from_message(cls, m: Message, code: int): + # noinspection PyArgumentList + return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=time(), + en=bytes([code]) + Encoding.nonce()) + + +class DBStorage(EventStorage): def __init__(self, *args): self.engine = create_engine(*args, echo=False) Base.metadata.create_all(self.engine) @@ -80,41 +111,49 @@ class DBStorage(AbstractStorage): def check(self, subject: Subject) -> dict: session = self.Session() - sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none() - status = json.loads(sssj.status) if sssj else {} - if 'contacts' in status: - query: Query = session.query(Msg).filter(or_( - Msg.sf == subject.vkey.encode(), - Msg.st == subject.vkey.encode(), - )) - query = query.with_entities(Msg.sf, Msg.st).distinct() - contacts = set() - for sf, st in query: - contacts.add(sf) - contacts.add(st) - status['contacts'] = list(map(Encoding.encode, contacts)) - session.close() - return status + try: + sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none() + status = json.loads(sssj.status) if sssj else {} + if 'contacts' in status: + query: Query = session.query(Msg).filter(or_( + Msg.sf == subject.vkey.encode(), + Msg.st == subject.vkey.encode(), + )) + query = query.with_entities(Msg.sf, Msg.st).distinct() + contacts = set() + for sf, st in query: + contacts.add(sf) + contacts.add(st) + status['contacts'] = list(map(Encoding.encode, contacts)) + return status + finally: + session.close() def push(self, m: Message) -> None: session = self.Session() - msg = Msg.from_message(m) - session.add(msg) - session.add(msg.sgn()) - session.commit() - session.close() + try: + msg = Msg.from_message(m) + session.add(msg) + session.add(msg.sgn()) + self.event(session, m, EVENT_PUSH) + session.commit() + finally: + session.close() def edit(self, old: Message, new: Message): assert old.edited(new), 'edit misuse' session = self.Session() - msg = self.one_alike(session, old) - assert msg.en == old.editnonce, 'edit misuse' - msgn = Msg.from_message(new) - msg.en = msgn.en - msg.ec = msgn.ec - msg.flags = msgn.flags - session.commit() - session.close() + try: + msg = self.one_alike(session, old) + assert msg.en == old.editnonce, 'edit misuse' + msgn = Msg.from_message(new) + msg.en = msgn.en + msg.ec = msgn.ec + msg.flags = msgn.flags + self.event(session, new, EVENT_EDIT) + session.commit() + finally: + session.close() @staticmethod def one_alike(session, m: Message) -> Msg: @@ -122,73 +161,109 @@ class DBStorage(AbstractStorage): def delete(self, m: Message): session = self.Session() - session.delete(self.one_alike(session, m)) - session.commit() - session.close() + try: + session.delete(self.one_alike(session, m)) + self.event(session, m, EVENT_DELETE) + session.commit() + finally: + session.close() def pull(self, pair: Tuple[Subject, Subject], params: Optional[dict] = None) -> Iterable[Message]: if params is None: params = {} session = self.Session() - query: Query = session.query(Msg).filter(or_( - and_( - Msg.sf == pair[0].vkey.encode(), - Msg.st == pair[1].vkey.encode(), - ), - and_( - Msg.sf == pair[1].vkey.encode(), - Msg.st == pair[0].vkey.encode(), - ), - )) - if 'ts' in params: - if '>' in params: - query = query.filter(Msg.ts > params['ts']['>']) - if '<' in params: - query = query.filter(Msg.ts < params['ts']['<']) - if params.get('before'): - query = query.filter(Msg.oid < self.one_alike( - session, - Message(pair[0], pair[1], Encoding.decode(params['before']), None, - None, None, None, - '')).oid) - if params.get('after'): - query = query.filter(Msg.oid > self.one_alike( - session, - Message(pair[0], pair[1], Encoding.decode(params['after']), None, - None, None, None, - '')).oid) - if params.get('exact'): - query = query.filter(Msg.idn == Encoding.decode(params['exact'])) - for flag in params.get('flags', ()): - query = query.filter(Msg.flags.contains(flag)) - query = query.order_by(Msg.oid.desc()) - if 'limit' in params: - query = query.limit(params['limit']) - messages = map(Msg.to_message, list(query.from_self().order_by(Msg.oid))) - session.close() - return messages + try: + cquery: Query = session.query(Msg).filter(or_( + and_( + Msg.sf == pair[0].vkey.encode(), + Msg.st == pair[1].vkey.encode(), + ), + and_( + Msg.sf == pair[1].vkey.encode(), + Msg.st == pair[0].vkey.encode(), + ), + )) + query: Query = cquery + if 'ts' in params: + if '>' in params: + query = query.filter(Msg.ts > params['ts']['>']) + if '<' in params: + query = query.filter(Msg.ts < params['ts']['<']) + if params.get('before'): + query = query.filter(Msg.oid < cquery.filter(Msg.idn == Encoding.decode(params['before'])).one().oid) + if params.get('after'): + query = query.filter(Msg.oid > cquery.filter(Msg.idn == Encoding.decode(params['after'])).one().oid) + if params.get('exact'): + print(Encoding.decode(params['exact'])) + query = query.filter(Msg.idn == Encoding.decode(params['exact'])) + for flag in params.get('flags', ()): + query = query.filter(Msg.flags.contains(flag)) + query = query.order_by(Msg.oid.desc()) + if 'limit' in params: + query = query.limit(params['limit']) + return map(Msg.to_message, list(query.from_self().order_by(Msg.oid))) + finally: + session.close() def flags(self, m: Message, flags: str): - session = self.Session() - msg: Msg = self.one_alike(session, m) - assert msg.en == m.editnonce, 'flags misuse' - assert Flags(msg.flags).quable(), 'flags misuse' assert not Flags(flags).quable(), 'flags misuse' - msg.flags = flags - session.commit() - session.close() + session = self.Session() + try: + msg: Msg = self.one_alike(session, m) + assert msg.en == m.editnonce, 'flags misuse' + assert Flags(msg.flags).quable(), 'flags misuse' + msg.flags = flags + session.commit() + finally: + session.close() def clearsssj(self): session = self.Session() - session.query(SSSJ).delete() - session.commit() - session.close() + try: + session.query(SSSJ).delete() + session.commit() + finally: + session.close() def ssssj(self, subject: str, status: str): """set SSSJ""" - subject = Subject(Encoding.decode(subject)).vkey.encode() session = self.Session() - session.query(SSSJ).filter_by(subject=subject).delete() - session.add(SSSJ(subject=subject, status=status)) - session.commit() - session.close() + try: + subject = Subject(Encoding.decode(subject)).vkey.encode() + session.query(SSSJ).filter_by(subject=subject).delete() + session.add(SSSJ(subject=subject, status=status)) + session.commit() + finally: + session.close() + + @staticmethod + def event(session, m: Message, code: int): + session.add(MsgEvent.from_message(m, code)) + + @staticmethod + def ev_oid(query: Query, en: bytes) -> MsgEvent: + return query.filter(MsgEvent.en == en).one() + + def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: + session = self.Session() + try: + query: Query = session.query(MsgEvent) + query = query.filter(MsgEvent.sf == sfrom.vkey.encode(), MsgEvent.st == sto.vkey.encode()) + if after is None: + query = query.order_by(MsgEvent.oid.desc()) + query = query.limit(1) + else: + query = query.filter(MsgEvent.oid > self.ev_oid(query, after).oid) + query = query.order_by(MsgEvent.oid) + ev: MsgEvent + return [(ev.en, ev.idn) for ev in query.all()] + finally: + session.close() + + def cleanevents(self): + session = self.Session() + try: + session.query(MsgEvent).delete(MsgEvent.ts < time() - 604800) + session.commit() + finally: + session.close() diff --git a/v25/storage/secstorage.py b/v25/storage/secstorage.py index 3449f5a..9c69a17 100644 --- a/v25/storage/secstorage.py +++ b/v25/storage/secstorage.py @@ -2,11 +2,11 @@ from typing import Tuple, Optional, Iterable from v25.messaging.message import Message from v25.messaging.subject import Subject -from v25.storage.storage import AbstractStorage +from v25.storage.storage import EventStorage -class SecureStorage(AbstractStorage): - def __init__(self, storage: AbstractStorage, subject: Subject): +class SecureStorage(EventStorage): + def __init__(self, storage: EventStorage, subject: Subject): self.storage = storage self.subject = subject @@ -36,3 +36,7 @@ class SecureStorage(AbstractStorage): def flags(self, m: Message, flags: str): assert self.subject in m.pair, self.asrt() return self.storage.flags(m, flags) + + def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: + assert self.subject == sto, self.asrt() + return self.storage.events(sfrom, sto, after) diff --git a/v25/storage/storage.py b/v25/storage/storage.py index 6fca03e..03b641c 100644 --- a/v25/storage/storage.py +++ b/v25/storage/storage.py @@ -23,3 +23,13 @@ class AbstractStorage(ABC): def flags(self, m: Message, flags: str): raise NotImplementedError + + +class EventStorage(AbstractStorage, ABC): + def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: + raise NotImplementedError + + +EVENT_PUSH = 0 +EVENT_EDIT = 1 +EVENT_DELETE = 2 diff --git a/v25/web/client/remotestorage.py b/v25/web/client/remotestorage.py index 56db487..1775836 100644 --- a/v25/web/client/remotestorage.py +++ b/v25/web/client/remotestorage.py @@ -6,10 +6,10 @@ import requests from v25.messaging.encoding import Encoding from v25.messaging.message import Message from v25.messaging.subject import Subject, PrivateSubject -from v25.storage.storage import AbstractStorage +from v25.storage.storage import EventStorage -class RemoteStorage(AbstractStorage): +class RemoteStorage(EventStorage): def __init__(self, url: str, subject: PrivateSubject): self.url = url self.subject = subject @@ -44,3 +44,8 @@ class RemoteStorage(AbstractStorage): def flags(self, m: Message, flags: str): self.req('flags', {'m': m.dumps(), 'flags': flags}) + + def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: + return [(Encoding.decode(ev[0]), Encoding.decode(ev[1])) for ev in + self.req('events', {'sfrom': sfrom.dumps(), 'sto': sto.dumps(), + 'after': Encoding.encode(after)})] diff --git a/v25/web/server/api.py b/v25/web/server/api.py index ceaa5e6..25fff0e 100644 --- a/v25/web/server/api.py +++ b/v25/web/server/api.py @@ -8,7 +8,7 @@ from v25.messaging.encoding import Encoding from v25.messaging.message import Message from v25.messaging.subject import Subject from v25.storage.secstorage import SecureStorage -from v25.storage.storage import AbstractStorage +from v25.storage.storage import EventStorage class API(Flask): @@ -19,7 +19,7 @@ class API(Flask): print('asrt', e, file=stderr) abort(403) - def __init__(self, import_name, storage: AbstractStorage): + def __init__(self, import_name, storage: EventStorage): self.storage = storage super().__init__(import_name) self.routes() @@ -67,3 +67,11 @@ class API(Flask): @app.route('/flags', methods=['POST']) def flags(): return self.nomessassertcall(lambda d, storage: storage.flags(Message.loads(d['m']), d['flags'])) + + @app.route('/events', methods=['POST']) + def events(): + return self.nomessassertcall(lambda d, storage: [list(map(Encoding.encode, ev)) for ev in storage.events( + Subject.loads(d['sfrom']), + Subject.loads(d['sto']), + Encoding.decode(d['after']) + )])