push notifications

This commit is contained in:
AF 2020-08-14 01:22:09 +03:00
parent 5c9513d034
commit 8ec2e91f55
6 changed files with 143 additions and 13 deletions

View File

@ -16,7 +16,7 @@ config.from_config(d)
app = Flask(__name__)
app.wsgi_app = DispatcherMiddleware(simple, {
'/v25': API(__name__, DBStorage(d['db']))
'/v25': API(__name__, DBStorage(d['db']).pushing())
})
app.config['ENV'] = 'development'
app.run(port=5013)

View File

@ -1,7 +1,10 @@
import json
from time import time
from typing import Tuple, Optional, Iterable
from sys import stderr
from threading import Thread, Lock
from time import time, sleep
from typing import Tuple, Optional, Iterable, Dict, List
import requests
from nacl.bindings import crypto_sign_PUBLICKEYBYTES
from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \
Integer, UniqueConstraint, Index
@ -12,7 +15,7 @@ from v25.messaging.encoding import NONCE_SIZE, Encoding
from v25.messaging.flags import Flags
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.storage import EventStorage, EVENT_PUSH, EVENT_EDIT, EVENT_DELETE
from v25.storage.storage import EVENT_PUSH, EVENT_EDIT, EVENT_DELETE, PushStorage
Base = declarative_base()
@ -103,8 +106,47 @@ class MsgEvent(Base):
en=bytes([code]) + Encoding.nonce())
class DBStorage(EventStorage):
class PushSub(Base):
__tablename__ = 'pushsubs'
subject = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
subscription = Column(String, nullable=False)
class PushState:
map: Dict[bytes, 'PushState'] = {}
@classmethod
def of(cls, s: Subject) -> 'PushState':
return cls.map.setdefault(s.vkey, cls())
def __init__(self):
self.lock = Lock()
self.notified = False
self.pushmsg = False
self.lastonline: Optional[float] = None
def afk_for(self, s: float):
return self.lastonline is None or time() > self.lastonline + s
def online(self):
with self.lock:
self.notified = False
self.pushmsg = False
self.lastonline = time()
def notify(self) -> bool:
with self.lock:
if not self.pushmsg or self.notified or not self.afk_for(5):
return False
self.pushmsg = False
self.notified = True
return True
class DBStorage(PushStorage):
def __init__(self, *args):
self.args = args
self.engine = create_engine(*args, echo=False)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
@ -137,6 +179,9 @@ class DBStorage(EventStorage):
session.add(msg.sgn())
self.event(session, m, EVENT_PUSH)
session.commit()
state = PushState.of(m.sto)
with state.lock:
state.pushmsg = True
finally:
session.close()
@ -246,6 +291,7 @@ class DBStorage(EventStorage):
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
session = self.Session()
try:
PushState.of(sto).online()
query: Query = session.query(MsgEvent)
query = query.filter(MsgEvent.sf == sfrom.vkey.encode(), MsgEvent.st == sto.vkey.encode())
if after is None:
@ -266,3 +312,67 @@ class DBStorage(EventStorage):
session.commit()
finally:
session.close()
def subscribe(self, subject: Subject, subscription: dict):
session = self.Session()
try:
subject = subject.vkey.encode()
session.query(PushSub).filter_by(subject=subject).delete()
session.add(PushSub(subject=subject, subscription=json.dumps(subscription)))
session.commit()
finally:
session.close()
def subscription(self, subject: Subject) -> Optional[dict]:
session = self.Session()
try:
subscription: Optional[PushSub] = session.query(PushSub).filter_by(
subject=subject.vkey.encode()).one_or_none()
if subscription is None:
return None
return json.loads(subscription.subscription)
finally:
session.close()
@staticmethod
def pushsubscription(subscription: dict):
assert requests.post('http://localhost:5025/api/push', json={
"subscription": subscription,
"notification": {
"title": "New Message (V25PUSH)"
}
}).status_code == 200, "push failed"
def pushpush(self, subject: Subject):
if PushState.of(subject).notify():
subscription = self.subscription(subject)
if subscription:
self.pushsubscription(subscription)
def push_once(self):
session = self.Session()
try:
subs: List[PushSub] = session.query(PushSub).all()
finally:
session.close()
for sub in subs:
try:
self.pushpush(Subject(sub.subject))
except Exception as e:
print('push failed', e, file=stderr)
def _push_forever(self):
while True:
self.push_once()
sleep(2)
def _push_forever_run(self):
storage = DBStorage(*self.args)
storage._push_forever()
def push_forever(self):
Thread(target=self._push_forever_run, daemon=True).start()
def pushing(self):
self.push_forever()
return self

View File

@ -2,11 +2,11 @@ from typing import Tuple, Optional, Iterable
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.storage import EventStorage
from v25.storage.storage import PushStorage
class SecureStorage(EventStorage):
def __init__(self, storage: EventStorage, subject: Subject):
class SecureStorage(PushStorage):
def __init__(self, storage: PushStorage, subject: Subject):
self.storage = storage
self.subject = subject
@ -40,3 +40,7 @@ class SecureStorage(EventStorage):
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
assert self.subject == sto, self.asrt()
return self.storage.events(sfrom, sto, after)
def subscribe(self, subject: Subject, subscription: dict):
assert self.subject == subject, self.asrt()
return self.storage.subscribe(subject, subscription)

View File

@ -1,5 +1,5 @@
from abc import ABC
from typing import Tuple, Optional, List, Iterable
from typing import Tuple, Optional, Iterable
from v25.messaging.message import Message
from v25.messaging.subject import Subject
@ -33,3 +33,8 @@ class EventStorage(AbstractStorage, ABC):
EVENT_PUSH = 0
EVENT_EDIT = 1
EVENT_DELETE = 2
class PushStorage(EventStorage, ABC):
def subscribe(self, subject: Subject, subscription: dict):
raise NotImplementedError

View File

@ -6,10 +6,10 @@ import requests
from v25.messaging.encoding import Encoding
from v25.messaging.message import Message
from v25.messaging.subject import Subject, PrivateSubject
from v25.storage.storage import EventStorage
from v25.storage.storage import PushStorage
class RemoteStorage(EventStorage):
class RemoteStorage(PushStorage):
def __init__(self, url: str, subject: PrivateSubject):
self.url = url
self.subject = subject
@ -49,3 +49,9 @@ class RemoteStorage(EventStorage):
return [(Encoding.decode(ev[0]), Encoding.decode(ev[1])) for ev in
self.req('events', {'sfrom': sfrom.dumps(), 'sto': sto.dumps(),
'after': Encoding.encode(after)})]
def subscribe(self, subject: Subject, subscription: dict):
self.req('subscribe', {
'subject': subject.dumps(),
'subscription': subscription
})

View File

@ -8,7 +8,7 @@ from v25.messaging.encoding import Encoding
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.secstorage import SecureStorage
from v25.storage.storage import EventStorage
from v25.storage.storage import PushStorage
class API(Flask):
@ -19,7 +19,7 @@ class API(Flask):
print('asrt', e, file=stderr)
abort(403)
def __init__(self, import_name, storage: EventStorage):
def __init__(self, import_name, storage: PushStorage):
self.storage = storage
super().__init__(import_name)
self.routes()
@ -75,3 +75,8 @@ class API(Flask):
Subject.loads(d['sto']),
Encoding.decode(d['after'])
)])
@app.route('/subscribe', methods=['POST'])
def subscribe():
return self.nomessassertcall(lambda d, storage: storage.subscribe(Subject.loads(d['subject']),
d['subscription']))