diff --git a/dev-main.py b/dev-main.py index e42d751..8fb9000 100644 --- a/dev-main.py +++ b/dev-main.py @@ -16,7 +16,7 @@ config.from_config(d) app = Flask(__name__) app.wsgi_app = DispatcherMiddleware(simple, { - '/v25': API(__name__, DBStorage(d['db'])) + '/v25': API(__name__, DBStorage(d['db']).pushing()) }) app.config['ENV'] = 'development' app.run(port=5013) diff --git a/v25/storage/dbstorage.py b/v25/storage/dbstorage.py index a1877e1..7f383d6 100644 --- a/v25/storage/dbstorage.py +++ b/v25/storage/dbstorage.py @@ -1,7 +1,10 @@ import json -from time import time -from typing import Tuple, Optional, Iterable +from sys import stderr +from threading import Thread, Lock +from time import time, sleep +from typing import Tuple, Optional, Iterable, Dict, List +import requests from nacl.bindings import crypto_sign_PUBLICKEYBYTES from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \ Integer, UniqueConstraint, Index @@ -12,7 +15,7 @@ from v25.messaging.encoding import NONCE_SIZE, Encoding from v25.messaging.flags import Flags from v25.messaging.message import Message from v25.messaging.subject import Subject -from v25.storage.storage import EventStorage, EVENT_PUSH, EVENT_EDIT, EVENT_DELETE +from v25.storage.storage import EVENT_PUSH, EVENT_EDIT, EVENT_DELETE, PushStorage Base = declarative_base() @@ -103,8 +106,47 @@ class MsgEvent(Base): en=bytes([code]) + Encoding.nonce()) -class DBStorage(EventStorage): +class PushSub(Base): + __tablename__ = 'pushsubs' + + subject = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True) + subscription = Column(String, nullable=False) + + +class PushState: + map: Dict[bytes, 'PushState'] = {} + + @classmethod + def of(cls, s: Subject) -> 'PushState': + return cls.map.setdefault(s.vkey, cls()) + + def __init__(self): + self.lock = Lock() + self.notified = False + self.pushmsg = False + self.lastonline: Optional[float] = None + + def afk_for(self, s: float): + return self.lastonline is None or time() > self.lastonline + s + + def online(self): + with self.lock: + self.notified = False + self.pushmsg = False + self.lastonline = time() + + def notify(self) -> bool: + with self.lock: + if not self.pushmsg or self.notified or not self.afk_for(5): + return False + self.pushmsg = False + self.notified = True + return True + + +class DBStorage(PushStorage): def __init__(self, *args): + self.args = args self.engine = create_engine(*args, echo=False) Base.metadata.create_all(self.engine) self.Session = sessionmaker(bind=self.engine) @@ -137,6 +179,9 @@ class DBStorage(EventStorage): session.add(msg.sgn()) self.event(session, m, EVENT_PUSH) session.commit() + state = PushState.of(m.sto) + with state.lock: + state.pushmsg = True finally: session.close() @@ -246,6 +291,7 @@ class DBStorage(EventStorage): def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: session = self.Session() try: + PushState.of(sto).online() query: Query = session.query(MsgEvent) query = query.filter(MsgEvent.sf == sfrom.vkey.encode(), MsgEvent.st == sto.vkey.encode()) if after is None: @@ -266,3 +312,67 @@ class DBStorage(EventStorage): session.commit() finally: session.close() + + def subscribe(self, subject: Subject, subscription: dict): + session = self.Session() + try: + subject = subject.vkey.encode() + session.query(PushSub).filter_by(subject=subject).delete() + session.add(PushSub(subject=subject, subscription=json.dumps(subscription))) + session.commit() + finally: + session.close() + + def subscription(self, subject: Subject) -> Optional[dict]: + session = self.Session() + try: + subscription: Optional[PushSub] = session.query(PushSub).filter_by( + subject=subject.vkey.encode()).one_or_none() + if subscription is None: + return None + return json.loads(subscription.subscription) + finally: + session.close() + + @staticmethod + def pushsubscription(subscription: dict): + assert requests.post('http://localhost:5025/api/push', json={ + "subscription": subscription, + "notification": { + "title": "New Message (V25PUSH)" + } + }).status_code == 200, "push failed" + + def pushpush(self, subject: Subject): + if PushState.of(subject).notify(): + subscription = self.subscription(subject) + if subscription: + self.pushsubscription(subscription) + + def push_once(self): + session = self.Session() + try: + subs: List[PushSub] = session.query(PushSub).all() + finally: + session.close() + for sub in subs: + try: + self.pushpush(Subject(sub.subject)) + except Exception as e: + print('push failed', e, file=stderr) + + def _push_forever(self): + while True: + self.push_once() + sleep(2) + + def _push_forever_run(self): + storage = DBStorage(*self.args) + storage._push_forever() + + def push_forever(self): + Thread(target=self._push_forever_run, daemon=True).start() + + def pushing(self): + self.push_forever() + return self diff --git a/v25/storage/secstorage.py b/v25/storage/secstorage.py index 9c69a17..92e39a6 100644 --- a/v25/storage/secstorage.py +++ b/v25/storage/secstorage.py @@ -2,11 +2,11 @@ from typing import Tuple, Optional, Iterable from v25.messaging.message import Message from v25.messaging.subject import Subject -from v25.storage.storage import EventStorage +from v25.storage.storage import PushStorage -class SecureStorage(EventStorage): - def __init__(self, storage: EventStorage, subject: Subject): +class SecureStorage(PushStorage): + def __init__(self, storage: PushStorage, subject: Subject): self.storage = storage self.subject = subject @@ -40,3 +40,7 @@ class SecureStorage(EventStorage): def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: assert self.subject == sto, self.asrt() return self.storage.events(sfrom, sto, after) + + def subscribe(self, subject: Subject, subscription: dict): + assert self.subject == subject, self.asrt() + return self.storage.subscribe(subject, subscription) diff --git a/v25/storage/storage.py b/v25/storage/storage.py index 03b641c..29ded43 100644 --- a/v25/storage/storage.py +++ b/v25/storage/storage.py @@ -1,5 +1,5 @@ from abc import ABC -from typing import Tuple, Optional, List, Iterable +from typing import Tuple, Optional, Iterable from v25.messaging.message import Message from v25.messaging.subject import Subject @@ -33,3 +33,8 @@ class EventStorage(AbstractStorage, ABC): EVENT_PUSH = 0 EVENT_EDIT = 1 EVENT_DELETE = 2 + + +class PushStorage(EventStorage, ABC): + def subscribe(self, subject: Subject, subscription: dict): + raise NotImplementedError diff --git a/v25/web/client/remotestorage.py b/v25/web/client/remotestorage.py index 1775836..9e4f896 100644 --- a/v25/web/client/remotestorage.py +++ b/v25/web/client/remotestorage.py @@ -6,10 +6,10 @@ import requests from v25.messaging.encoding import Encoding from v25.messaging.message import Message from v25.messaging.subject import Subject, PrivateSubject -from v25.storage.storage import EventStorage +from v25.storage.storage import PushStorage -class RemoteStorage(EventStorage): +class RemoteStorage(PushStorage): def __init__(self, url: str, subject: PrivateSubject): self.url = url self.subject = subject @@ -49,3 +49,9 @@ class RemoteStorage(EventStorage): return [(Encoding.decode(ev[0]), Encoding.decode(ev[1])) for ev in self.req('events', {'sfrom': sfrom.dumps(), 'sto': sto.dumps(), 'after': Encoding.encode(after)})] + + def subscribe(self, subject: Subject, subscription: dict): + self.req('subscribe', { + 'subject': subject.dumps(), + 'subscription': subscription + }) diff --git a/v25/web/server/api.py b/v25/web/server/api.py index 25fff0e..d19ad79 100644 --- a/v25/web/server/api.py +++ b/v25/web/server/api.py @@ -8,7 +8,7 @@ from v25.messaging.encoding import Encoding from v25.messaging.message import Message from v25.messaging.subject import Subject from v25.storage.secstorage import SecureStorage -from v25.storage.storage import EventStorage +from v25.storage.storage import PushStorage class API(Flask): @@ -19,7 +19,7 @@ class API(Flask): print('asrt', e, file=stderr) abort(403) - def __init__(self, import_name, storage: EventStorage): + def __init__(self, import_name, storage: PushStorage): self.storage = storage super().__init__(import_name) self.routes() @@ -75,3 +75,8 @@ class API(Flask): Subject.loads(d['sto']), Encoding.decode(d['after']) )]) + + @app.route('/subscribe', methods=['POST']) + def subscribe(): + return self.nomessassertcall(lambda d, storage: storage.subscribe(Subject.loads(d['subject']), + d['subscription']))