From 7aea826c60e8cae796ffcc40ce00bcfa31dc608d Mon Sep 17 00:00:00 2001 From: timotheyca Date: Fri, 14 Aug 2020 21:04:41 +0300 Subject: [PATCH] PushState model --- v25/storage/dbstorage.py | 80 ++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/v25/storage/dbstorage.py b/v25/storage/dbstorage.py index bc269f0..6d8429a 100644 --- a/v25/storage/dbstorage.py +++ b/v25/storage/dbstorage.py @@ -1,15 +1,16 @@ import json from sys import stderr -from threading import Thread, Lock +from threading import Thread from time import time, sleep -from typing import Tuple, Optional, Iterable, Dict, List +from typing import Tuple, Optional, Iterable, List import requests from nacl.bindings import crypto_sign_PUBLICKEYBYTES from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \ - Integer, UniqueConstraint, Index + Integer, UniqueConstraint, Index, Boolean from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, Query +from sqlalchemy.orm.exc import NoResultFound from v25.messaging.encoding import NONCE_SIZE, Encoding from v25.messaging.flags import Flags @@ -70,8 +71,7 @@ class Msg(Base): @classmethod def from_message(cls, m: Message): - # noinspection PyArgumentList - return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=m.timestamp, + return Msg(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=m.timestamp, en=m.editnonce, ec=m.econtent, flags=m.flags) @@ -101,9 +101,8 @@ class MsgEvent(Base): @classmethod def from_message(cls, m: Message, code: int): - # noinspection PyArgumentList - return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=time(), - en=bytes([code]) + Encoding.nonce()) + return MsgEvent(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=time(), + en=bytes([code]) + Encoding.nonce()) class PushSub(Base): @@ -113,35 +112,37 @@ class PushSub(Base): subscription = Column(String, nullable=False) -class PushState: - map: Dict[bytes, 'PushState'] = {} +class PushState(Base): + __tablename__ = 'pushstate' + + subject = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True) + notified = Column(Boolean, nullable=False) + pushmsg = Column(Boolean, nullable=False) + lastonline: Optional[float] = Column(REAL, nullable=True) @classmethod - def of(cls, s: Subject) -> 'PushState': - return cls.map.setdefault(s.vkey, cls()) - - def __init__(self): - self.lock = Lock() - self.notified = False - self.pushmsg = False - self.lastonline: Optional[float] = None + def of(cls, s: Subject, session) -> 'PushState': + try: + return session.query(PushState).filter_by(subject=s.vkey.encode()).one() + except NoResultFound: + state = PushState(subject=s.vkey.encode(), notified=False, pushmsg=False, lastonline=None) + session.add(state) + return state def afk_for(self, s: float): return self.lastonline is None or time() > self.lastonline + s def online(self): - with self.lock: - self.notified = False - self.pushmsg = False - self.lastonline = time() + self.notified = False + self.pushmsg = False + self.lastonline = time() def notify(self) -> bool: - with self.lock: - if not self.pushmsg or self.notified or not self.afk_for(5): - return False - self.pushmsg = False - self.notified = True - return True + if not self.pushmsg or self.notified or not self.afk_for(5): + return False + self.pushmsg = False + self.notified = True + return True class DBStorage(PushStorage): @@ -181,9 +182,9 @@ class DBStorage(PushStorage): session.commit() if m.sfrom == m.sto: return - state = PushState.of(m.sto) - with state.lock: - state.pushmsg = True + state = PushState.of(m.sto, session) + state.pushmsg = True + session.commit() finally: session.close() @@ -241,7 +242,7 @@ class DBStorage(PushStorage): if params.get('after'): query = query.filter(Msg.oid > cquery.filter(Msg.idn == Encoding.decode(params['after'])).one().oid) if params.get('exact'): - query = query.filter(Msg.idn == Encoding.decode(params['exact'])) + query = query.filter_by(idn=Encoding.decode(params['exact'])) for flag in params.get('flags', ()): query = query.filter(Msg.flags.contains(flag)) query = query.order_by(Msg.oid.desc()) @@ -290,14 +291,15 @@ class DBStorage(PushStorage): @staticmethod def ev_oid(query: Query, en: bytes) -> MsgEvent: - return query.filter(MsgEvent.en == en).one() + return query.filter_by(en=en).one() def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: session = self.Session() try: - PushState.of(sto).online() + PushState.of(sto, session).online() + session.commit() query: Query = session.query(MsgEvent) - query = query.filter(MsgEvent.sf == sfrom.vkey.encode(), MsgEvent.st == sto.vkey.encode()) + query = query.filter_by(sf=sfrom.vkey.encode(), st=sto.vkey.encode()) if after is None: query = query.order_by(MsgEvent.oid.desc()) query = query.limit(1) @@ -350,7 +352,13 @@ class DBStorage(PushStorage): }).status_code == 200, "push failed" def pushpush(self, subject: Subject): - if PushState.of(subject).notify(): + session = self.Session() + try: + notify = PushState.of(subject, session).notify() + session.commit() + finally: + session.close() + if notify: subscription = self.subscription(subject) if subscription: self.pushsubscription(subscription)