127 lines
4.2 KiB
Python
127 lines
4.2 KiB
Python
import json
|
|
from time import time
|
|
from typing import Optional, Tuple
|
|
|
|
import nacl.encoding
|
|
import nacl.public
|
|
import nacl.secret
|
|
import nacl.signing
|
|
import nacl.utils
|
|
|
|
from v25.messaging.encoding import Encoding
|
|
from v25.messaging.flags import Flags
|
|
from v25.messaging.subject import Subject, PrivateSubject
|
|
|
|
# Byte length of ONE sealed copy of the per-message symmetric key.
# Message.econtent starts with two such segments back to back (sender copy,
# then recipient copy); Message.decrypt() slices them off using this constant.
# NOTE(review): presumably 32-byte SecretBox key + 48 bytes of SealedBox
# overhead -- confirm against the PyNaCl documentation.
KEY_SIZE = 80

# Positions inside Message.pair, which is (sfrom, sto).
# NOTE(review): these look intended as the `s` argument of
# Message.unseal()/decrypt() -- confirm against callers.
SFROM = 0
STO = 1
|
|
|
|
|
|
class Message:
    """A signed-then-encrypted message between a sender and a recipient.

    The plaintext (``pcontent``) is signed with the sender's signing key and
    the signed blob is encrypted into ``econtent``, which is laid out as::

        sealed key (sender) | sealed key (recipient) | SecretBox ciphertext

    Each sealed key segment is ``KEY_SIZE`` bytes long, so either party of
    :attr:`pair` can recover the one-off symmetric key.  Only ``econtent`` is
    written by :meth:`dumps`; ``pcontent`` is never serialized.
    """

    def __init__(self, sfrom: Subject, sto: Subject, idnonce: bytes, timestamp: Optional[float],
                 editnonce: Optional[bytes], pcontent: Optional[bytes], econtent: Optional[bytes],
                 flags: str):
        """Store the given fields verbatim; no validation or crypto happens here.

        Args:
            sfrom: Sending subject.
            sto: Receiving subject.
            idnonce: Nonce identifying the message (kept across edits).
            timestamp: Creation time, or ``None``.
            editnonce: Nonce of this particular revision, or ``None``.
            pcontent: Plain content, or ``None`` when not (yet) unsealed.
            econtent: Encrypted content, or ``None`` when not (yet) sealed.
            flags: Flag string.
        """
        self.sfrom = sfrom
        self.sto = sto
        self.idnonce = idnonce
        self.timestamp = timestamp
        self.editnonce = editnonce
        self.pcontent = pcontent
        self.econtent = econtent
        self.flags = flags

    @classmethod
    def send(cls, sfrom: PrivateSubject, sto: Subject, pcontent: bytes) -> 'Message':
        """Create and seal a brand-new message.

        Fresh id and edit nonces come from ``Encoding.nonce()``, the timestamp
        from ``time()`` and the flags from ``Flags.default``.

        Returns:
            The new message, already sealed (``econtent`` is populated).
        """
        idnonce = Encoding.nonce()
        timestamp = time()
        editnonce = Encoding.nonce()
        message = cls(sfrom, sto, idnonce, timestamp,
                      editnonce, pcontent, None,
                      Flags.default)
        return message.sealed()

    @classmethod
    def loads(cls, s: str) -> 'Message':
        """Rebuild a message from the JSON text produced by :meth:`dumps`.

        ``pcontent`` is left as ``None``; call :meth:`unseal` to recover it.

        Raises:
            KeyError: If one of the keys ``sf``, ``st``, ``in``, ``ts``,
                ``en``, ``ec`` or ``fl`` is missing.
        """
        d = json.loads(s)
        # Fields are read in constructor order, matching the wire keys.
        sfrom = Subject.loads(d['sf'])
        sto = Subject.loads(d['st'])
        idnonce = Encoding.decode(d['in'])
        timestamp = d['ts']
        editnonce = Encoding.decode(d['en'])
        econtent = Encoding.decode(d['ec'])
        return cls(sfrom, sto, idnonce, timestamp,
                   editnonce, None, econtent,
                   d['fl'])

    @property
    def pair(self) -> Tuple[Subject, Subject]:
        """The ``(sfrom, sto)`` tuple; index 0 is the sender, 1 the recipient."""
        return self.sfrom, self.sto

    def seal(self) -> bytes:
        """Sign ``pcontent`` with the sender's key and encrypt the result.

        Returns:
            The freshly computed ``econtent`` (also stored on ``self``).

        Raises:
            AssertionError: If ``sfrom`` is not a ``PrivateSubject``.
        """
        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not isinstance(self.sfrom, PrivateSubject):
            raise AssertionError('Subject misuse')
        scontent: nacl.signing.SignedMessage = self.sfrom.skey.sign(self.pcontent)
        self.encrypt(scontent)
        return self.econtent

    def encrypt(self, scontent: bytes) -> None:
        """Encrypt *scontent* under a random one-off key and store ``econtent``.

        The key is sealed once for the sender and once for the recipient (in
        that order) and both sealed copies are prepended to the ciphertext.
        """
        key = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)
        sealed_for_sender = nacl.public.SealedBox(self.sfrom.pkey).encrypt(key)
        sealed_for_recipient = nacl.public.SealedBox(self.sto.pkey).encrypt(key)
        ciphertext = nacl.secret.SecretBox(key).encrypt(scontent)
        self.econtent = sealed_for_sender + sealed_for_recipient + ciphertext

    def sealed(self) -> 'Message':
        """Fluent variant of :meth:`seal`: seal in place and return ``self``."""
        self.seal()
        return self

    def dumps(self) -> str:
        """Serialize the message to JSON text (``pcontent`` is not included).

        NOTE(review): ``editnonce`` / ``econtent`` are ``None`` on messages
        built by :meth:`delete` / :meth:`flags_`; whether ``Encoding.encode``
        accepts ``None`` is not visible here -- confirm.
        """
        return json.dumps({
            'sf': self.sfrom.dumps(),
            'st': self.sto.dumps(),
            'in': Encoding.encode(self.idnonce),
            'ts': self.timestamp,
            'en': Encoding.encode(self.editnonce),
            'ec': Encoding.encode(self.econtent),
            'fl': self.flags,
        })

    def unseal(self, s: int) -> bytes:
        """Decrypt as party *s* and verify the sender's signature.

        Args:
            s: Index into :attr:`pair` of the subject doing the decryption.

        Returns:
            The verified plain content (also stored in ``pcontent``).
        """
        scontent = self.decrypt(s)
        self.pcontent = self.sfrom.vkey.verify(scontent)
        return self.pcontent

    def decrypt(self, s: int) -> bytes:
        """Recover the signed (not yet verified) content as party *s*.

        Args:
            s: Index into :attr:`pair`; also selects which of the two sealed
                key segments at the start of ``econtent`` is opened.

        Raises:
            AssertionError: If the selected subject is not a ``PrivateSubject``.
        """
        subject = self.pair[s]
        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not isinstance(subject, PrivateSubject):
            raise AssertionError('Subject misuse')
        key_box = nacl.public.SealedBox(subject.ekey)
        sealed_keys = (self.econtent[:KEY_SIZE], self.econtent[KEY_SIZE:2 * KEY_SIZE])
        key: bytes = key_box.decrypt(sealed_keys[s])
        # Everything after the two sealed keys is the SecretBox ciphertext.
        return nacl.secret.SecretBox(key).decrypt(self.econtent[2 * KEY_SIZE:])

    def unsealed(self, s: int) -> 'Message':
        """Fluent variant of :meth:`unseal`: unseal in place and return ``self``."""
        self.unseal(s)
        return self

    def edit(self, pcontent: bytes) -> 'Message':
        """Build a sealed revision of this message carrying new content.

        The revision keeps the subjects and ``idnonce``, has no timestamp,
        gets a fresh ``editnonce`` and has ``'<unedited>'`` replaced by
        ``'<edited>'`` in its flags.
        """
        new_flags = self.flags.replace('<unedited>', '<edited>')
        revision = Message(self.sfrom, self.sto, self.idnonce, None,
                           Encoding.nonce(), pcontent, None,
                           new_flags)
        return revision.sealed()

    def edit_(self) -> 'Message':
        """Return a content-less copy carrying the current flags (see :meth:`flags_`)."""
        return self.flags_(self.flags)

    def edited(self, other: 'Message') -> bool:
        """Tell whether *other* has the same subject pair and ``idnonce``."""
        return self.pair == other.pair and self.idnonce == other.idnonce

    def editt(self, content: bytes) -> Tuple['Message', 'Message']:
        """Return ``(content-less copy with current flags, sealed revision)``."""
        marker = self.flags_(self.flags)
        revision = self.edit(content)
        return marker, revision

    def delete(self) -> 'Message':
        """Return a message with the same subjects and ``idnonce`` and nothing else.

        Timestamp, edit nonce and both contents are ``None``; flags are ``''``.
        """
        return Message(self.sfrom, self.sto, self.idnonce, None,
                       None, None, None,
                       '')

    def flags_(self, flags: str) -> 'Message':
        """Return a content-less message carrying *flags*.

        Subjects, ``idnonce`` and ``editnonce`` are copied; timestamp and both
        contents are ``None``.
        """
        return Message(self.sfrom, self.sto, self.idnonce, None,
                       self.editnonce, None, None,
                       flags)

    def alter(self, sfrom: Optional[Subject] = None, sto: Optional[Subject] = None) -> 'Message':
        """Swap in equal replacements for the stored subjects and return ``self``.

        NOTE(review): presumably used to exchange a subject for an equal one
        of a richer type (e.g. a ``PrivateSubject``) -- confirm against callers.

        Args:
            sfrom: Replacement sender; must compare equal to the current one.
            sto: Replacement recipient; must compare equal to the current one.

        Raises:
            AssertionError: If a replacement does not compare equal to the
                subject it replaces.  ``sfrom`` is checked and applied before
                ``sto`` is looked at.
        """
        if sfrom is not None:
            # Explicit raise instead of `assert`: under `python -O` an assert
            # would vanish and a mismatching subject would be accepted.
            if not self.sfrom == sfrom:
                raise AssertionError('alter mismatch')
            self.sfrom = sfrom
        if sto is not None:
            if not self.sto == sto:
                raise AssertionError('alter mismatch')
            self.sto = sto
        return self
|