This commit is contained in:
AF 2020-08-12 20:55:39 +03:00
parent 538236ab42
commit a2c49d340d
7 changed files with 202 additions and 94 deletions

View File

@ -8,6 +8,12 @@
"oNNNAvX5nsJEQGf33xulhh27cpECgQtJT3jzu2VyNKY=": {
"allowed": null,
"contacts": null
},
"Roedh5GosuYKfytFpOnWaxgfJ2JfZlSudy0QHPmwPCg=": {
"allowed": null
},
"Fm-Ehb5Ue5Mg1alT17DyveoOI3u1oDmCTeKN5J0ntqs=": {
"allowed": null
}
}
}

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='v25',
version='',
version='0.0.1-a0',
packages=['v25', 'v25.web', 'v25.web.client', 'v25.web.server', 'v25.storage', 'v25.messaging'],
url='',
license='',

View File

@ -1,9 +1,10 @@
import json
from time import time
from typing import Tuple, Optional, Iterable
from nacl.bindings import crypto_sign_PUBLICKEYBYTES
from sqlalchemy import create_engine, LargeBinary, Column, REAL, BLOB, String, or_, and_, ForeignKeyConstraint, \
Integer, UniqueConstraint
Integer, UniqueConstraint, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Query
@ -11,7 +12,7 @@ from v25.messaging.encoding import NONCE_SIZE, Encoding
from v25.messaging.flags import Flags
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.storage import AbstractStorage
from v25.storage.storage import EventStorage, EVENT_PUSH, EVENT_EDIT, EVENT_DELETE
Base = declarative_base()
@ -48,6 +49,10 @@ class Msg(Base):
(MsgSgn.sf, MsgSgn.st, MsgSgn.idn,),
),
UniqueConstraint(sf, st, idn, ),
Index("ix_msgs_full_en", sf, st, idn, en),
Index("ix_msgs_full_in", sf, st, idn, ),
Index("ix_msgs_full_ts", sf, st, ts, ),
Index("ix_msgs_full", sf, st, ),
)
def sgn(self):
@ -72,7 +77,33 @@ class Msg(Base):
self.en, None, self.ec, self.flags)
class DBStorage(AbstractStorage):
class MsgEvent(Base):
__tablename__ = 'msgevents'
oid = Column(Integer, autoincrement=True, primary_key=True)
sf = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), index=True)
st = Column(LargeBinary(crypto_sign_PUBLICKEYBYTES), index=True)
idn = Column(LargeBinary(NONCE_SIZE), index=True)
ts = Column(REAL, nullable=False, index=True)
en = Column(LargeBinary(NONCE_SIZE), nullable=False, index=True)
__table_args__ = (ForeignKeyConstraint((sf, st, idn,),
(MsgSgn.sf, MsgSgn.st, MsgSgn.idn,),
),
UniqueConstraint(sf, st, en, ),
Index("ix_msgevents_full_en", sf, st, en, ),
Index("ix_msgevents_full_ts", sf, st, ts, ),
Index("ix_msgevents_full", sf, st, ),
)
@classmethod
def from_message(cls, m: Message, code: int):
# noinspection PyArgumentList
return cls(sf=m.sfrom.vkey.encode(), st=m.sto.vkey.encode(), idn=m.idnonce, ts=time(),
en=bytes([code]) + Encoding.nonce())
class DBStorage(EventStorage):
def __init__(self, *args):
self.engine = create_engine(*args, echo=False)
Base.metadata.create_all(self.engine)
@ -80,41 +111,49 @@ class DBStorage(AbstractStorage):
def check(self, subject: Subject) -> dict:
session = self.Session()
sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none()
status = json.loads(sssj.status) if sssj else {}
if 'contacts' in status:
query: Query = session.query(Msg).filter(or_(
Msg.sf == subject.vkey.encode(),
Msg.st == subject.vkey.encode(),
))
query = query.with_entities(Msg.sf, Msg.st).distinct()
contacts = set()
for sf, st in query:
contacts.add(sf)
contacts.add(st)
status['contacts'] = list(map(Encoding.encode, contacts))
session.close()
return status
try:
sssj = session.query(SSSJ).filter_by(subject=subject.vkey.encode()).one_or_none()
status = json.loads(sssj.status) if sssj else {}
if 'contacts' in status:
query: Query = session.query(Msg).filter(or_(
Msg.sf == subject.vkey.encode(),
Msg.st == subject.vkey.encode(),
))
query = query.with_entities(Msg.sf, Msg.st).distinct()
contacts = set()
for sf, st in query:
contacts.add(sf)
contacts.add(st)
status['contacts'] = list(map(Encoding.encode, contacts))
return status
finally:
session.close()
def push(self, m: Message) -> None:
session = self.Session()
msg = Msg.from_message(m)
session.add(msg)
session.add(msg.sgn())
session.commit()
session.close()
try:
msg = Msg.from_message(m)
session.add(msg)
session.add(msg.sgn())
self.event(session, m, EVENT_PUSH)
session.commit()
finally:
session.close()
def edit(self, old: Message, new: Message):
assert old.edited(new), 'edit misuse'
session = self.Session()
msg = self.one_alike(session, old)
assert msg.en == old.editnonce, 'edit misuse'
msgn = Msg.from_message(new)
msg.en = msgn.en
msg.ec = msgn.ec
msg.flags = msgn.flags
session.commit()
session.close()
try:
msg = self.one_alike(session, old)
assert msg.en == old.editnonce, 'edit misuse'
msgn = Msg.from_message(new)
msg.en = msgn.en
msg.ec = msgn.ec
msg.flags = msgn.flags
self.event(session, new, EVENT_EDIT)
session.commit()
finally:
session.close()
@staticmethod
def one_alike(session, m: Message) -> Msg:
@ -122,73 +161,109 @@ class DBStorage(AbstractStorage):
def delete(self, m: Message):
session = self.Session()
session.delete(self.one_alike(session, m))
session.commit()
session.close()
try:
session.delete(self.one_alike(session, m))
self.event(session, m, EVENT_DELETE)
session.commit()
finally:
session.close()
def pull(self, pair: Tuple[Subject, Subject], params: Optional[dict] = None) -> Iterable[Message]:
if params is None:
params = {}
session = self.Session()
query: Query = session.query(Msg).filter(or_(
and_(
Msg.sf == pair[0].vkey.encode(),
Msg.st == pair[1].vkey.encode(),
),
and_(
Msg.sf == pair[1].vkey.encode(),
Msg.st == pair[0].vkey.encode(),
),
))
if 'ts' in params:
if '>' in params:
query = query.filter(Msg.ts > params['ts']['>'])
if '<' in params:
query = query.filter(Msg.ts < params['ts']['<'])
if params.get('before'):
query = query.filter(Msg.oid < self.one_alike(
session,
Message(pair[0], pair[1], Encoding.decode(params['before']), None,
None, None, None,
'')).oid)
if params.get('after'):
query = query.filter(Msg.oid > self.one_alike(
session,
Message(pair[0], pair[1], Encoding.decode(params['after']), None,
None, None, None,
'')).oid)
if params.get('exact'):
query = query.filter(Msg.idn == Encoding.decode(params['exact']))
for flag in params.get('flags', ()):
query = query.filter(Msg.flags.contains(flag))
query = query.order_by(Msg.oid.desc())
if 'limit' in params:
query = query.limit(params['limit'])
messages = map(Msg.to_message, list(query.from_self().order_by(Msg.oid)))
session.close()
return messages
try:
cquery: Query = session.query(Msg).filter(or_(
and_(
Msg.sf == pair[0].vkey.encode(),
Msg.st == pair[1].vkey.encode(),
),
and_(
Msg.sf == pair[1].vkey.encode(),
Msg.st == pair[0].vkey.encode(),
),
))
query: Query = cquery
if 'ts' in params:
if '>' in params:
query = query.filter(Msg.ts > params['ts']['>'])
if '<' in params:
query = query.filter(Msg.ts < params['ts']['<'])
if params.get('before'):
query = query.filter(Msg.oid < cquery.filter(Msg.idn == Encoding.decode(params['before'])).one().oid)
if params.get('after'):
query = query.filter(Msg.oid > cquery.filter(Msg.idn == Encoding.decode(params['after'])).one().oid)
if params.get('exact'):
print(Encoding.decode(params['exact']))
query = query.filter(Msg.idn == Encoding.decode(params['exact']))
for flag in params.get('flags', ()):
query = query.filter(Msg.flags.contains(flag))
query = query.order_by(Msg.oid.desc())
if 'limit' in params:
query = query.limit(params['limit'])
return map(Msg.to_message, list(query.from_self().order_by(Msg.oid)))
finally:
session.close()
def flags(self, m: Message, flags: str):
session = self.Session()
msg: Msg = self.one_alike(session, m)
assert msg.en == m.editnonce, 'flags misuse'
assert Flags(msg.flags).quable(), 'flags misuse'
assert not Flags(flags).quable(), 'flags misuse'
msg.flags = flags
session.commit()
session.close()
session = self.Session()
try:
msg: Msg = self.one_alike(session, m)
assert msg.en == m.editnonce, 'flags misuse'
assert Flags(msg.flags).quable(), 'flags misuse'
msg.flags = flags
session.commit()
finally:
session.close()
def clearsssj(self):
session = self.Session()
session.query(SSSJ).delete()
session.commit()
session.close()
try:
session.query(SSSJ).delete()
session.commit()
finally:
session.close()
def ssssj(self, subject: str, status: str):
"""set SSSJ"""
subject = Subject(Encoding.decode(subject)).vkey.encode()
session = self.Session()
session.query(SSSJ).filter_by(subject=subject).delete()
session.add(SSSJ(subject=subject, status=status))
session.commit()
session.close()
try:
subject = Subject(Encoding.decode(subject)).vkey.encode()
session.query(SSSJ).filter_by(subject=subject).delete()
session.add(SSSJ(subject=subject, status=status))
session.commit()
finally:
session.close()
@staticmethod
def event(session, m: Message, code: int):
session.add(MsgEvent.from_message(m, code))
@staticmethod
def ev_oid(query: Query, en: bytes) -> MsgEvent:
return query.filter(MsgEvent.en == en).one()
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
session = self.Session()
try:
query: Query = session.query(MsgEvent)
query = query.filter(MsgEvent.sf == sfrom.vkey.encode(), MsgEvent.st == sto.vkey.encode())
if after is None:
query = query.order_by(MsgEvent.oid.desc())
query = query.limit(1)
else:
query = query.filter(MsgEvent.oid > self.ev_oid(query, after).oid)
query = query.order_by(MsgEvent.oid)
ev: MsgEvent
return [(ev.en, ev.idn) for ev in query.all()]
finally:
session.close()
def cleanevents(self):
session = self.Session()
try:
session.query(MsgEvent).delete(MsgEvent.ts < time() - 604800)
session.commit()
finally:
session.close()

View File

@ -2,11 +2,11 @@ from typing import Tuple, Optional, Iterable
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.storage import AbstractStorage
from v25.storage.storage import EventStorage
class SecureStorage(AbstractStorage):
def __init__(self, storage: AbstractStorage, subject: Subject):
class SecureStorage(EventStorage):
def __init__(self, storage: EventStorage, subject: Subject):
self.storage = storage
self.subject = subject
@ -36,3 +36,7 @@ class SecureStorage(AbstractStorage):
def flags(self, m: Message, flags: str):
assert self.subject in m.pair, self.asrt()
return self.storage.flags(m, flags)
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
assert self.subject == sto, self.asrt()
return self.storage.events(sfrom, sto, after)

View File

@ -23,3 +23,13 @@ class AbstractStorage(ABC):
def flags(self, m: Message, flags: str):
raise NotImplementedError
class EventStorage(AbstractStorage, ABC):
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
raise NotImplementedError
EVENT_PUSH = 0
EVENT_EDIT = 1
EVENT_DELETE = 2

View File

@ -6,10 +6,10 @@ import requests
from v25.messaging.encoding import Encoding
from v25.messaging.message import Message
from v25.messaging.subject import Subject, PrivateSubject
from v25.storage.storage import AbstractStorage
from v25.storage.storage import EventStorage
class RemoteStorage(AbstractStorage):
class RemoteStorage(EventStorage):
def __init__(self, url: str, subject: PrivateSubject):
self.url = url
self.subject = subject
@ -44,3 +44,8 @@ class RemoteStorage(AbstractStorage):
def flags(self, m: Message, flags: str):
self.req('flags', {'m': m.dumps(), 'flags': flags})
def events(self, sfrom: Subject, sto: Subject, after: Optional[bytes]) -> Iterable[Tuple[bytes, bytes]]:
return [(Encoding.decode(ev[0]), Encoding.decode(ev[1])) for ev in
self.req('events', {'sfrom': sfrom.dumps(), 'sto': sto.dumps(),
'after': Encoding.encode(after)})]

View File

@ -8,7 +8,7 @@ from v25.messaging.encoding import Encoding
from v25.messaging.message import Message
from v25.messaging.subject import Subject
from v25.storage.secstorage import SecureStorage
from v25.storage.storage import AbstractStorage
from v25.storage.storage import EventStorage
class API(Flask):
@ -19,7 +19,7 @@ class API(Flask):
print('asrt', e, file=stderr)
abort(403)
def __init__(self, import_name, storage: AbstractStorage):
def __init__(self, import_name, storage: EventStorage):
self.storage = storage
super().__init__(import_name)
self.routes()
@ -67,3 +67,11 @@ class API(Flask):
@app.route('/flags', methods=['POST'])
def flags():
return self.nomessassertcall(lambda d, storage: storage.flags(Message.loads(d['m']), d['flags']))
@app.route('/events', methods=['POST'])
def events():
return self.nomessassertcall(lambda d, storage: [list(map(Encoding.encode, ev)) for ev in storage.events(
Subject.loads(d['sfrom']),
Subject.loads(d['sto']),
Encoding.decode(d['after'])
)])