import abc import asyncio import concurrent.futures import json import os import pathlib import threading import warnings from functools import wraps from io import StringIO, UnsupportedOperation from typing import IO, Any, Callable, Hashable, TypeVar __all__ = ( 'KVFactory', 'KVJson', 'DbConnection', 'DbFactory', 'Db', 'Transaction', 'TransactionView', ) class Request: __slots__ = ( '__future', ) def __init__(self, future: asyncio.Future | None, /) -> None: self.__future = future def waiting(self, /) -> bool: return self.__future is not None def set_result(self, result, /) -> None: if self.__future is not None: self.__future.set_result(result) def set_exception(self, exception, /): if self.__future is not None: self.__future.set_exception(exception) async def wait(self, /): if self.__future is not None: await self.__future class LineRequest(Request): __slots__ = ( 'line', ) def __init__(self, line: str, /, *, future: asyncio.Future | None) -> None: super().__init__(future) self.line = line class KVFactory: """this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form). that functionality may be added in the future, though, probably, only for custom DbConnection implementations. note: unstable signature.""" __slots__ = () @abc.abstractmethod def line(self, key: Any, value: Any, /) -> str: """line must contain exactly one '\\n' at exactly the end if the line is not empty. note: other forms of requests will later be represented by different methods or by instances of Action class.""" raise NotImplementedError @abc.abstractmethod def fromline(self, line: str, /) -> tuple[Any, Any]: """inverse of line(). note: unstable signature.""" raise NotImplementedError def run(self, line: str, db: dict, /) -> None: """run request against the db. extensible to allow forms of requests other than set. note: unstable signature.""" key, value = self.fromline(line) db[key] = value def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> 'KVRequest': """form request with Future. low-level API. note: unstable signature.""" return KVRequest(key, value, future=future, factory=self) def free(self, key: Any, value: Any, /) -> 'KVRequest': """result free from Future. note: unstable signature.""" return self.request(key, value, future=None) def io2db(self, io: IO[str], db: dict, /) -> int: """note: unstable signature.""" size = 0 for line in io: self.run(line, db) size += len(line) return size def db2io(self, db: dict, io: IO[str], /) -> int: """note: unstable signature.""" size = 0 for key, value in db.items(): size += io.write(self.line(key, value)) return size class KVRequest(LineRequest): __slots__ = ( '__factory', 'key', 'value', ) def __init__(self, key: Any, value: Any, /, *, future: asyncio.Future | None, factory: KVFactory): super().__init__(factory.line(key, value), future=future) self.__factory = factory self.key = key self.value = value class CommitRequest(Request): __slots__ = () class UnknownRequestType(TypeError): __slots__ = () class KVJson(KVFactory): """note: unstable signature.""" __slots__ = () def line(self, key: Any, value: Any, /) -> str: return json.dumps({'key': key, 'value': value}) + '\n' def _load_key(self, key: Any, /) -> Hashable: """note: unstable signature.""" if isinstance(key, Hashable): return key elif isinstance(key, list): return tuple(map(self._load_key, key)) elif isinstance(key, dict): return tuple((self._load_key(k), self._load_key(v)) for k, v in key.items()) else: raise TypeError( 'unknown KVJson key type, cannot convert to hashable' ) def fromline(self, line: str, /) -> tuple[Any, Any]: d = json.loads(line) return self._load_key(d['key']), d['value'] class TransactionRequest(LineRequest): __slots__ = ( 'buffer', ) def __init__(self, buffer: StringIO, /, *, future: asyncio.Future | None): super().__init__(buffer.getvalue(), future=future) self.buffer = buffer class DbParametres: __slots__ = ( 'path', 'kvfactory', 'buffersize', ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize: int) -> None: self.path = path """note: unstable signature.""" self.kvfactory = kvfactory """note: unstable signature.""" self.buffersize = buffersize """note: unstable signature.""" class RequestToClosedConnection(asyncio.InvalidStateError): pass class NightlyWarning(Warning): enabled = True TDecoratedNightly = TypeVar('TDecoratedNightly', bound=Callable | type | bool) def nightly(decorated: TDecoratedNightly = None, prefix: str = '', stacklevel=2) -> TDecoratedNightly: if decorated is None: NightlyWarning.enabled = False return None # type: ignore if isinstance(decorated, type): prefix = f'{prefix}{decorated.__name__}.' decorated.__init__ = nightly( decorated.__init__, prefix ) decorated.__init_subclass__ = nightly( decorated.__init_subclass__, prefix, stacklevel=3 ) return decorated # type: ignore assert callable(decorated) message = f"{prefix}{decorated.__name__}" @wraps(decorated) def wrap(*args, **kwargs): if NightlyWarning.enabled: warnings.warn(message, NightlyWarning, stacklevel=stacklevel) return wrap.__non_nightly__(*args, **kwargs) if wrap.__doc__ is None or not wrap.__doc__: wrap.__doc__ = '@nightly' elif wrap.__doc__.startswith('@nightly'): pass else: wrap.__doc__ = '@nightly\n\n' + wrap.__doc__ wrap.__non_nightly__ = decorated return wrap # type: ignore warnings.filterwarnings('default', category=NightlyWarning, module='__main__') warnings.filterwarnings('ignore', category=NightlyWarning, module='ptvp35') nightly = nightly(nightly) # type: ignore @nightly class VirtualConnection(abc.ABC): """intersection of DbConnection and TransactionView functionality""" __slots__ = () @abc.abstractmethod def get(self, key: Any, default: Any, /): raise NotImplementedError @abc.abstractmethod async def commit_transaction(self, delta: dict, /) -> None: raise NotImplementedError @abc.abstractmethod def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: raise NotImplementedError @abc.abstractmethod @nightly def loop(self, /) -> asyncio.AbstractEventLoop: raise NotImplementedError @abc.abstractmethod def transaction(self, /) -> 'Transaction': return Transaction(self) class DbConnection(VirtualConnection): """note: unstable constructor signature.""" __slots__ = ( '__kvfactory', '__buffersize', '__path', '__path_backup', '__path_recover', '__path_error', '__path_truncate', '__path_truncate_flag', '__not_running', '__mmdb', '__loop', '__queue', '__file', '__buffer', '__buffer_future', '__buffer_requested', '__task', '__initial_size', ) __mmdb: dict __loop: asyncio.AbstractEventLoop __queue: asyncio.Queue[Request] __file: IO[str] __buffer: StringIO __buffer_future: asyncio.Future __buffer_requested: bool __task: asyncio.Future __initial_size: int def __init__( self, parametres: DbParametres, / ) -> None: self.__kvfactory = parametres.kvfactory self.__buffersize = parametres.buffersize self.__path = path = parametres.path name = self.__path.name self.__path_backup = path.with_name(name + '.backup') self.__path_recover = path.with_name(name + '.recover') self.__path_error = path.with_name(name + '.error') self.__path_truncate = path.with_name(name + '.truncate') self.__path_truncate_flag = path.with_name(name + '.truncate_flag') self.__not_running = True def _create_future(self, /) -> asyncio.Future: return self.__loop.create_future() def _save_error_sync(self, line: str, /) -> None: with self.__path_error.open('a') as file: file.write(line.strip() + '\n') async def _save_error(self, line: str, /) -> None: await self.__loop.run_in_executor(None, self._save_error_sync, line) def _queue_error(self, line: str, /) -> concurrent.futures.Future: return asyncio.run_coroutine_threadsafe( self._save_error(line), self.__loop ) def _save_error_from_thread(self, line: str, /) -> None: self._queue_error(line).result() def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: path.touch() with path.open('r') as file: return self.__kvfactory.io2db(file, db) def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: with path.open('w') as file: return self.__kvfactory.db2io(db, file) def get(self, key: Any, default: Any, /): """dict-like get with mandatory default parametre.""" return self.__mmdb.get(key, default) async def set(self, key: Any, value: Any, /) -> None: """set the value and wait until it's written to disk.""" future = self._create_future() request = self.__kvfactory.request(key, value, future=future) self.__mmdb[key] = value self.__queue.put_nowait(request) await future def set_nowait(self, key: Any, value: Any, /) -> None: """set value and add write-to-disk request to queue.""" self.__mmdb[key] = value self.__queue.put_nowait(self.__kvfactory.free(key, value)) def _clear_buffer(self, /) -> None: self.__buffer = StringIO() self.__buffer_future = self._create_future() self.__buffer_requested = False def _compress_buffer(self, /) -> StringIO: self.__buffer.seek(0) bufferdb = {} self.__kvfactory.io2db(self.__buffer, bufferdb) buffer = StringIO() self.__kvfactory.db2io(bufferdb, buffer) return buffer def _commit_compressed_buffer_sync(self, /) -> None: self._file_write_sync(self._compress_buffer().getvalue()) async def _commit_compressed_buffer(self, /) -> None: await self.__loop.run_in_executor(None, self._commit_compressed_buffer_sync) def _satisfy_buffer_future(self, /) -> None: self.__buffer_future.set_result(None) self._clear_buffer() def _fail_buffer_future(self, exception: BaseException, /) -> None: self.__buffer_future.set_exception(exception) self._clear_buffer() async def _do_commit_buffer(self, /) -> None: try: await self._commit_compressed_buffer() except Exception as e: self._fail_buffer_future(e) else: self._satisfy_buffer_future() async def _commit_buffer(self, /) -> None: if self.__buffer.tell(): await self._do_commit_buffer() await self._reload_if_oversized() elif self.__buffer_requested: self._satisfy_buffer_future() def _request_buffer(self, request: Request, /) -> None: if request.waiting(): def callback(bf: asyncio.Future) -> None: if (e := bf.exception()) is not None: request.set_exception(e) else: request.set_result(None) self.__buffer_future.exception self.__buffer_future.add_done_callback( callback ) if not self.__buffer_requested: self.__buffer_requested = True self.__queue.put_nowait(CommitRequest(None)) async def _commit_buffer_or_request_so(self, request: Request, /) -> None: if self.__buffer.tell() >= self.__buffersize: await self._commit_buffer() request.set_result(None) else: self._request_buffer(request) async def _write(self, line: str, request: Request, /) -> None: self.__buffer.write(line) await self._commit_buffer_or_request_so(request) def _truncation_set_sync(self, /) -> None: self.__path_truncate.write_bytes( self.__file.tell().to_bytes(16, 'little') ) self.__path_truncate_flag.touch() def _truncation_unset_sync(self, /) -> None: self.__path_truncate_flag.unlink(missing_ok=True) self.__path_truncate.unlink(missing_ok=True) def _file_truncate_sync(self, file: IO[str], pos: int, /) -> None: file.seek(pos) self._save_error_from_thread(file.read()) file.truncate(pos) def _truncation_target_sync(self, /) -> int: return int.from_bytes(self.__path_truncate.read_bytes(), 'little') def _truncate_sync(self, /) -> None: with self.__path.open('r+') as file: self._file_truncate_sync(file, self._truncation_target_sync()) def _assure_truncation_sync(self, /) -> None: if self.__path_truncate_flag.exists(): self._truncate_sync() self._truncation_unset_sync() def _write_to_disk_sync(self, line: str, /) -> None: self.__file.write(line) self.__file.flush() try: os.fsync(self.__file.fileno()) except UnsupportedOperation: pass def _file_write_sync(self, line: str, /) -> None: self._truncation_set_sync() self._write_to_disk_sync(line) self._truncation_unset_sync() async def _reload_if_oversized(self, /) -> None: if self.__file.tell() > 2 * self.__initial_size: await self._reload() async def _handle_request(self, request: Request, /) -> None: if isinstance(request, LineRequest): await self._write(request.line, request) elif isinstance(request, CommitRequest): await self._commit_buffer() request.set_result(None) else: raise UnknownRequestType async def _background_cycle(self, /) -> None: request: Request = await self.__queue.get() try: await self._handle_request(request) except Exception as e: request.set_exception(e) finally: self.__queue.task_done() async def _background_task(self, /) -> None: while True: await self._background_cycle() def _start_task(self, /) -> None: self.__task = self.__loop.create_task(self._background_task()) def _recovery_set_sync(self, db: dict, /) -> None: self.__initial_size = self._db2path_sync(db, self.__path_backup) self.__path_recover.touch() def _recovery_unset_sync(self, /) -> None: self.__path_recover.unlink() self.__path_backup.unlink() def _copy_sync(self, db: dict | None, /) -> None: if db is None: db = {} self._path2db_sync(self.__path_backup, db) self._db2path_sync(db, self.__path) def _finish_recovery_sync(self, db: dict | None, /) -> None: self._copy_sync(db) self._recovery_unset_sync() def _build_file_sync(self, db: dict, /) -> None: self._recovery_set_sync(db) self._finish_recovery_sync(db) def _run_in_thread(self, fn, /, *args, **kwargs) -> asyncio.Future: """we are using our own thread to guarantee as much of autonomy and control as possible. intended for heavy tasks.""" future = self._create_future() def wrap() -> None: try: result = fn(*args, **kwargs) except Exception as exception: self.__loop.call_soon_threadsafe( future.set_exception, exception ) else: self.__loop.call_soon_threadsafe( future.set_result, result ) name = getattr(fn, '__name__', '?') threading.Thread( target=wrap, name=f'persistence5-{self.__path.name}-{name}' ).start() return future def _rebuild_file_sync(self, db: dict, /) -> None: if self.__path_recover.exists(): self._finish_recovery_sync(None) self._path2db_sync(self.__path, db) self._build_file_sync(db) def _file_open_sync(self, /) -> None: self.__file = self.__path.open('a') def _file_close_sync(self, /) -> None: self.__file.close() del self.__file def _reload_sync(self, /) -> None: self._file_close_sync() self._rebuild_file_sync({}) self._file_open_sync() async def _reload(self, /) -> None: await self._run_in_thread(self._reload_sync) def _load_mmdb_sync(self, /) -> dict: db = {} self._rebuild_file_sync(db) return db def _initialize_mmdb_sync(self, /) -> None: self.__mmdb = self._load_mmdb_sync() def _load_from_file_sync(self, /) -> None: self._assure_truncation_sync() self._initialize_mmdb_sync() self._file_open_sync() async def _load_from_file(self, /) -> None: await self._run_in_thread(self._load_from_file_sync) def _initialize_queue(self, /) -> None: self.__queue = asyncio.Queue() self._clear_buffer() async def _initialize_running(self, /) -> None: self.__loop = asyncio.get_running_loop() self._initialize_queue() await self._load_from_file() self._start_task() async def _initialize(self, /) -> None: assert self.__not_running self.__not_running = False await self._initialize_running() @classmethod async def create(cls, parametres: DbParametres, /) -> 'DbConnection': """connect to the factory. note: unstable signature.""" dbconnection = cls(parametres) await dbconnection._initialize() return dbconnection async def _close_buffer(self, /): await self._commit_buffer() if not self.__buffer_future.done(): self.__buffer_future.set_exception(RequestToClosedConnection()) assert isinstance(self.__buffer_future.exception(), RequestToClosedConnection) del self.__buffer_requested del self.__buffer_future del self.__buffer async def _close_queue(self, /) -> None: if not self.__task.done(): await self.__queue.join() self.__task.cancel() del self.__task assert self.__queue.empty() del self.__queue await self._close_buffer() def _close_mmdb_sync(self, /) -> None: self._file_close_sync() self._build_file_sync(self.__mmdb) del self.__mmdb del self.__initial_size async def _close_mmdb(self, /) -> None: await self._run_in_thread(self._close_mmdb_sync) async def _close_running(self, /) -> None: await self._close_queue() await self._close_mmdb() del self.__loop async def aclose(self, /) -> None: """close the connection. note: unstable signature.""" await self._close_running() self.__not_running = True def _transaction_buffer(self, delta: dict, /) -> StringIO: buffer = StringIO() self.__kvfactory.db2io(delta, buffer) self.__mmdb.update(delta) return buffer async def commit_transaction(self, delta: dict, /) -> None: """hybrid of set() and dict.update(). note: unstable signature.""" if not delta: return buffer = self._transaction_buffer(delta) future = self._create_future() self.__queue.put_nowait(TransactionRequest(buffer, future=future)) await future def submit_transaction(self, delta: dict, /) -> None: """hybrid of set_nowait() and dict.update(). _nowait analogue of commit_transaction(). note: this method was added only for async-sync symmetry with commit_transaction(). note: unstable signature.""" if not delta: return buffer = self._transaction_buffer(delta) self.__queue.put_nowait(TransactionRequest(buffer, future=None)) def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: """low-level API. for high-level synchronisation use transaction() instead. note: unstable signature.""" if not delta: if future: future.set_result(None) return buffer = self._transaction_buffer(delta) self.__queue.put_nowait(TransactionRequest(buffer, future=future)) async def commit(self, /) -> None: """wait until all requests queued before are completed.""" future = self._create_future() self.__queue.put_nowait(CommitRequest(future)) await future @nightly def loop(self, /) -> asyncio.AbstractEventLoop: return self.__loop def transaction(self, /) -> 'Transaction': return Transaction(self) class DbFactory: __slots__ = ( '__parametres', '__db', ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: self.__parametres = DbParametres( path, kvfactory=kvfactory, buffersize=buffersize ) async def __aenter__(self) -> DbConnection: self.__db = await DbConnection.create(self.__parametres) return self.__db async def __aexit__(self, exc_type, exc_val, exc_tb): await self.__db.aclose() class Db(DbConnection): """simplified usecase combining the factory and the connection in one class.""" __slots__ = () def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576): DbConnection.__init__( self, DbParametres( pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize ) ) async def __aenter__(self) -> DbConnection: await self._initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.aclose() class FutureContext: def __init__(self, future: asyncio.Future | None) -> None: self.__future = future async def __aenter__(self) -> None: pass async def __aexit__(self, exc_type, exc_val, exc_tb): if self.__future is not None: await self.__future class TransactionView(VirtualConnection): """note: unstable constructor signature.""" __slots__ = ( '__delta', '__shadow', '__connection', '__loop', '__subfuture', ) def __init__(self, delta: dict, connection: VirtualConnection, /) -> None: self.__delta = delta self.__shadow = {} self.__connection = connection self.__loop = connection.loop() self.__subfuture: asyncio.Future | None = None def future_context(self, /) -> FutureContext: """do something (inside of async with), then wait for submitted changes to be committed.""" return FutureContext(self.__subfuture) def rollback(self, /) -> None: """clear unsubmitted changes.""" self.__delta.clear() @nightly def illuminate(self, /) -> None: """clear submitted changes, thus syncing the view (underlying the delta) with the connection.""" self.__shadow.clear() @nightly async def ailluminate(self, /) -> None: """illuminate, then wait for submitted changes to be committed.""" async with self.future_context(): self.illuminate() @nightly def fork(self, /) -> None: """keep delta, but forget about the shadow entirely (including making sure it's committed).""" self.illuminate() self.__subfuture = None @nightly async def afork(self, /) -> None: """fork, then wait for submitted changes to be committed.""" async with self.future_context(): self.fork() @nightly def clear(self, /) -> None: """clear all changes (including the shadow).""" self.rollback() self.illuminate() @nightly async def aclear(self, /) -> None: """clear, then wait for submitted changes to be committed.""" async with self.future_context(): self.clear() @nightly def reset(self, /) -> None: """reset transaction.""" self.clear() self.__subfuture = None @nightly async def areset(self, /) -> None: """reset, then wait for submitted changes to be committed.""" async with self.future_context(): self.reset() def get(self, key: Any, default: Any, /): """get from the delta (unsubmitted), else from the shadow (submitted), else from the connection.""" if key in self.__delta: return self.__delta[key] if key in self.__shadow: return self.__shadow[key] return self.__connection.get(key, default) def set_nowait(self, key: Any, value: Any, /) -> None: """note: unlike the corresponding db method, this one does not catch serialisation errors early.""" self.__delta[key] = value def _delta(self, /) -> dict: delta = self.__delta.copy() self.__shadow |= delta self.__delta.clear() return delta async def commit(self, /) -> None: """bulk analogue of DbConnection.set().""" # for persistence5('s forks) developers: # q: why not self.__subfuture = None here? # a: run two commit calls concurrently. one will quit early and fail semantically. # we also never implicitly reset self.__subfuture because newly created future may depend on it. # q: why not self.submit() inside FC block? # a: that would require using FC block later once more + that future may include extra submitted changes; # so one would need to do submit, then do an empty FC block. that maybe introduced in the future # q: why use if delta? # a: to have code symmetric to that of submit + to not create an extra coroutine. # note: q&a comments above may become obsolete async with self.future_context(): delta = self._delta() if delta: await self.__connection.commit_transaction(delta) def submit(self, /) -> None: """submit changes. _nowait analogue of commit(). bulk analogue of DbConnection.set_nowait().""" # for persistence5('s forks) developers: # q: why use if delta? # a: to have code symmetric to that of commit + to not create an extra future. # note: q&a comments above may become obsolete delta = self._delta() if delta: future = self.__loop.create_future() self.__connection.submit_transaction_request(delta, future) self.__subfuture = self._gather(self.__subfuture, future) def _do_gather(self, left: asyncio.Future, right: asyncio.Future) -> asyncio.Future: future = self.__loop.create_future() def rcallback(fr: asyncio.Future) -> None: if (e := fr.exception()) is not None: future.set_exception(e) else: future.set_result(None) def lcallback(fl: asyncio.Future) -> None: if (e := fl.exception()) is not None: future.set_exception(e) else: right.add_done_callback(rcallback) left.add_done_callback(lcallback) return future def _reduce_future(self, future: asyncio.Future | None) -> asyncio.Future | None: if future is None or future.done() and future.exception() is None: return None else: return future def _gather(self, left: asyncio.Future | None, right: asyncio.Future | None) -> asyncio.Future | None: match (self._reduce_future(left), self._reduce_future(right)): case None, ofr: return ofr case ofl, None: return ofl case asyncio.Future() as fl, asyncio.Future() as fr: return self._do_gather(fl, fr) case _: raise TypeError @nightly async def commit_transaction(self, delta: dict, /) -> None: if not delta: return self.__delta.update(delta) await self.commit() @nightly def submit_transaction(self, delta: dict, /) -> None: if not delta: return self.__delta.update(delta) self.submit() @nightly def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: def set_result(sf: asyncio.Future | None): if future is None: pass elif sf is None or (e := sf.exception()) is None: future.set_result(None) else: future.set_exception(e) if not delta: set_result(None) return self.submit_transaction(delta) if self.__subfuture is None: set_result(None) return self.__subfuture.add_done_callback(set_result) @nightly def loop(self, /) -> asyncio.AbstractEventLoop: return self.__loop @nightly def transaction(self, /) -> 'Transaction': return super().transaction() class Transaction: """note: unstable signature.""" __slots__ = ( '__connection', '__view', '__running', ) __view: TransactionView def __init__(self, connection: VirtualConnection, /) -> None: self.__connection = connection self.__running = False async def __aenter__(self) -> TransactionView: return self.__enter__() async def __aexit__(self, exc_type, exc_val, exc_tb): if exc_type is None: await self.__view.commit() else: self.__view.rollback() self._clean() def _clean(self, /) -> None: del self.__view self.__running = False def __enter__(self) -> TransactionView: assert not self.__running self.__running = True self.__view = TransactionView({}, self.__connection) return self.__view def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: self.__view.submit() else: self.__view.rollback() self._clean()