v25/v25/storage/dbstorage.py
2020-08-11 01:56:45 +03:00

193 lines
6.8 KiB
Python

import json
from typing import Tuple, Optional, Iterable
from nacl.bindings import crypto_sign_PUBLICKEYBYTES
from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \
Integer, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Query
from v25.messaging.encoding import NONCE_SIZE, Encoding
from v25.messaging.flags import Flags
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.storage import AbstractStorage
Base = declarative_base()
class SSSJ(Base):
"""Subject Status Storage JSON"""
__tablename__ = 'sssj'
subject = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
status = Column(String, nullable=False, default='{}')
class MsgSgn(Base):
__tablename__ = 'msgsgns'
sf = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
st = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), primary_key=True)
idn = Column(LargeBinary(NONCE_SIZE), primary_key=True)
class Msg(Base):
__tablename__ = 'msgs'
oid = Column(Integer, autoincrement=True, primary_key=True)
sf = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), index=True)
st = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), index=True)
idn = Column(LargeBinary(NONCE_SIZE), index=True)
ts = Column(REAL, nullable=False, index=True)
en = Column(LargeBinary(NONCE_SIZE), nullable=False, index=True)
ec = Column(BLOB, nullable=False)
flags = Column(String, nullable=False)
__table_args__ = (ForeignKeyConstraint((sf, st, idn,),
(MsgSgn.sf, MsgSgn.st, MsgSgn.idn,),
),
UniqueConstraint(sf, st, idn, ),
)
def sgn(self):
return MsgSgn(sf=self.sf, st=self.st, idn=self.idn)
def __repr__(self):
return f"{type(self).__name__}(" \
f"sf={self.sf!r}, st={self.st!r}, idn={self.idn!r}, ts={self.ts!r}," \
f" en={self.en!r}, ec={self.ec!r}," \
f" flags={self.flags!r}" \
f")"
@classmethod
def from_message(cls, m: Message):
# noinspection PyArgumentList
return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=m.timestamp,
en=m.editnonce, ec=m.econtent,
flags=m.flags)
def to_message(self):
return Message(Subject(self.sf), Subject(self.st), self.idn, self.ts,
self.en, None, self.ec, self.flags)
class DBStorage(AbstractStorage):
def __init__(self, *args):
self.engine = create_engine(*args, echo=False)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
def check(self, subject: Subject) -> dict:
session = self.Session()
sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none()
status = json.loads(sssj.status) if sssj else {}
if 'contacts' in status:
query: Query = session.query(Msg).filter(or_(
Msg.sf == subject.vkey.encode(),
Msg.st == subject.vkey.encode(),
))
query = query.with_entities(Msg.sf, Msg.st).distinct()
contacts = set()
for sf, st in query:
contacts.add(sf)
contacts.add(st)
status['contacts'] = list(map(Encoding.encode, contacts))
session.close()
return status
def push(self, m: Message) -> None:
session = self.Session()
msg = Msg.from_message(m)
session.add(msg)
session.add(msg.sgn())
session.commit()
session.close()
def edit(self, old: Message, new: Message):
assert old.edited(new)
session = self.Session()
msg = self.one_alike(session, old)
assert msg.en == old.editnonce
msgn = Msg.from_message(new)
msg.en = msgn.en
msg.ec = msgn.ec
msg.flags = msgn.flags
session.commit()
session.close()
@staticmethod
def one_alike(session, m: Message) -> Msg:
return session.query(Msg).filter_by(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce).one()
def delete(self, m: Message):
session = self.Session()
session.delete(self.one_alike(session, m))
session.commit()
session.close()
def pull(self, pair: Tuple[Subject, Subject], params: Optional[dict] = None) -> Iterable[Message]:
if params is None:
params = {}
session = self.Session()
query: Query = session.query(Msg).filter(or_(
and_(
Msg.sf == pair[0].vkey.encode(),
Msg.st == pair[1].vkey.encode(),
),
and_(
Msg.sf == pair[1].vkey.encode(),
Msg.st == pair[0].vkey.encode(),
),
))
if 'ts' in params:
if '>' in params:
query = query.filter(Msg.ts > params['ts']['>'])
if '<' in params:
query = query.filter(Msg.ts < params['ts']['<'])
if params.get('before'):
query = query.filter(Msg.oid < self.one_alike(
session,
Message(pair[0], pair[1], Encoding.decode(params['before']), None,
None, None, None,
'')).oid)
if params.get('after'):
query = query.filter(Msg.oid > self.one_alike(
session,
Message(pair[0], pair[1], Encoding.decode(params['before']), None,
None, None, None,
'')).oid)
for flag in params.get('flags', ()):
query = query.filter(Msg.flags.contains(flag))
query = query.order_by(Msg.oid.desc())
if 'limit' in params:
query = query.limit(params['limit'])
messages = map(Msg.to_message, list(query.from_self().order_by(Msg.oid)))
session.close()
return messages
def flags(self, m: Message, flags: str):
session = self.Session()
msg: Msg = self.one_alike(session, m)
assert msg.en == m.editnonce
assert Flags(msg.flags).quable()
assert not Flags(flags).quable()
msg.flags = flags
session.commit()
session.close()
def clearsssj(self):
session = self.Session()
session.query(SSSJ).delete()
session.commit()
session.close()
def ssssj(self, subject: str, status: str):
"""set SSSJ"""
subject = Subject(Encoding.decode(subject)).vkey.encode()
session = self.Session()
session.query(SSSJ).filter_by(subject=subject).delete()
session.add(SSSJ(subject=subject, status=status))
session.commit()
session.close()