diff --git a/docs/scripts/traced_example.py b/docs/scripts/traced_example.py index 7b970bd..e694518 100644 --- a/docs/scripts/traced_example.py +++ b/docs/scripts/traced_example.py @@ -6,8 +6,8 @@ from contextlib import ExitStack try: sys.path.append(str((pathlib.Path(__file__).parent / '../..').absolute())) - from ptvp35 import * - from ptvp35.instrumentation import * + from ptvp35 import DbFactory, DbConnection, KVJson + from ptvp35.instrumentation import InstrumentDiskWrites, NightlyInstrumentation except: raise @@ -144,8 +144,6 @@ async def main(): with ExitStack() as es: LogWrites().enter(es) if run_all: - print('not yet properly instrumented; exiting;') - raise NotImplementedError LogEE(__import__('ptvp35').Request, '__init__').enter(es) LogEE(__import__('ptvp35').Request, 'waiting').enter(es) LogEE(__import__('ptvp35').Request, 'set_result').enter(es) @@ -155,10 +153,16 @@ async def main(): LogEE(__import__('ptvp35').LineRequest, '__init__').enter(es) LogEE(__import__('ptvp35').KVFactory, 'run').enter(es) + LogEE(__import__('ptvp35').KVFactory, '_dbset').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'dbset').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'dbget').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'filter_value').enter(es) LogEE(__import__('ptvp35').KVFactory, 'request').enter(es) LogEE(__import__('ptvp35').KVFactory, 'free').enter(es) LogEE(__import__('ptvp35').KVFactory, 'io2db').enter(es) LogEE(__import__('ptvp35').KVFactory, 'db2io').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'path2db_sync').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'db2path_sync').enter(es) LogEE(__import__('ptvp35').KVRequest, '__init__').enter(es) @@ -168,80 +172,111 @@ async def main(): LogEE(__import__('ptvp35').TransactionRequest, '__init__').enter(es) - LogEE(__import__('ptvp35').DbParametres, '__init__').enter(es) + LogEE(__import__('ptvp35').DbParameters, '__init__').enter(es) LogEE(__import__('ptvp35').VirtualConnection, 'transaction').enter(es) - LogEE(__import__('ptvp35').DbConnection, '__init__').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_create_future').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_save_error_sync').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_save_error').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_queue_error').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_save_error_from_thread').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_path2db_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_db2path_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, 'get').enter(es) - ALogEE(__import__('ptvp35').DbConnection, 'set').enter(es) - LogEE(__import__('ptvp35').DbConnection, 'set_nowait').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_clear_buffer').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_compress_buffer').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_commit_compressed_buffer_sync').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_commit_compressed_buffer').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_satisfy_buffer_future').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_fail_buffer_future').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_do_commit_buffer').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_commit_buffer').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_request_buffer').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_commit_buffer_or_request_so').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_write').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_truncation_set_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_truncation_unset_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_file_truncate_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_truncation_target_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_truncate_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_assure_truncation_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_write_to_disk_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_file_write_sync').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_reload_if_oversized').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_handle_request').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_background_cycle').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_background_task').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_start_task').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_recovery_set_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_recovery_unset_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_copy_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_finish_recovery_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_build_file_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_run_in_thread').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_rebuild_file_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_file_open_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_file_close_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_reload_sync').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_reload').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_load_mmdb_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_initialize_mmdb_sync').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_load_from_file_sync').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_load_from_file').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_initialize_queue').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_initialize_running').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_initialize').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_close_buffer').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_close_queue').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_close_mmdb_sync').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_close_mmdb').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_close_running').enter(es) - ALogEE(__import__('ptvp35').DbConnection, 'aclose').enter(es) - LogEE(__import__('ptvp35').DbConnection, '_transaction_buffer').enter(es) - ALogEE(__import__('ptvp35').DbConnection, 'commit_transaction').enter(es) - LogEE(__import__('ptvp35').DbConnection, 'submit_transaction').enter(es) - LogEE(__import__('ptvp35').DbConnection, 'submit_transaction_request').enter(es) - ALogEE(__import__('ptvp35').DbConnection, 'commit').enter(es) - LogEE(__import__('ptvp35').DbConnection, 'loop').enter(es) - LogEE(__import__('ptvp35').DbConnection, 'transaction').enter(es) + LogEE(__import__('ptvp35')._Loop, '__init__').enter(es) + LogEE(__import__('ptvp35')._Loop, 'create_future').enter(es) + LogEE(__import__('ptvp35')._Loop, 'loop').enter(es) + LogEE(__import__('ptvp35')._Loop, 'run_in_thread').enter(es) - LogEE(__import__('ptvp35').DbFactory, '__init__').enter(es) - ALogEE(__import__('ptvp35').DbFactory, '__aenter__').enter(es) - ALogEE(__import__('ptvp35').DbFactory, '__aexit__').enter(es) + LogEE(__import__('ptvp35')._Errors, '__init__').enter(es) + LogEE(__import__('ptvp35')._Errors, '_save_sync').enter(es) + ALogEE(__import__('ptvp35')._Errors, '_save').enter(es) + LogEE(__import__('ptvp35')._Errors, 'save_from_thread').enter(es) + + LogEE(__import__('ptvp35')._File, '__init__').enter(es) + LogEE(__import__('ptvp35')._File, 'path').enter(es) + LogEE(__import__('ptvp35')._File, 'tell').enter(es) + LogEE(__import__('ptvp35')._File, 'write_to_disk_sync').enter(es) + LogEE(__import__('ptvp35')._File, 'open_sync').enter(es) + LogEE(__import__('ptvp35')._File, 'close_sync').enter(es) + + LogEE(__import__('ptvp35')._Backup, '__init__').enter(es) + LogEE(__import__('ptvp35')._Backup, 'file').enter(es) + LogEE(__import__('ptvp35')._Backup, 'kvfactory').enter(es) + LogEE(__import__('ptvp35')._Backup, '_copy_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, '_recovery_unset_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, '_finish_recovery_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, '_recovery_set_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, 'build_file_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, '_rebuild_file_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, '_reload_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, 'run_in_thread').enter(es) + ALogEE(__import__('ptvp35')._Backup, '_reload').enter(es) + ALogEE(__import__('ptvp35')._Backup, 'reload_if_oversized').enter(es) + LogEE(__import__('ptvp35')._Backup, 'load_mmdb_sync').enter(es) + LogEE(__import__('ptvp35')._Backup, 'uninitialize').enter(es) + + LogEE(__import__('ptvp35')._Truncation, '__init__').enter(es) + LogEE(__import__('ptvp35')._Truncation, 'backup').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_write_bytes_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_write_value_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_set_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_unset_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_target_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_truncate_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, 'assure_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, '_file_truncate_sync').enter(es) + LogEE(__import__('ptvp35')._Truncation, 'file_write_sync').enter(es) + + LogEE(__import__('ptvp35')._ReceivingQueue, '__init__').enter(es) + LogEE(__import__('ptvp35')._ReceivingQueue, 'submit').enter(es) + + LogEE(__import__('ptvp35')._WriteableBuffer, '__init__').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, 'writeable').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, 'loop').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, '_compressed').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, '_commit_compressed_sync').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit_compressed').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, '_clear').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, '_satisfy_future').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, '_fail_future').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_do_commit_buffer').enter(es) + LogEE(__import__('ptvp35')._WriteableBuffer, '_request_buffer').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit_or_request_so').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_write').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_handle_request').enter(es) + ALogEE(__import__('ptvp35')._WriteableBuffer, '_close').enter(es) + + LogEE(__import__('ptvp35')._Memory, '__init__').enter(es) + LogEE(__import__('ptvp35')._Memory, '_initialize_sync').enter(es) + LogEE(__import__('ptvp35')._Memory, '_load_from_file_sync').enter(es) + ALogEE(__import__('ptvp35')._Memory, '_load_from_file').enter(es) + LogEE(__import__('ptvp35')._Memory, '_close_sync').enter(es) + ALogEE(__import__('ptvp35')._Memory, '_close').enter(es) + LogEE(__import__('ptvp35')._Memory, '_transaction_buffer').enter(es) + LogEE(__import__('ptvp35')._Memory, 'get').enter(es) + LogEE(__import__('ptvp35')._Memory, 'set').enter(es) + + LogEE(__import__('ptvp35')._QueueTask, '__init__').enter(es) + ALogEE(__import__('ptvp35')._QueueTask, '_background_cycle').enter(es) + ALogEE(__import__('ptvp35')._QueueTask, '_background_task').enter(es) + ALogEE(__import__('ptvp35')._QueueTask, 'close').enter(es) + LogEE(__import__('ptvp35')._QueueTask, 'start').enter(es) + + LogEE(__import__('ptvp35')._DbConnection, '__init__').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'kvprotocol').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'get').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, 'set').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'set_nowait').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, '_initialize_running').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, '_initialize').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, 'create').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, '_close_running').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, 'aclose').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, 'commit_transaction').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'submit_transaction').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'submit_transaction_request').enter(es) + ALogEE(__import__('ptvp35')._DbConnection, 'commit').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'loop').enter(es) + LogEE(__import__('ptvp35')._DbConnection, 'transaction').enter(es) + + LogEE(__import__('ptvp35').DbManager, '__init__').enter(es) + ALogEE(__import__('ptvp35').DbManager, '__aenter__').enter(es) + ALogEE(__import__('ptvp35').DbManager, '__aexit__').enter(es) LogEE(__import__('ptvp35').Db, '__init__').enter(es) ALogEE(__import__('ptvp35').Db, '__aenter__').enter(es) diff --git a/docs/source/usage.rst b/docs/source/usage.rst index a41f5a9..230c4ba 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -21,7 +21,7 @@ These two facts together tell that, if you intend on using the connection, you s .. code-block:: python3 import pathlib - from ptvp35 import * + from ptvp35 import DbFactory, KVJson async def main(): async with DbFactory(pathlib.Path('example.db'), kvfactory=KVJson()) as connection: @@ -31,7 +31,7 @@ Different ways to get/set a value: .. code-block:: python3 - from ptvp35 import * + from ptvp35 import DbConnection async def _main(connection: DbConnection): value0 = connection.get('increment-0', 0) diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 416eee9..1c52c0e 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -3,10 +3,11 @@ from __future__ import annotations __all__ = ( - 'KVDELETE', + 'VDELETE', 'KVFactory', 'KVJson', 'DbConnection', + 'DbManager', 'DbFactory', 'Db', 'Transaction', @@ -21,11 +22,9 @@ import json import os import pathlib import threading -import warnings from collections.abc import Hashable -from functools import wraps from io import StringIO, UnsupportedOperation -from typing import IO, Any, Callable, TypeVar +from typing import IO, Any, TypeAlias class Request: @@ -68,7 +67,7 @@ class KVProtocol(abc.ABC): raise NotImplementedError -KVDELETE = object() +VDELETE = object() class KVFactory(KVProtocol): @@ -98,7 +97,7 @@ note: unstable signature.""" self._dbset(db, key, value, reduce) def _dbset(self, db: dict, key: Any, value: Any, reduce: bool, /) -> None: - if reduce and value is KVDELETE: + if reduce and value is VDELETE: db.pop(key, None) else: db[key] = value @@ -111,7 +110,7 @@ note: unstable signature.""" return self.filter_value(value, default) def filter_value(self, value: Any, default: Any, /) -> Any: - if value is KVDELETE: + if value is VDELETE: return default else: return value @@ -182,7 +181,7 @@ class KVJson(KVFactory): __slots__ = () def line(self, key: Any, value: Any, /) -> str: - if value is KVDELETE: + if value is VDELETE: obj = {'key': key} else: obj = {'key': key, 'value': value} @@ -202,7 +201,7 @@ class KVJson(KVFactory): def fromline(self, line: str, /) -> tuple[Any, Any]: d = json.loads(line) - return self._load_key(d['key']), d.get('value', KVDELETE) + return self._load_key(d['key']), d.get('value', VDELETE) class TransactionRequest(LineRequest): @@ -235,60 +234,10 @@ class RequestToClosedConnection(asyncio.InvalidStateError): pass -class NightlyWarning(Warning): - enabled = True - - -TDecoratedNightly = TypeVar('TDecoratedNightly', bound=Callable | type | None) - - -def nightly(decorated: TDecoratedNightly = None, prefix: str = '', stacklevel=2) -> TDecoratedNightly: - if decorated is None: - NightlyWarning.enabled = False - return None # type: ignore - - if isinstance(decorated, type): - prefix = f'{prefix}{decorated.__name__}.' - decorated.__init__ = nightly( - decorated.__init__, prefix - ) - decorated.__init_subclass__ = nightly( - decorated.__init_subclass__, prefix, stacklevel=3 if issubclass(decorated, abc.ABC) else 2 - ) - return decorated # type: ignore - if not callable(decorated): - raise TypeError - - message = f"{prefix}{decorated.__name__}" - - @wraps(decorated) - def wrap(*args, **kwargs): - if NightlyWarning.enabled: - warnings.warn(message, NightlyWarning, stacklevel=stacklevel) - return wrap.__non_nightly__(*args, **kwargs) - - if wrap.__doc__ is None or not wrap.__doc__: - wrap.__doc__ = '@nightly' - elif wrap.__doc__.startswith('@nightly'): - pass - else: - wrap.__doc__ = '@nightly\n\n' + wrap.__doc__ - - wrap.__non_nightly__ = decorated - - return wrap # type: ignore - - -warnings.filterwarnings('default', category=NightlyWarning, module='__main__') -warnings.filterwarnings('ignore', category=NightlyWarning, module='ptvp35') -nightly = nightly(nightly) # type: ignore - - -@nightly class VirtualConnection( - # abc.ABC + abc.ABC ): - """intersection of DbConnection and TransactionView functionality""" + """minimal intersection of DbConnection and TransactionView functionality""" __slots__ = () @@ -309,7 +258,6 @@ class VirtualConnection( raise NotImplementedError @abc.abstractmethod - @nightly def loop(self, /) -> asyncio.AbstractEventLoop: raise NotImplementedError @@ -317,7 +265,33 @@ class VirtualConnection( return Transaction(self) -class Loop: +class ExtendedVirtualConnection( + VirtualConnection +): + """maximal intersection of DbConnection and TransactionView functionality""" + + @abc.abstractmethod + def set_nowait(self, key: Any, value: Any, /) -> None: + raise NotImplementedError + + @abc.abstractmethod + def submit_transaction(self, delta: dict, /) -> None: + raise NotImplementedError + + @abc.abstractmethod + async def commit(self, /) -> None: + raise NotImplementedError + + +class DbInterface( + ExtendedVirtualConnection +): + @abc.abstractmethod + async def set(self, key: Any, value: Any, /) -> None: + raise NotImplementedError + + +class _Loop: __slots__ = ( '__loop', ) @@ -356,35 +330,35 @@ intended for heavy tasks.""" return future -class Errors: +class _Errors: __slots__ = ( '__path', '__loop', '__event_loop', ) - def __init__(self, path: pathlib.Path, loop: Loop, /) -> None: + def __init__(self, path: pathlib.Path, loop: _Loop, /) -> None: self.__path = path.with_name(path.name + '.error') self.__loop = loop self.__event_loop = loop.loop() - def save_sync(self, line: str, /) -> None: + def _save_sync(self, line: str, /) -> None: with self.__path.open('a') as file: file.write(line.strip() + '\n') - async def _save_error(self, line: str, /) -> None: - await self.__event_loop.run_in_executor(None, self.save_sync, line) + async def _save(self, line: str, /) -> None: + await self.__event_loop.run_in_executor(None, self._save_sync, line) - def _schedule_error(self, line: str, /) -> concurrent.futures.Future: + def _schedule(self, line: str, /) -> concurrent.futures.Future: return asyncio.run_coroutine_threadsafe( - self._save_error(line), self.__event_loop + self._save(line), self.__event_loop ) - def save_error_from_thread(self, line: str, /) -> None: - self._schedule_error(line).result() + def save_from_thread(self, line: str, /) -> None: + self._schedule(line).result() -class File: +class _File: __slots__ = ( '__path', '__file', @@ -409,7 +383,7 @@ class File: except UnsupportedOperation: pass - def _open_sync(self, /) -> None: + def open_sync(self, /) -> None: self.__file = self.__path.open('a') def close_sync(self, /) -> None: @@ -417,7 +391,7 @@ class File: del self.__file -class Backup: +class _Backup: __slots__ = ( '__file', '__kvfactory', @@ -430,15 +404,15 @@ class Backup: __initial_size: int - def __init__(self, path: pathlib.Path, kvfactory: KVFactory, loop: Loop, /) -> None: - self.__file = File(path) + def __init__(self, path: pathlib.Path, kvfactory: KVFactory, loop: _Loop, /) -> None: + self.__file = _File(path) self.__kvfactory = kvfactory self.__loop = loop self.__path = path self.__backup = path.with_name(path.name + '.backup') self.__recover = path.with_name(path.name + '.recover') - def file(self, /) -> File: + def file(self, /) -> _File: return self.__file def kvfactory(self, /) -> KVFactory: @@ -475,7 +449,7 @@ class Backup: def _reload_sync(self, /) -> None: self.__file.close_sync() self._rebuild_file_sync({}) - self.__file._open_sync() + self.__file.open_sync() def run_in_thread(self, fn, /, *args, **kwargs) -> asyncio.Future: return self.__loop.run_in_thread(self.__path.name, fn, *args, **kwargs) @@ -487,7 +461,7 @@ class Backup: if self.__file.tell() > 2 * self.__initial_size: await self._reload() - def _load_mmdb_sync(self, /) -> dict: + def load_mmdb_sync(self, /) -> dict: db = {} self._rebuild_file_sync(db) return db @@ -496,7 +470,7 @@ class Backup: del self.__initial_size -class Truncation: +class _Truncation: __slots__ = ( '__backup', '__error', @@ -506,7 +480,7 @@ class Truncation: '__flag', ) - def __init__(self, backup: Backup, error: Errors, /) -> None: + def __init__(self, backup: _Backup, error: _Errors, /) -> None: self.__backup = backup self.__error = error self.__file = backup.file() @@ -514,7 +488,7 @@ class Truncation: self.__truncate = path.with_name(path.name + '.truncate') self.__flag = path.with_name(path.name + '.truncate_flag') - def backup(self, /) -> Backup: + def backup(self, /) -> _Backup: return self.__backup def _write_bytes_sync(self, s: bytes, /) -> None: @@ -524,11 +498,11 @@ class Truncation: def _write_value_sync(self, value: int, /) -> None: self._write_bytes_sync(value.to_bytes(16, 'little')) - def set_sync(self, /) -> None: + def _set_sync(self, /) -> None: self._write_value_sync(self.__file.tell()) self.__flag.touch() - def unset_sync(self, /) -> None: + def _unset_sync(self, /) -> None: self.__flag.unlink(missing_ok=True) self.__truncate.unlink(missing_ok=True) @@ -542,36 +516,20 @@ class Truncation: def assure_sync(self, /) -> None: if self.__flag.exists(): self._truncate_sync() - self.unset_sync() + self._unset_sync() def _file_truncate_sync(self, file: IO[str], pos: int, /) -> None: file.seek(pos) - self.__error.save_error_from_thread(file.read()) + self.__error.save_from_thread(file.read()) file.truncate(pos) - -class WriteableFile: - __slots__ = ( - '__truncation', - '__backup', - '__file', - ) - - def __init__(self, truncation: Truncation, /) -> None: - self.__truncation = truncation - self.__backup = truncation.backup() - self.__file = self.__backup.file() - - def truncation(self, /) -> Truncation: - return self.__truncation - def file_write_sync(self, line: str, /) -> None: - self.__truncation.set_sync() + self._set_sync() self.__file.write_to_disk_sync(line) - self.__truncation.unset_sync() + self._unset_sync() -class ReceivingQueue: +class _ReceivingQueue: __all__ = ( '__queue', ) @@ -583,10 +541,10 @@ class ReceivingQueue: self.__queue.put_nowait(request) -class WriteableBuffer: +class _WriteableBuffer: __slots__ = ( '__buffersize', - '__writeable', + '__truncation', '__queue', '__backup', '__kvfactory', @@ -602,21 +560,21 @@ class WriteableBuffer: __buffer_requested: bool def __init__( - self, buffersize: int, writeable: WriteableFile, queue: ReceivingQueue, loop: Loop, / + self, buffersize: int, truncation: _Truncation, queue: _ReceivingQueue, loop: _Loop, / ) -> None: self.__buffersize = buffersize - self.__writeable = writeable + self.__truncation = truncation self.__queue = queue - self.__backup = writeable.truncation().backup() + self.__backup = self.__truncation.backup() self.__kvfactory = self.__backup.kvfactory() self.__loop = loop self.__event_loop = self.__loop.loop() self._clear() - def writeable(self, /) -> WriteableFile: - return self.__writeable + def writeable(self, /) -> _Truncation: + return self.__truncation - def loop(self, /) -> Loop: + def loop(self, /) -> _Loop: return self.__loop def _compressed(self, /) -> StringIO: @@ -628,7 +586,7 @@ class WriteableBuffer: return buffer def _commit_compressed_sync(self, /) -> None: - self.__writeable.file_write_sync(self._compressed().getvalue()) + self.__truncation.file_write_sync(self._compressed().getvalue()) async def _commit_compressed(self, /) -> None: await self.__event_loop.run_in_executor(None, self._commit_compressed_sync) @@ -709,7 +667,7 @@ class WriteableBuffer: del self.__buffer -class MMDB: +class _Memory: __slots__ = ( '__backup', '__truncation', @@ -721,19 +679,19 @@ class MMDB: __mmdb: dict - def __init__(self, truncation: Truncation, /) -> None: + def __init__(self, truncation: _Truncation, /) -> None: self.__truncation = truncation self.__backup = truncation.backup() self.__file = self.__backup.file() self.__kvfactory = self.__backup.kvfactory() def _initialize_sync(self, /) -> None: - self.__mmdb = self.__backup._load_mmdb_sync() + self.__mmdb = self.__backup.load_mmdb_sync() def _load_from_file_sync(self, /) -> None: self.__truncation.assure_sync() self._initialize_sync() - self.__file._open_sync() + self.__file.open_sync() async def _load_from_file(self, /) -> None: await self.__backup.run_in_thread(self._load_from_file_sync) @@ -761,7 +719,7 @@ class MMDB: self.__kvfactory.dbset(self.__mmdb, key, value) -class QueueTask: +class _QueueTask: __slots__ = ( '__queue', '__buffer', @@ -769,7 +727,7 @@ class QueueTask: '__task', ) - def __init__(self, queue: asyncio.Queue[Request], buffer: WriteableBuffer, /) -> None: + def __init__(self, queue: asyncio.Queue[Request], buffer: _WriteableBuffer, /) -> None: self.__queue = queue self.__buffer = buffer self.__event_loop = buffer.loop().loop() @@ -801,7 +759,9 @@ class QueueTask: self.__task = self.__event_loop.create_task(self._background_task()) -class DbConnection(VirtualConnection): +class _DbConnection( + DbInterface +): """note: unstable constructor signature.""" __slots__ = ( @@ -817,23 +777,22 @@ class DbConnection(VirtualConnection): '__task', ) - __mmdb: MMDB - __loop: Loop - __queue: ReceivingQueue - __task: QueueTask + __mmdb: _Memory + __loop: _Loop + __queue: _ReceivingQueue + __task: _QueueTask - def __init__(self, parametres: DbParameters, /) -> None: - self.__kvfactory = parametres.kvfactory - self.__buffersize = parametres.buffersize - self.__path = path = parametres.path - name = path.name - self.__not_running = True + def __init__(self, parameters: DbParameters, /) -> None: + self.__kvfactory = parameters.kvfactory + self.__buffersize = parameters.buffersize + self.__path = parameters.path + self.__running = False def kvprotocol(self, /) -> KVProtocol: return self.__kvfactory def get(self, key: Any, default: Any, /) -> Any: - """dict-like get with mandatory default parametre.""" + """dict-like get with mandatory default parameter.""" return self.__mmdb.get(key, default) async def set(self, key: Any, value: Any, /) -> None: @@ -851,30 +810,32 @@ class DbConnection(VirtualConnection): self.__queue.submit(request) async def _initialize_running(self, /) -> None: - self.__loop = Loop(asyncio.get_running_loop()) - error = Errors(self.__path, self.__loop) - backup = Backup(self.__path, self.__kvfactory, self.__loop) - truncation = Truncation(backup, error) - write = WriteableFile(truncation) + self.__loop = _Loop(asyncio.get_running_loop()) + truncation = _Truncation( + _Backup(self.__path, self.__kvfactory, self.__loop), + _Errors(self.__path, self.__loop), + ) queue: asyncio.Queue[Request] = asyncio.Queue() - self.__queue = ReceivingQueue(queue) - buffer = WriteableBuffer(self.__buffersize, write, self.__queue, self.__loop) - self.__mmdb = MMDB(truncation) + self.__queue = _ReceivingQueue(queue) + self.__mmdb = _Memory(truncation) await self.__mmdb._load_from_file() - self.__task = QueueTask(queue, buffer) + self.__task = _QueueTask( + queue, + _WriteableBuffer(self.__buffersize, truncation, self.__queue, self.__loop) + ) self.__task.start() async def _initialize(self, /) -> None: - if not self.__not_running: + if self.__running: raise RuntimeError - self.__not_running = False + self.__running = True await self._initialize_running() @classmethod - async def create(cls, parametres: DbParameters, /) -> 'DbConnection': + async def create(cls, parameters: DbParameters, /) -> _DbConnection: """connect to the factory. note: unstable signature.""" - dbconnection = DbConnection(parametres) + dbconnection = _DbConnection(parameters) await dbconnection._initialize() return dbconnection @@ -891,7 +852,7 @@ note: unstable signature.""" """close the connection. note: unstable signature.""" await self._close_running() - self.__not_running = True + self.__running = False async def commit_transaction(self, delta: dict, /) -> None: """hybrid of set() and dict.update(). @@ -930,47 +891,49 @@ note: unstable signature.""" self.__queue.submit(CommitRequest(future)) await future - @nightly def loop(self, /) -> asyncio.AbstractEventLoop: return self.__loop.loop() - def transaction(self, /) -> 'Transaction': - return super().transaction() + +DbConnection: TypeAlias = DbInterface -class DbFactory: +class DbManager: __slots__ = ( - '__parametres', + '__parameters', '__db', ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: - self.__parametres = DbParameters( + self.__parameters = DbParameters( path, kvfactory=kvfactory, buffersize=buffersize ) - async def __aenter__(self) -> DbConnection: - self.__db = await DbConnection.create(self.__parametres) + async def __aenter__(self) -> DbInterface: + self.__db = await _DbConnection.create(self.__parameters) return self.__db async def __aexit__(self, exc_type, exc_val, exc_tb): await self.__db.aclose() -class Db(DbConnection): +DbFactory: TypeAlias = DbManager + + +class Db(_DbConnection): """simplified usecase combining the factory and the connection in one class.""" __slots__ = () def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576): - DbConnection.__init__( + _DbConnection.__init__( self, DbParameters( pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize ) ) - async def __aenter__(self) -> DbConnection: + async def __aenter__(self) -> _DbConnection: await self._initialize() return self @@ -990,7 +953,9 @@ class FutureContext: await self.__future -class TransactionView(VirtualConnection): +class TransactionView( + ExtendedVirtualConnection +): """note: unstable constructor signature.""" __slots__ = ( @@ -1018,48 +983,40 @@ class TransactionView(VirtualConnection): """clear unsubmitted changes.""" self.__delta.clear() - @nightly def illuminate(self, /) -> None: """clear submitted changes, thus syncing the view (underlying the delta) with the connection.""" self.__shadow.clear() - @nightly async def ailluminate(self, /) -> None: """illuminate, then wait for submitted changes to be committed.""" async with self.future_context(): self.illuminate() - @nightly def fork(self, /) -> None: """keep delta, but forget about the shadow entirely (including making sure it's committed).""" self.illuminate() self.__subfuture = None - @nightly async def afork(self, /) -> None: """fork, then wait for submitted changes to be committed.""" async with self.future_context(): self.fork() - @nightly def clear(self, /) -> None: """clear all changes (including the shadow).""" self.rollback() self.illuminate() - @nightly async def aclear(self, /) -> None: """clear, then wait for submitted changes to be committed.""" async with self.future_context(): self.clear() - @nightly def reset(self, /) -> None: """reset transaction.""" self.clear() self.__subfuture = None - @nightly async def areset(self, /) -> None: """reset, then wait for submitted changes to be committed.""" async with self.future_context(): @@ -1154,21 +1111,18 @@ bulk analogue of DbConnection.set_nowait().""" case _: raise TypeError - @nightly async def commit_transaction(self, delta: dict, /) -> None: if not delta: return self.__delta.update(delta) await self.commit() - @nightly def submit_transaction(self, delta: dict, /) -> None: if not delta: return self.__delta.update(delta) self.submit() - @nightly def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: def set_result(sf: asyncio.Future | None): if future is None: @@ -1187,14 +1141,9 @@ bulk analogue of DbConnection.set_nowait().""" return self.__subfuture.add_done_callback(set_result) - @nightly def loop(self, /) -> asyncio.AbstractEventLoop: return self.__loop - @nightly - def transaction(self, /) -> 'Transaction': - return super().transaction() - class Transaction: """note: unstable signature.""" diff --git a/ptvp35/instrumentation.py b/ptvp35/instrumentation.py index 14f3de4..bb804d4 100644 --- a/ptvp35/instrumentation.py +++ b/ptvp35/instrumentation.py @@ -6,7 +6,7 @@ __all__ = ('InstrumentDiskWrites', 'NightlyInstrumentation') class InstrumentDiskWrites(Instrumentation): def __init__(self, /): - super().__init__(ptvp35.File, 'write_to_disk_sync') + super().__init__(ptvp35._File, 'write_to_disk_sync') def on_write(self, line: str, /) -> None: pass diff --git a/setup.py b/setup.py index bcff2c8..819d672 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='ptvp35', - version='1.1rc4', + version='1.1rc5', packages=['ptvp35'], url='https://gitea.ongoteam.net/PTV/ptvp35', license='MIT', diff --git a/test_delete.py b/test_delete.py index 2dc2d4c..c751a21 100644 --- a/test_delete.py +++ b/test_delete.py @@ -1,7 +1,7 @@ import asyncio import pathlib -from ptvp35 import * +from ptvp35 import DbFactory, KVJson, VDELETE async def main(): @@ -13,7 +13,7 @@ async def main(): await connection.commit() async with connection.transaction() as transaction: print(transaction.get(0, 1)) - transaction.set_nowait(0, KVFactory.DELETE) + transaction.set_nowait(0, VDELETE) print(transaction.get(0, 1)) input() print(connection.get(0, 1))