PushState model

This commit is contained in:
AF 2020-08-14 21:04:41 +03:00
parent 0ea489b694
commit 7aea826c60

View File

@ -1,15 +1,16 @@
import json import json
from sys import stderr from sys import stderr
from threading import Thread, Lock from threading import Thread
from time import time, sleep from time import time, sleep
from typing import Tuple, Optional, Iterable, Dict, List from typing import Tuple, Optional, Iterable, List
import requests import requests
from nacl.bindings import crypto_sign_PUBLICKEYBYTES from nacl.bindings import crypto_sign_PUBLICKEYBYTES
from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \ from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \
Integer, UniqueConstraint, Index Integer, UniqueConstraint, Index, Boolean
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Query from sqlalchemy.orm import sessionmaker, Query
from sqlalchemy.orm.exc import NoResultFound
from v25.messaging.encoding import NONCE_SIZE, Encoding from v25.messaging.encoding import NONCE_SIZE, Encoding
from v25.messaging.flags import Flags from v25.messaging.flags import Flags
@ -70,8 +71,7 @@ class Msg(Base):
@classmethod @classmethod
def from_message(cls, m: Message): def from_message(cls, m: Message):
# noinspection PyArgumentList return Msg(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=m.timestamp,
return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=m.timestamp,
en=m.editnonce, ec=m.econtent, en=m.editnonce, ec=m.econtent,
flags=m.flags) flags=m.flags)
@ -101,8 +101,7 @@ class MsgEvent(Base):
@classmethod @classmethod
def from_message(cls, m: Message, code: int): def from_message(cls, m: Message, code: int):
# noinspection PyArgumentList return MsgEvent(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=time(),
return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=time(),
en=bytes([code]) + Encoding.nonce()) en=bytes([code]) + Encoding.nonce())
@ -113,30 +112,32 @@ class PushSub(Base):
subscription = Column(String, nullable=False) subscription = Column(String, nullable=False)
class PushState: class PushState(Base):
map: Dict[bytes, 'PushState'] = {} __tablename__ = 'pushstate'
subject = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
notified = Column(Boolean, nullable=False)
pushmsg = Column(Boolean, nullable=False)
lastonline: Optional[float] = Column(REAL, nullable=True)
@classmethod @classmethod
def of(cls, s: Subject) -> 'PushState': def of(cls, s: Subject, session) -> 'PushState':
return cls.map.setdefault(s.vkey, cls()) try:
return session.query(PushState).filter_by(subject=s.vkey.encode()).one()
def __init__(self): except NoResultFound:
self.lock = Lock() state = PushState(subject=s.vkey.encode(), notified=False, pushmsg=False, lastonline=None)
self.notified = False session.add(state)
self.pushmsg = False return state
self.lastonline: Optional[float] = None
def afk_for(self, s: float): def afk_for(self, s: float):
return self.lastonline is None or time() > self.lastonline + s return self.lastonline is None or time() > self.lastonline + s
def online(self): def online(self):
with self.lock:
self.notified = False self.notified = False
self.pushmsg = False self.pushmsg = False
self.lastonline = time() self.lastonline = time()
def notify(self) -> bool: def notify(self) -> bool:
with self.lock:
if not self.pushmsg or self.notified or not self.afk_for(5): if not self.pushmsg or self.notified or not self.afk_for(5):
return False return False
self.pushmsg = False self.pushmsg = False
@ -181,9 +182,9 @@ class DBStorage(PushStorage):
session.commit() session.commit()
if m.sfrom == m.sto: if m.sfrom == m.sto:
return return
state = PushState.of(m.sto) state = PushState.of(m.sto, session)
with state.lock:
state.pushmsg = True state.pushmsg = True
session.commit()
finally: finally:
session.close() session.close()
@ -241,7 +242,7 @@ class DBStorage(PushStorage):
if params.get('after'): if params.get('after'):
query = query.filter(Msg.oid > cquery.filter(Msg.idn == Encoding.decode(params['after'])).one().oid) query = query.filter(Msg.oid > cquery.filter(Msg.idn == Encoding.decode(params['after'])).one().oid)
if params.get('exact'): if params.get('exact'):
query = query.filter(Msg.idn == Encoding.decode(params['exact'])) query = query.filter_by(idn=Encoding.decode(params['exact']))
for flag in params.get('flags', ()): for flag in params.get('flags', ()):
query = query.filter(Msg.flags.contains(flag)) query = query.filter(Msg.flags.contains(flag))
query = query.order_by(Msg.oid.desc()) query = query.order_by(Msg.oid.desc())
@ -290,14 +291,15 @@ class DBStorage(PushStorage):
@staticmethod @staticmethod
def ev_oid(query: Query, en: bytes) -> MsgEvent: def ev_oid(query: Query, en: bytes) -> MsgEvent:
return query.filter(MsgEvent.en == en).one() return query.filter_by(en=en).one()
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
session = self.Session() session = self.Session()
try: try:
PushState.of(sto).online() PushState.of(sto, session).online()
session.commit()
query: Query = session.query(MsgEvent) query: Query = session.query(MsgEvent)
query = query.filter(MsgEvent.sf == sfrom.vkey.encode(), MsgEvent.st == sto.vkey.encode()) query = query.filter_by(sf=sfrom.vkey.encode(), st=sto.vkey.encode())
if after is None: if after is None:
query = query.order_by(MsgEvent.oid.desc()) query = query.order_by(MsgEvent.oid.desc())
query = query.limit(1) query = query.limit(1)
@ -350,7 +352,13 @@ class DBStorage(PushStorage):
}).status_code == 200, "push failed" }).status_code == 200, "push failed"
def pushpush(self, subject: Subject): def pushpush(self, subject: Subject):
if PushState.of(subject).notify(): session = self.Session()
try:
notify = PushState.of(subject, session).notify()
session.commit()
finally:
session.close()
if notify:
subscription = self.subscription(subject) subscription = self.subscription(subject)
if subscription: if subscription:
self.pushsubscription(subscription) self.pushsubscription(subscription)