1.1rc5: DbInterface

This commit is contained in:
AF 2023-02-09 19:32:29 +00:00
parent 28f964a3e6
commit 1ccd2009ee
6 changed files with 239 additions and 255 deletions

View File

@ -6,8 +6,8 @@ from contextlib import ExitStack
try:
sys.path.append(str((pathlib.Path(__file__).parent / '../..').absolute()))
from ptvp35 import *
from ptvp35.instrumentation import *
from ptvp35 import DbFactory, DbConnection, KVJson
from ptvp35.instrumentation import InstrumentDiskWrites, NightlyInstrumentation
except:
raise
@ -144,8 +144,6 @@ async def main():
with ExitStack() as es:
LogWrites().enter(es)
if run_all:
print('not yet properly instrumented; exiting;')
raise NotImplementedError
LogEE(__import__('ptvp35').Request, '__init__').enter(es)
LogEE(__import__('ptvp35').Request, 'waiting').enter(es)
LogEE(__import__('ptvp35').Request, 'set_result').enter(es)
@ -155,10 +153,16 @@ async def main():
LogEE(__import__('ptvp35').LineRequest, '__init__').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'run').enter(es)
LogEE(__import__('ptvp35').KVFactory, '_dbset').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'dbset').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'dbget').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'filter_value').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'request').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'free').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'io2db').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'db2io').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'path2db_sync').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'db2path_sync').enter(es)
LogEE(__import__('ptvp35').KVRequest, '__init__').enter(es)
@ -168,80 +172,111 @@ async def main():
LogEE(__import__('ptvp35').TransactionRequest, '__init__').enter(es)
LogEE(__import__('ptvp35').DbParametres, '__init__').enter(es)
LogEE(__import__('ptvp35').DbParameters, '__init__').enter(es)
LogEE(__import__('ptvp35').VirtualConnection, 'transaction').enter(es)
LogEE(__import__('ptvp35').DbConnection, '__init__').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_create_future').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_save_error_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_save_error').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_queue_error').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_save_error_from_thread').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_path2db_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_db2path_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'get').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'set').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'set_nowait').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_clear_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_compress_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_commit_compressed_buffer_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_commit_compressed_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_satisfy_buffer_future').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_fail_buffer_future').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_do_commit_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_commit_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_request_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_commit_buffer_or_request_so').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_write').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncation_set_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncation_unset_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_truncate_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncation_target_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncate_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_assure_truncation_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_write_to_disk_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_write_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_reload_if_oversized').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_handle_request').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_background_cycle').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_background_task').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_start_task').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_recovery_set_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_recovery_unset_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_copy_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_finish_recovery_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_build_file_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_run_in_thread').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_rebuild_file_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_open_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_close_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_reload_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_reload').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_load_mmdb_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_initialize_mmdb_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_load_from_file_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_load_from_file').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_initialize_queue').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_initialize_running').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_initialize').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_queue').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_close_mmdb_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_mmdb').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_running').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'aclose').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_transaction_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'commit_transaction').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'submit_transaction').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'submit_transaction_request').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'commit').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'loop').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'transaction').enter(es)
LogEE(__import__('ptvp35')._Loop, '__init__').enter(es)
LogEE(__import__('ptvp35')._Loop, 'create_future').enter(es)
LogEE(__import__('ptvp35')._Loop, 'loop').enter(es)
LogEE(__import__('ptvp35')._Loop, 'run_in_thread').enter(es)
LogEE(__import__('ptvp35').DbFactory, '__init__').enter(es)
ALogEE(__import__('ptvp35').DbFactory, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').DbFactory, '__aexit__').enter(es)
LogEE(__import__('ptvp35')._Errors, '__init__').enter(es)
LogEE(__import__('ptvp35')._Errors, '_save_sync').enter(es)
ALogEE(__import__('ptvp35')._Errors, '_save').enter(es)
LogEE(__import__('ptvp35')._Errors, 'save_from_thread').enter(es)
LogEE(__import__('ptvp35')._File, '__init__').enter(es)
LogEE(__import__('ptvp35')._File, 'path').enter(es)
LogEE(__import__('ptvp35')._File, 'tell').enter(es)
LogEE(__import__('ptvp35')._File, 'write_to_disk_sync').enter(es)
LogEE(__import__('ptvp35')._File, 'open_sync').enter(es)
LogEE(__import__('ptvp35')._File, 'close_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '__init__').enter(es)
LogEE(__import__('ptvp35')._Backup, 'file').enter(es)
LogEE(__import__('ptvp35')._Backup, 'kvfactory').enter(es)
LogEE(__import__('ptvp35')._Backup, '_copy_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_recovery_unset_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_finish_recovery_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_recovery_set_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, 'build_file_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_rebuild_file_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_reload_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, 'run_in_thread').enter(es)
ALogEE(__import__('ptvp35')._Backup, '_reload').enter(es)
ALogEE(__import__('ptvp35')._Backup, 'reload_if_oversized').enter(es)
LogEE(__import__('ptvp35')._Backup, 'load_mmdb_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, 'uninitialize').enter(es)
LogEE(__import__('ptvp35')._Truncation, '__init__').enter(es)
LogEE(__import__('ptvp35')._Truncation, 'backup').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_write_bytes_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_write_value_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_set_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_unset_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_target_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_truncate_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, 'assure_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, '_file_truncate_sync').enter(es)
LogEE(__import__('ptvp35')._Truncation, 'file_write_sync').enter(es)
LogEE(__import__('ptvp35')._ReceivingQueue, '__init__').enter(es)
LogEE(__import__('ptvp35')._ReceivingQueue, 'submit').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '__init__').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, 'writeable').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, 'loop').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_compressed').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_commit_compressed_sync').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit_compressed').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_clear').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_satisfy_future').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_fail_future').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_do_commit_buffer').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_request_buffer').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit_or_request_so').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_write').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_handle_request').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_close').enter(es)
LogEE(__import__('ptvp35')._Memory, '__init__').enter(es)
LogEE(__import__('ptvp35')._Memory, '_initialize_sync').enter(es)
LogEE(__import__('ptvp35')._Memory, '_load_from_file_sync').enter(es)
ALogEE(__import__('ptvp35')._Memory, '_load_from_file').enter(es)
LogEE(__import__('ptvp35')._Memory, '_close_sync').enter(es)
ALogEE(__import__('ptvp35')._Memory, '_close').enter(es)
LogEE(__import__('ptvp35')._Memory, '_transaction_buffer').enter(es)
LogEE(__import__('ptvp35')._Memory, 'get').enter(es)
LogEE(__import__('ptvp35')._Memory, 'set').enter(es)
LogEE(__import__('ptvp35')._QueueTask, '__init__').enter(es)
ALogEE(__import__('ptvp35')._QueueTask, '_background_cycle').enter(es)
ALogEE(__import__('ptvp35')._QueueTask, '_background_task').enter(es)
ALogEE(__import__('ptvp35')._QueueTask, 'close').enter(es)
LogEE(__import__('ptvp35')._QueueTask, 'start').enter(es)
LogEE(__import__('ptvp35')._DbConnection, '__init__').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'kvprotocol').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'get').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'set').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'set_nowait').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, '_initialize_running').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, '_initialize').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'create').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, '_close_running').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'aclose').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'commit_transaction').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'submit_transaction').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'submit_transaction_request').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'commit').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'loop').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'transaction').enter(es)
LogEE(__import__('ptvp35').DbManager, '__init__').enter(es)
ALogEE(__import__('ptvp35').DbManager, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').DbManager, '__aexit__').enter(es)
LogEE(__import__('ptvp35').Db, '__init__').enter(es)
ALogEE(__import__('ptvp35').Db, '__aenter__').enter(es)

View File

@ -21,7 +21,7 @@ These two facts together tell that, if you intend on using the connection, you s
.. code-block:: python3
import pathlib
from ptvp35 import *
from ptvp35 import DbFactory, KVJson
async def main():
async with DbFactory(pathlib.Path('example.db'), kvfactory=KVJson()) as connection:
@ -31,7 +31,7 @@ Different ways to get/set a value:
.. code-block:: python3
from ptvp35 import *
from ptvp35 import DbConnection
async def _main(connection: DbConnection):
value0 = connection.get('increment-0', 0)

View File

@ -3,10 +3,11 @@
from __future__ import annotations
__all__ = (
'KVDELETE',
'VDELETE',
'KVFactory',
'KVJson',
'DbConnection',
'DbManager',
'DbFactory',
'Db',
'Transaction',
@ -21,11 +22,9 @@ import json
import os
import pathlib
import threading
import warnings
from collections.abc import Hashable
from functools import wraps
from io import StringIO, UnsupportedOperation
from typing import IO, Any, Callable, TypeVar
from typing import IO, Any, TypeAlias
class Request:
@ -68,7 +67,7 @@ class KVProtocol(abc.ABC):
raise NotImplementedError
KVDELETE = object()
VDELETE = object()
class KVFactory(KVProtocol):
@ -98,7 +97,7 @@ note: unstable signature."""
self._dbset(db, key, value, reduce)
def _dbset(self, db: dict, key: Any, value: Any, reduce: bool, /) -> None:
if reduce and value is KVDELETE:
if reduce and value is VDELETE:
db.pop(key, None)
else:
db[key] = value
@ -111,7 +110,7 @@ note: unstable signature."""
return self.filter_value(value, default)
def filter_value(self, value: Any, default: Any, /) -> Any:
if value is KVDELETE:
if value is VDELETE:
return default
else:
return value
@ -182,7 +181,7 @@ class KVJson(KVFactory):
__slots__ = ()
def line(self, key: Any, value: Any, /) -> str:
if value is KVDELETE:
if value is VDELETE:
obj = {'key': key}
else:
obj = {'key': key, 'value': value}
@ -202,7 +201,7 @@ class KVJson(KVFactory):
def fromline(self, line: str, /) -> tuple[Any, Any]:
d = json.loads(line)
return self._load_key(d['key']), d.get('value', KVDELETE)
return self._load_key(d['key']), d.get('value', VDELETE)
class TransactionRequest(LineRequest):
@ -235,60 +234,10 @@ class RequestToClosedConnection(asyncio.InvalidStateError):
pass
class NightlyWarning(Warning):
enabled = True
TDecoratedNightly = TypeVar('TDecoratedNightly', bound=Callable | type | None)
def nightly(decorated: TDecoratedNightly = None, prefix: str = '', stacklevel=2) -> TDecoratedNightly:
if decorated is None:
NightlyWarning.enabled = False
return None # type: ignore
if isinstance(decorated, type):
prefix = f'{prefix}{decorated.__name__}.'
decorated.__init__ = nightly(
decorated.__init__, prefix
)
decorated.__init_subclass__ = nightly(
decorated.__init_subclass__, prefix, stacklevel=3 if issubclass(decorated, abc.ABC) else 2
)
return decorated # type: ignore
if not callable(decorated):
raise TypeError
message = f"{prefix}{decorated.__name__}"
@wraps(decorated)
def wrap(*args, **kwargs):
if NightlyWarning.enabled:
warnings.warn(message, NightlyWarning, stacklevel=stacklevel)
return wrap.__non_nightly__(*args, **kwargs)
if wrap.__doc__ is None or not wrap.__doc__:
wrap.__doc__ = '@nightly'
elif wrap.__doc__.startswith('@nightly'):
pass
else:
wrap.__doc__ = '@nightly\n\n' + wrap.__doc__
wrap.__non_nightly__ = decorated
return wrap # type: ignore
warnings.filterwarnings('default', category=NightlyWarning, module='__main__')
warnings.filterwarnings('ignore', category=NightlyWarning, module='ptvp35')
nightly = nightly(nightly) # type: ignore
@nightly
class VirtualConnection(
# abc.ABC
abc.ABC
):
"""intersection of DbConnection and TransactionView functionality"""
"""minimal intersection of DbConnection and TransactionView functionality"""
__slots__ = ()
@ -309,7 +258,6 @@ class VirtualConnection(
raise NotImplementedError
@abc.abstractmethod
@nightly
def loop(self, /) -> asyncio.AbstractEventLoop:
raise NotImplementedError
@ -317,7 +265,33 @@ class VirtualConnection(
return Transaction(self)
class Loop:
class ExtendedVirtualConnection(
VirtualConnection
):
"""maximal intersection of DbConnection and TransactionView functionality"""
@abc.abstractmethod
def set_nowait(self, key: Any, value: Any, /) -> None:
raise NotImplementedError
@abc.abstractmethod
def submit_transaction(self, delta: dict, /) -> None:
raise NotImplementedError
@abc.abstractmethod
async def commit(self, /) -> None:
raise NotImplementedError
class DbInterface(
ExtendedVirtualConnection
):
@abc.abstractmethod
async def set(self, key: Any, value: Any, /) -> None:
raise NotImplementedError
class _Loop:
__slots__ = (
'__loop',
)
@ -356,35 +330,35 @@ intended for heavy tasks."""
return future
class Errors:
class _Errors:
__slots__ = (
'__path',
'__loop',
'__event_loop',
)
def __init__(self, path: pathlib.Path, loop: Loop, /) -> None:
def __init__(self, path: pathlib.Path, loop: _Loop, /) -> None:
self.__path = path.with_name(path.name + '.error')
self.__loop = loop
self.__event_loop = loop.loop()
def save_sync(self, line: str, /) -> None:
def _save_sync(self, line: str, /) -> None:
with self.__path.open('a') as file:
file.write(line.strip() + '\n')
async def _save_error(self, line: str, /) -> None:
await self.__event_loop.run_in_executor(None, self.save_sync, line)
async def _save(self, line: str, /) -> None:
await self.__event_loop.run_in_executor(None, self._save_sync, line)
def _schedule_error(self, line: str, /) -> concurrent.futures.Future:
def _schedule(self, line: str, /) -> concurrent.futures.Future:
return asyncio.run_coroutine_threadsafe(
self._save_error(line), self.__event_loop
self._save(line), self.__event_loop
)
def save_error_from_thread(self, line: str, /) -> None:
self._schedule_error(line).result()
def save_from_thread(self, line: str, /) -> None:
self._schedule(line).result()
class File:
class _File:
__slots__ = (
'__path',
'__file',
@ -409,7 +383,7 @@ class File:
except UnsupportedOperation:
pass
def _open_sync(self, /) -> None:
def open_sync(self, /) -> None:
self.__file = self.__path.open('a')
def close_sync(self, /) -> None:
@ -417,7 +391,7 @@ class File:
del self.__file
class Backup:
class _Backup:
__slots__ = (
'__file',
'__kvfactory',
@ -430,15 +404,15 @@ class Backup:
__initial_size: int
def __init__(self, path: pathlib.Path, kvfactory: KVFactory, loop: Loop, /) -> None:
self.__file = File(path)
def __init__(self, path: pathlib.Path, kvfactory: KVFactory, loop: _Loop, /) -> None:
self.__file = _File(path)
self.__kvfactory = kvfactory
self.__loop = loop
self.__path = path
self.__backup = path.with_name(path.name + '.backup')
self.__recover = path.with_name(path.name + '.recover')
def file(self, /) -> File:
def file(self, /) -> _File:
return self.__file
def kvfactory(self, /) -> KVFactory:
@ -475,7 +449,7 @@ class Backup:
def _reload_sync(self, /) -> None:
self.__file.close_sync()
self._rebuild_file_sync({})
self.__file._open_sync()
self.__file.open_sync()
def run_in_thread(self, fn, /, *args, **kwargs) -> asyncio.Future:
return self.__loop.run_in_thread(self.__path.name, fn, *args, **kwargs)
@ -487,7 +461,7 @@ class Backup:
if self.__file.tell() > 2 * self.__initial_size:
await self._reload()
def _load_mmdb_sync(self, /) -> dict:
def load_mmdb_sync(self, /) -> dict:
db = {}
self._rebuild_file_sync(db)
return db
@ -496,7 +470,7 @@ class Backup:
del self.__initial_size
class Truncation:
class _Truncation:
__slots__ = (
'__backup',
'__error',
@ -506,7 +480,7 @@ class Truncation:
'__flag',
)
def __init__(self, backup: Backup, error: Errors, /) -> None:
def __init__(self, backup: _Backup, error: _Errors, /) -> None:
self.__backup = backup
self.__error = error
self.__file = backup.file()
@ -514,7 +488,7 @@ class Truncation:
self.__truncate = path.with_name(path.name + '.truncate')
self.__flag = path.with_name(path.name + '.truncate_flag')
def backup(self, /) -> Backup:
def backup(self, /) -> _Backup:
return self.__backup
def _write_bytes_sync(self, s: bytes, /) -> None:
@ -524,11 +498,11 @@ class Truncation:
def _write_value_sync(self, value: int, /) -> None:
self._write_bytes_sync(value.to_bytes(16, 'little'))
def set_sync(self, /) -> None:
def _set_sync(self, /) -> None:
self._write_value_sync(self.__file.tell())
self.__flag.touch()
def unset_sync(self, /) -> None:
def _unset_sync(self, /) -> None:
self.__flag.unlink(missing_ok=True)
self.__truncate.unlink(missing_ok=True)
@ -542,36 +516,20 @@ class Truncation:
def assure_sync(self, /) -> None:
if self.__flag.exists():
self._truncate_sync()
self.unset_sync()
self._unset_sync()
def _file_truncate_sync(self, file: IO[str], pos: int, /) -> None:
file.seek(pos)
self.__error.save_error_from_thread(file.read())
self.__error.save_from_thread(file.read())
file.truncate(pos)
class WriteableFile:
__slots__ = (
'__truncation',
'__backup',
'__file',
)
def __init__(self, truncation: Truncation, /) -> None:
self.__truncation = truncation
self.__backup = truncation.backup()
self.__file = self.__backup.file()
def truncation(self, /) -> Truncation:
return self.__truncation
def file_write_sync(self, line: str, /) -> None:
self.__truncation.set_sync()
self._set_sync()
self.__file.write_to_disk_sync(line)
self.__truncation.unset_sync()
self._unset_sync()
class ReceivingQueue:
class _ReceivingQueue:
__all__ = (
'__queue',
)
@ -583,10 +541,10 @@ class ReceivingQueue:
self.__queue.put_nowait(request)
class WriteableBuffer:
class _WriteableBuffer:
__slots__ = (
'__buffersize',
'__writeable',
'__truncation',
'__queue',
'__backup',
'__kvfactory',
@ -602,21 +560,21 @@ class WriteableBuffer:
__buffer_requested: bool
def __init__(
self, buffersize: int, writeable: WriteableFile, queue: ReceivingQueue, loop: Loop, /
self, buffersize: int, truncation: _Truncation, queue: _ReceivingQueue, loop: _Loop, /
) -> None:
self.__buffersize = buffersize
self.__writeable = writeable
self.__truncation = truncation
self.__queue = queue
self.__backup = writeable.truncation().backup()
self.__backup = self.__truncation.backup()
self.__kvfactory = self.__backup.kvfactory()
self.__loop = loop
self.__event_loop = self.__loop.loop()
self._clear()
def writeable(self, /) -> WriteableFile:
return self.__writeable
def writeable(self, /) -> _Truncation:
return self.__truncation
def loop(self, /) -> Loop:
def loop(self, /) -> _Loop:
return self.__loop
def _compressed(self, /) -> StringIO:
@ -628,7 +586,7 @@ class WriteableBuffer:
return buffer
def _commit_compressed_sync(self, /) -> None:
self.__writeable.file_write_sync(self._compressed().getvalue())
self.__truncation.file_write_sync(self._compressed().getvalue())
async def _commit_compressed(self, /) -> None:
await self.__event_loop.run_in_executor(None, self._commit_compressed_sync)
@ -709,7 +667,7 @@ class WriteableBuffer:
del self.__buffer
class MMDB:
class _Memory:
__slots__ = (
'__backup',
'__truncation',
@ -721,19 +679,19 @@ class MMDB:
__mmdb: dict
def __init__(self, truncation: Truncation, /) -> None:
def __init__(self, truncation: _Truncation, /) -> None:
self.__truncation = truncation
self.__backup = truncation.backup()
self.__file = self.__backup.file()
self.__kvfactory = self.__backup.kvfactory()
def _initialize_sync(self, /) -> None:
self.__mmdb = self.__backup._load_mmdb_sync()
self.__mmdb = self.__backup.load_mmdb_sync()
def _load_from_file_sync(self, /) -> None:
self.__truncation.assure_sync()
self._initialize_sync()
self.__file._open_sync()
self.__file.open_sync()
async def _load_from_file(self, /) -> None:
await self.__backup.run_in_thread(self._load_from_file_sync)
@ -761,7 +719,7 @@ class MMDB:
self.__kvfactory.dbset(self.__mmdb, key, value)
class QueueTask:
class _QueueTask:
__slots__ = (
'__queue',
'__buffer',
@ -769,7 +727,7 @@ class QueueTask:
'__task',
)
def __init__(self, queue: asyncio.Queue[Request], buffer: WriteableBuffer, /) -> None:
def __init__(self, queue: asyncio.Queue[Request], buffer: _WriteableBuffer, /) -> None:
self.__queue = queue
self.__buffer = buffer
self.__event_loop = buffer.loop().loop()
@ -801,7 +759,9 @@ class QueueTask:
self.__task = self.__event_loop.create_task(self._background_task())
class DbConnection(VirtualConnection):
class _DbConnection(
DbInterface
):
"""note: unstable constructor signature."""
__slots__ = (
@ -817,23 +777,22 @@ class DbConnection(VirtualConnection):
'__task',
)
__mmdb: MMDB
__loop: Loop
__queue: ReceivingQueue
__task: QueueTask
__mmdb: _Memory
__loop: _Loop
__queue: _ReceivingQueue
__task: _QueueTask
def __init__(self, parametres: DbParameters, /) -> None:
self.__kvfactory = parametres.kvfactory
self.__buffersize = parametres.buffersize
self.__path = path = parametres.path
name = path.name
self.__not_running = True
def __init__(self, parameters: DbParameters, /) -> None:
self.__kvfactory = parameters.kvfactory
self.__buffersize = parameters.buffersize
self.__path = parameters.path
self.__running = False
def kvprotocol(self, /) -> KVProtocol:
return self.__kvfactory
def get(self, key: Any, default: Any, /) -> Any:
"""dict-like get with mandatory default parametre."""
"""dict-like get with mandatory default parameter."""
return self.__mmdb.get(key, default)
async def set(self, key: Any, value: Any, /) -> None:
@ -851,30 +810,32 @@ class DbConnection(VirtualConnection):
self.__queue.submit(request)
async def _initialize_running(self, /) -> None:
self.__loop = Loop(asyncio.get_running_loop())
error = Errors(self.__path, self.__loop)
backup = Backup(self.__path, self.__kvfactory, self.__loop)
truncation = Truncation(backup, error)
write = WriteableFile(truncation)
self.__loop = _Loop(asyncio.get_running_loop())
truncation = _Truncation(
_Backup(self.__path, self.__kvfactory, self.__loop),
_Errors(self.__path, self.__loop),
)
queue: asyncio.Queue[Request] = asyncio.Queue()
self.__queue = ReceivingQueue(queue)
buffer = WriteableBuffer(self.__buffersize, write, self.__queue, self.__loop)
self.__mmdb = MMDB(truncation)
self.__queue = _ReceivingQueue(queue)
self.__mmdb = _Memory(truncation)
await self.__mmdb._load_from_file()
self.__task = QueueTask(queue, buffer)
self.__task = _QueueTask(
queue,
_WriteableBuffer(self.__buffersize, truncation, self.__queue, self.__loop)
)
self.__task.start()
async def _initialize(self, /) -> None:
if not self.__not_running:
if self.__running:
raise RuntimeError
self.__not_running = False
self.__running = True
await self._initialize_running()
@classmethod
async def create(cls, parametres: DbParameters, /) -> 'DbConnection':
async def create(cls, parameters: DbParameters, /) -> _DbConnection:
"""connect to the factory.
note: unstable signature."""
dbconnection = DbConnection(parametres)
dbconnection = _DbConnection(parameters)
await dbconnection._initialize()
return dbconnection
@ -891,7 +852,7 @@ note: unstable signature."""
"""close the connection.
note: unstable signature."""
await self._close_running()
self.__not_running = True
self.__running = False
async def commit_transaction(self, delta: dict, /) -> None:
"""hybrid of set() and dict.update().
@ -930,47 +891,49 @@ note: unstable signature."""
self.__queue.submit(CommitRequest(future))
await future
@nightly
def loop(self, /) -> asyncio.AbstractEventLoop:
return self.__loop.loop()
def transaction(self, /) -> 'Transaction':
return super().transaction()
DbConnection: TypeAlias = DbInterface
class DbFactory:
class DbManager:
__slots__ = (
'__parametres',
'__parameters',
'__db',
)
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
self.__parametres = DbParameters(
self.__parameters = DbParameters(
path, kvfactory=kvfactory, buffersize=buffersize
)
async def __aenter__(self) -> DbConnection:
self.__db = await DbConnection.create(self.__parametres)
async def __aenter__(self) -> DbInterface:
self.__db = await _DbConnection.create(self.__parameters)
return self.__db
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.__db.aclose()
class Db(DbConnection):
DbFactory: TypeAlias = DbManager
class Db(_DbConnection):
"""simplified usecase combining the factory and the connection in one class."""
__slots__ = ()
def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576):
DbConnection.__init__(
_DbConnection.__init__(
self,
DbParameters(
pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize
)
)
async def __aenter__(self) -> DbConnection:
async def __aenter__(self) -> _DbConnection:
await self._initialize()
return self
@ -990,7 +953,9 @@ class FutureContext:
await self.__future
class TransactionView(VirtualConnection):
class TransactionView(
ExtendedVirtualConnection
):
"""note: unstable constructor signature."""
__slots__ = (
@ -1018,48 +983,40 @@ class TransactionView(VirtualConnection):
"""clear unsubmitted changes."""
self.__delta.clear()
@nightly
def illuminate(self, /) -> None:
"""clear submitted changes, thus syncing the view (underlying the delta) with the connection."""
self.__shadow.clear()
@nightly
async def ailluminate(self, /) -> None:
"""illuminate, then wait for submitted changes to be committed."""
async with self.future_context():
self.illuminate()
@nightly
def fork(self, /) -> None:
"""keep delta, but forget about the shadow entirely (including making sure it's committed)."""
self.illuminate()
self.__subfuture = None
@nightly
async def afork(self, /) -> None:
"""fork, then wait for submitted changes to be committed."""
async with self.future_context():
self.fork()
@nightly
def clear(self, /) -> None:
"""clear all changes (including the shadow)."""
self.rollback()
self.illuminate()
@nightly
async def aclear(self, /) -> None:
"""clear, then wait for submitted changes to be committed."""
async with self.future_context():
self.clear()
@nightly
def reset(self, /) -> None:
"""reset transaction."""
self.clear()
self.__subfuture = None
@nightly
async def areset(self, /) -> None:
"""reset, then wait for submitted changes to be committed."""
async with self.future_context():
@ -1154,21 +1111,18 @@ bulk analogue of DbConnection.set_nowait()."""
case _:
raise TypeError
@nightly
async def commit_transaction(self, delta: dict, /) -> None:
if not delta:
return
self.__delta.update(delta)
await self.commit()
@nightly
def submit_transaction(self, delta: dict, /) -> None:
if not delta:
return
self.__delta.update(delta)
self.submit()
@nightly
def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None:
def set_result(sf: asyncio.Future | None):
if future is None:
@ -1187,14 +1141,9 @@ bulk analogue of DbConnection.set_nowait()."""
return
self.__subfuture.add_done_callback(set_result)
@nightly
def loop(self, /) -> asyncio.AbstractEventLoop:
return self.__loop
@nightly
def transaction(self, /) -> 'Transaction':
return super().transaction()
class Transaction:
"""note: unstable signature."""

View File

@ -6,7 +6,7 @@ __all__ = ('InstrumentDiskWrites', 'NightlyInstrumentation')
class InstrumentDiskWrites(Instrumentation):
def __init__(self, /):
super().__init__(ptvp35.File, 'write_to_disk_sync')
super().__init__(ptvp35._File, 'write_to_disk_sync')
def on_write(self, line: str, /) -> None:
pass

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='ptvp35',
version='1.1rc4',
version='1.1rc5',
packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35',
license='MIT',

View File

@ -1,7 +1,7 @@
import asyncio
import pathlib
from ptvp35 import *
from ptvp35 import DbFactory, KVJson, VDELETE
async def main():
@ -13,7 +13,7 @@ async def main():
await connection.commit()
async with connection.transaction() as transaction:
print(transaction.get(0, 1))
transaction.set_nowait(0, KVFactory.DELETE)
transaction.set_nowait(0, VDELETE)
print(transaction.get(0, 1))
input()
print(connection.get(0, 1))