This commit is contained in:
AF 2020-11-08 17:42:40 +03:00
parent 41b1579785
commit 5f49cd7f46
5 changed files with 77 additions and 68 deletions

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SqlDialectMappings">
<file url="file://$PROJECT_DIR$/report.sql" dialect="GenericSQL" />
<file url="PROJECT" dialect="SQLite" />
</component>
</project>

View File

@ -1,4 +1,5 @@
import json import json
from contextlib import closing
from subprocess import run from subprocess import run
from sys import stderr from sys import stderr
from threading import Thread from threading import Thread
@ -145,6 +146,43 @@ class PushState(Base):
return True return True
class TypingState(Base):
__tablename__ = 'typingstate'
sf = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
st = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
last = Column(REAL)
@classmethod
def _get(cls, sfrom: Subject, sto: Subject, session) -> Tuple['TypingState', 'TypingState']:
sf = sfrom.vkey.encode()
st = sto.vkey.encode()
try:
statef = session.query(TypingState).filter_by(sf=sf, st=st).one()
except NoResultFound:
statef = TypingState(sf=sf, st=st, last=0)
session.add(statef)
try:
statet = session.query(TypingState).filter_by(sf=st, st=sf).one()
except NoResultFound:
statet = TypingState(sf=st, st=sf, last=0)
session.add(statet)
return statef, statet
@classmethod
def typing(cls, sfrom: Subject, sto: Subject, last: float, session) -> float:
statef, statet = cls._get(sfrom, sto, session)
statef.last = max(statef.last, last)
session.commit()
return statet.last
@classmethod
def reset(cls, m: Message, session):
statef, _ = cls._get(m.sfrom, m.sto, session)
statef.last = 0
session.commit()
class DBStorage(PushStorage): class DBStorage(PushStorage):
def __init__(self, *args): def __init__(self, *args):
self.args = args self.args = args
@ -153,8 +191,7 @@ class DBStorage(PushStorage):
self.Session = sessionmaker(bind=self.engine) self.Session = sessionmaker(bind=self.engine)
def check(self, subject: Subject) -> dict: def check(self, subject: Subject) -> dict:
session = self.Session() with closing(self.Session()) as session:
try:
sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none() sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none()
status = json.loads(sssj.status) if sssj else {} status = json.loads(sssj.status) if sssj else {}
if 'contacts' in status: if 'contacts' in status:
@ -169,29 +206,23 @@ class DBStorage(PushStorage):
contacts.add(st) contacts.add(st)
status['contacts'] = list(map(Encoding.encode, contacts)) status['contacts'] = list(map(Encoding.encode, contacts))
return status return status
finally:
session.close()
def push(self, m: Message) -> None: def push(self, m: Message) -> None:
session = self.Session() with closing(self.Session()) as session:
try:
msg = Msg.from_message(m) msg = Msg.from_message(m)
session.add(msg) session.add(msg)
session.add(msg.sgn()) session.add(msg.sgn())
self.event(session, m, EVENT_PUSH) self.event(session, m, EVENT_PUSH)
session.commit() session.commit()
if m.sfrom == m.sto: if m.sfrom != m.sto:
return state = PushState.of(m.sto, session)
state = PushState.of(m.sto, session) state.pushmsg = True
state.pushmsg = True session.commit()
session.commit() TypingState.reset(m, session)
finally:
session.close()
def edit(self, old: Message, new: Message): def edit(self, old: Message, new: Message):
assert old.edited(new), 'edit misuse' assert old.edited(new), 'edit misuse'
session = self.Session() with closing(self.Session()) as session:
try:
msg = self.one_alike(session, old) msg = self.one_alike(session, old)
assert msg.en == old.editnonce, 'edit misuse' assert msg.en == old.editnonce, 'edit misuse'
msgn = Msg.from_message(new) msgn = Msg.from_message(new)
@ -200,27 +231,22 @@ class DBStorage(PushStorage):
msg.flags = msgn.flags msg.flags = msgn.flags
self.event(session, new, EVENT_EDIT) self.event(session, new, EVENT_EDIT)
session.commit() session.commit()
finally: TypingState.reset(new, session)
session.close()
@staticmethod @staticmethod
def one_alike(session, m: Message) -> Msg: def one_alike(session, m: Message) -> Msg:
return session.query(Msg).filter_by(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce).one() return session.query(Msg).filter_by(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce).one()
def delete(self, m: Message): def delete(self, m: Message):
session = self.Session() with closing(self.Session()) as session:
try:
session.delete(self.one_alike(session, m)) session.delete(self.one_alike(session, m))
self.event(session, m, EVENT_DELETE) self.event(session, m, EVENT_DELETE)
session.commit() session.commit()
finally:
session.close()
def pull(self, pair: Tuple[Subject, Subject], params: Optional[dict] = None) -> Iterable[Message]: def pull(self, pair: Tuple[Subject, Subject], params: Optional[dict] = None) -> Iterable[Message]:
if params is None: if params is None:
params = {} params = {}
session = self.Session() with closing(self.Session()) as session:
try:
cquery: Query = session.query(Msg).filter(or_( cquery: Query = session.query(Msg).filter(or_(
and_( and_(
Msg.sf == pair[0].vkey.encode(), Msg.sf == pair[0].vkey.encode(),
@ -249,39 +275,28 @@ class DBStorage(PushStorage):
if 'limit' in params: if 'limit' in params:
query = query.limit(params['limit']) query = query.limit(params['limit'])
return map(Msg.to_message, list(query.from_self().order_by(Msg.oid))) return map(Msg.to_message, list(query.from_self().order_by(Msg.oid)))
finally:
session.close()
def flags(self, m: Message, flags: str): def flags(self, m: Message, flags: str):
assert not Flags(flags).quable(), 'flags misuse' assert not Flags(flags).quable(), 'flags misuse'
session = self.Session() with closing(self.Session()) as session:
try:
msg: Msg = self.one_alike(session, m) msg: Msg = self.one_alike(session, m)
assert msg.en == m.editnonce, 'flags misuse' assert msg.en == m.editnonce, 'flags misuse'
assert Flags(msg.flags).quable(), 'flags misuse' assert Flags(msg.flags).quable(), 'flags misuse'
msg.flags = flags msg.flags = flags
session.commit() session.commit()
finally:
session.close()
def clearsssj(self): def clearsssj(self):
session = self.Session() with closing(self.Session()) as session:
try:
session.query(SSSJ).delete() session.query(SSSJ).delete()
session.commit() session.commit()
finally:
session.close()
def ssssj(self, subject: str, status: str): def ssssj(self, subject: str, status: str):
"""set SSSJ""" """set SSSJ"""
session = self.Session() with closing(self.Session()) as session:
try:
subject = Subject(Encoding.decode(subject)).vkey.encode() subject = Subject(Encoding.decode(subject)).vkey.encode()
session.query(SSSJ).filter_by(subject=subject).delete() session.query(SSSJ).filter_by(subject=subject).delete()
session.add(SSSJ(subject=subject, status=status)) session.add(SSSJ(subject=subject, status=status))
session.commit() session.commit()
finally:
session.close()
@staticmethod @staticmethod
def event(session, m: Message, code: int): def event(session, m: Message, code: int):
@ -294,8 +309,7 @@ class DBStorage(PushStorage):
return query.filter_by(en=en).one() return query.filter_by(en=en).one()
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
session = self.Session() with closing(self.Session()) as session:
try:
PushState.of(sto, session).online() PushState.of(sto, session).online()
session.commit() session.commit()
query: Query = session.query(MsgEvent) query: Query = session.query(MsgEvent)
@ -308,37 +322,26 @@ class DBStorage(PushStorage):
query = query.order_by(MsgEvent.oid) query = query.order_by(MsgEvent.oid)
ev: MsgEvent ev: MsgEvent
return [(ev.en, ev.idn) for ev in query.all()] return [(ev.en, ev.idn) for ev in query.all()]
finally:
session.close()
def cleanevents(self): def cleanevents(self):
session = self.Session() with closing(self.Session()) as session:
try:
session.query(MsgEvent).delete(MsgEvent.ts < time() - 604800) session.query(MsgEvent).delete(MsgEvent.ts < time() - 604800)
session.commit() session.commit()
finally:
session.close()
def subscribe(self, subject: Subject, subscription: dict): def subscribe(self, subject: Subject, subscription: dict):
session = self.Session() with closing(self.Session()) as session:
try:
subject = subject.vkey.encode() subject = subject.vkey.encode()
session.query(PushSub).filter_by(subject=subject).delete() session.query(PushSub).filter_by(subject=subject).delete()
session.add(PushSub(subject=subject, subscription=json.dumps(subscription))) session.add(PushSub(subject=subject, subscription=json.dumps(subscription)))
session.commit() session.commit()
finally:
session.close()
def subscription(self, subject: Subject) -> Optional[dict]: def subscription(self, subject: Subject) -> Optional[dict]:
session = self.Session() with closing(self.Session()) as session:
try:
subscription: Optional[PushSub] = session.query(PushSub).filter_by( subscription: Optional[PushSub] = session.query(PushSub).filter_by(
subject=subject.vkey.encode()).one_or_none() subject=subject.vkey.encode()).one_or_none()
if subscription is None: if subscription is None:
return None return None
return json.loads(subscription.subscription) return json.loads(subscription.subscription)
finally:
session.close()
@staticmethod @staticmethod
def pushsubscription(subscription: dict): def pushsubscription(subscription: dict):
@ -353,23 +356,17 @@ class DBStorage(PushStorage):
}), encoding='utf-8').returncode, "push subprocess failed" }), encoding='utf-8').returncode, "push subprocess failed"
def pushpush(self, subject: Subject): def pushpush(self, subject: Subject):
session = self.Session() with closing(self.Session()) as session:
try:
notify = PushState.of(subject, session).notify() notify = PushState.of(subject, session).notify()
session.commit() session.commit()
finally:
session.close()
if notify: if notify:
subscription = self.subscription(subject) subscription = self.subscription(subject)
if subscription: if subscription:
self.pushsubscription(subscription) self.pushsubscription(subscription)
def push_once(self): def push_once(self):
session = self.Session() with closing(self.Session()) as session:
try:
subs: List[PushSub] = session.query(PushSub).all() subs: List[PushSub] = session.query(PushSub).all()
finally:
session.close()
for sub in subs: for sub in subs:
try: try:
self.pushpush(Subject(sub.subject)) self.pushpush(Subject(sub.subject))
@ -391,3 +388,7 @@ class DBStorage(PushStorage):
def pushing(self): def pushing(self):
self.push_forever() self.push_forever()
return self return self
def typing(self, sfrom: Subject, sto: Subject, last: float) -> float:
with closing(self.Session()) as session:
return TypingState.typing(sfrom, sto, last, session)

View File

@ -44,3 +44,7 @@ class SecureStorage(PushStorage):
def subscribe(self, subject: Subject, subscription: dict): def subscribe(self, subject: Subject, subscription: dict):
assert self.subject == subject, self.asrt() assert self.subject == subject, self.asrt()
return self.storage.subscribe(subject, subscription) return self.storage.subscribe(subject, subscription)
def typing(self, sfrom: Subject, sto: Subject, last: float) -> float:
assert self.subject == sfrom, self.asrt()
return self.storage.typing(sfrom, sto, last)

View File

@ -29,6 +29,9 @@ class EventStorage(AbstractStorage, ABC):
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]: def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
raise NotImplementedError raise NotImplementedError
def typing(self, sfrom: Subject, sto: Subject, last: float) -> float:
raise NotImplementedError
EVENT_PUSH = 0 EVENT_PUSH = 0
EVENT_EDIT = 1 EVENT_EDIT = 1

View File

@ -80,3 +80,11 @@ class API(Flask):
def subscribe(): def subscribe():
return self.nomessassertcall(lambda d, storage: storage.subscribe(Subject.loads(d['subject']), return self.nomessassertcall(lambda d, storage: storage.subscribe(Subject.loads(d['subject']),
d['subscription'])) d['subscription']))
@app.route('/typing', methods=['POST'])
def typing():
return self.nomessassertcall(lambda d, storage: storage.typing(
Subject.loads(d['sfrom']),
Subject.loads(d['sto']),
d['last']
))