From 28f964a3e66de642baf1e0b10fe5bed793493621 Mon Sep 17 00:00:00 2001 From: timofey Date: Fri, 3 Feb 2023 14:38:25 +0000 Subject: [PATCH] connection classes separation --- docs/scripts/traced_example.py | 2 + ptvp35/__init__.py | 798 ++++++++++++++++++++------------- ptvp35/instrumentation.py | 2 +- 3 files changed, 498 insertions(+), 304 deletions(-) diff --git a/docs/scripts/traced_example.py b/docs/scripts/traced_example.py index e2e1c5a..7b970bd 100644 --- a/docs/scripts/traced_example.py +++ b/docs/scripts/traced_example.py @@ -144,6 +144,8 @@ async def main(): with ExitStack() as es: LogWrites().enter(es) if run_all: + print('not yet properly instrumented; exiting;') + raise NotImplementedError LogEE(__import__('ptvp35').Request, '__init__').enter(es) LogEE(__import__('ptvp35').Request, 'waiting').enter(es) LogEE(__import__('ptvp35').Request, 'set_result').enter(es) diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 997f79c..416eee9 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -1,5 +1,7 @@ # Licensed under MIT License. Copyright: 2022-2023 PARRRATE TNV. +from __future__ import annotations + __all__ = ( 'KVDELETE', 'KVFactory', @@ -12,7 +14,6 @@ __all__ = ( 'FutureContext', ) - import abc import asyncio import concurrent.futures @@ -42,7 +43,7 @@ class Request: if self.__future is not None: self.__future.set_result(result) - def set_exception(self, exception, /): + def set_exception(self, exception, /) -> None: if self.__future is not None: self.__future.set_exception(exception) @@ -63,7 +64,7 @@ class LineRequest(Request): class KVProtocol(abc.ABC): @abc.abstractmethod - def dbget(self, db: dict, key: Any, default: Any, /): + def dbget(self, db: dict, key: Any, default: Any, /) -> Any: raise NotImplementedError @@ -96,32 +97,32 @@ note: unstable signature.""" key, value = self.fromline(line) self._dbset(db, key, value, reduce) - def _dbset(self, db: dict, key: Any, value: Any, reduce: bool, /): + def _dbset(self, db: dict, key: Any, value: Any, reduce: bool, /) -> None: if reduce and value is KVDELETE: db.pop(key, None) else: db[key] = value - def dbset(self, db: dict, key: Any, value: Any, /): + def dbset(self, db: dict, key: Any, value: Any, /) -> None: self._dbset(db, key, value, True) - def dbget(self, db: dict, key: Any, default: Any, /): + def dbget(self, db: dict, key: Any, default: Any, /) -> Any: value = db.get(key, default) return self.filter_value(value, default) - def filter_value(self, value: Any, default: Any, /): + def filter_value(self, value: Any, default: Any, /) -> Any: if value is KVDELETE: return default else: return value - def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> 'KVRequest': + def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> KVRequest: """form request with Future. low-level API. note: unstable signature.""" return KVRequest(key, value, future=future, factory=self) - def free(self, key: Any, value: Any, /) -> 'KVRequest': + def free(self, key: Any, value: Any, /) -> KVRequest: """result free from Future. note: unstable signature.""" return self.request(key, value, future=None) @@ -141,6 +142,17 @@ note: unstable signature.""" size += io.write(self.line(key, value)) return size + def path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: + path.touch() + with path.open('r') as file: + return self.io2db(file, db, True) + + def db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: + with path.open('w') as file: + initial_size = self.db2io(db, file) + os.fsync(file.fileno()) + return initial_size + class KVRequest(LineRequest): __slots__ = ( @@ -203,7 +215,7 @@ class TransactionRequest(LineRequest): self.buffer = buffer -class DbParametres: +class DbParameters: __slots__ = ( 'path', 'kvfactory', @@ -241,7 +253,7 @@ def nightly(decorated: TDecoratedNightly = None, prefix: str = '', stacklevel=2) decorated.__init__, prefix ) decorated.__init_subclass__ = nightly( - decorated.__init_subclass__, prefix, stacklevel=3 + decorated.__init_subclass__, prefix, stacklevel=3 if issubclass(decorated, abc.ABC) else 2 ) return decorated # type: ignore if not callable(decorated): @@ -273,7 +285,9 @@ nightly = nightly(nightly) # type: ignore @nightly -class VirtualConnection(abc.ABC): +class VirtualConnection( + # abc.ABC +): """intersection of DbConnection and TransactionView functionality""" __slots__ = () @@ -299,116 +313,313 @@ class VirtualConnection(abc.ABC): def loop(self, /) -> asyncio.AbstractEventLoop: raise NotImplementedError - def transaction(self, /) -> 'Transaction': + def transaction(self, /) -> Transaction: return Transaction(self) -class DbConnection(VirtualConnection): - """note: unstable constructor signature.""" - +class Loop: __slots__ = ( - '__kvfactory', - '__buffersize', - '__path', - '__path_backup', - '__path_recover', - '__path_error', - '__path_truncate', - '__path_truncate_flag', - '__not_running', - '__mmdb', '__loop', - '__queue', - '__file', - '__buffer', - '__buffer_future', - '__buffer_requested', - '__task', - '__initial_size', ) - __mmdb: dict - __loop: asyncio.AbstractEventLoop - __queue: asyncio.Queue[Request] - __file: IO[str] - __buffer: StringIO - __buffer_future: asyncio.Future - __buffer_requested: bool - __task: asyncio.Future - __initial_size: int + def __init__(self, loop: asyncio.AbstractEventLoop, /) -> None: + self.__loop = loop - def __init__( - self, - parametres: DbParametres, - / - ) -> None: - self.__kvfactory = parametres.kvfactory - self.__buffersize = parametres.buffersize - self.__path = path = parametres.path - name = self.__path.name - self.__path_backup = path.with_name(name + '.backup') - self.__path_recover = path.with_name(name + '.recover') - self.__path_error = path.with_name(name + '.error') - self.__path_truncate = path.with_name(name + '.truncate') - self.__path_truncate_flag = path.with_name(name + '.truncate_flag') - self.__not_running = True - - def _create_future(self, /) -> asyncio.Future: + def create_future(self, /) -> asyncio.Future: return self.__loop.create_future() - def _save_error_sync(self, line: str, /) -> None: - with self.__path_error.open('a') as file: + def loop(self, /) -> asyncio.AbstractEventLoop: + return self.__loop + + def run_in_thread(self, name: str, fn, /, *args, **kwargs) -> asyncio.Future: + """we are using our own thread to guarantee as much of autonomy and control as possible. +intended for heavy tasks.""" + future = self.create_future() + + def wrap() -> None: + try: + result = fn(*args, **kwargs) + except Exception as exception: + self.__loop.call_soon_threadsafe( + future.set_exception, exception + ) + else: + self.__loop.call_soon_threadsafe( + future.set_result, result + ) + fname = getattr(fn, '__name__', '?') + threading.Thread( + target=wrap, + name=f'persistence5-{name}-{fname}' + ).start() + + return future + + +class Errors: + __slots__ = ( + '__path', + '__loop', + '__event_loop', + ) + + def __init__(self, path: pathlib.Path, loop: Loop, /) -> None: + self.__path = path.with_name(path.name + '.error') + self.__loop = loop + self.__event_loop = loop.loop() + + def save_sync(self, line: str, /) -> None: + with self.__path.open('a') as file: file.write(line.strip() + '\n') async def _save_error(self, line: str, /) -> None: - await self.__loop.run_in_executor(None, self._save_error_sync, line) + await self.__event_loop.run_in_executor(None, self.save_sync, line) - def _queue_error(self, line: str, /) -> concurrent.futures.Future: + def _schedule_error(self, line: str, /) -> concurrent.futures.Future: return asyncio.run_coroutine_threadsafe( - self._save_error(line), self.__loop + self._save_error(line), self.__event_loop ) - def _save_error_from_thread(self, line: str, /) -> None: - self._queue_error(line).result() + def save_error_from_thread(self, line: str, /) -> None: + self._schedule_error(line).result() - def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: - path.touch() - with path.open('r') as file: - return self.__kvfactory.io2db(file, db, True) - def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: - with path.open('w') as file: - initial_size = self.__kvfactory.db2io(db, file) - os.fsync(file.fileno()) - return initial_size +class File: + __slots__ = ( + '__path', + '__file', + ) - def kvprotocol(self, /) -> KVProtocol: + __file: IO[str] + + def __init__(self, path: pathlib.Path, /) -> None: + self.__path = path + + def path(self, /) -> pathlib.Path: + return self.__path + + def tell(self, /) -> int: + return self.__file.tell() + + def write_to_disk_sync(self, line: str, /) -> None: + self.__file.write(line) + self.__file.flush() + try: + os.fsync(self.__file.fileno()) + except UnsupportedOperation: + pass + + def _open_sync(self, /) -> None: + self.__file = self.__path.open('a') + + def close_sync(self, /) -> None: + self.__file.close() + del self.__file + + +class Backup: + __slots__ = ( + '__file', + '__kvfactory', + '__loop', + '__path', + '__backup', + '__recover', + '__initial_size', + ) + + __initial_size: int + + def __init__(self, path: pathlib.Path, kvfactory: KVFactory, loop: Loop, /) -> None: + self.__file = File(path) + self.__kvfactory = kvfactory + self.__loop = loop + self.__path = path + self.__backup = path.with_name(path.name + '.backup') + self.__recover = path.with_name(path.name + '.recover') + + def file(self, /) -> File: + return self.__file + + def kvfactory(self, /) -> KVFactory: return self.__kvfactory - def get(self, key: Any, default: Any, /): - """dict-like get with mandatory default parametre.""" - return self.__kvfactory.dbget(self.__mmdb, key, default) + def _copy_sync(self, db: dict | None, /) -> None: + if db is None: + db = {} + self.__kvfactory.path2db_sync(self.__backup, db) + self.__kvfactory.db2path_sync(db, self.__path) - async def set(self, key: Any, value: Any, /) -> None: - """set the value and wait until it's written to disk.""" - future = self._create_future() - request = self.__kvfactory.request(key, value, future=future) - self.__kvfactory.dbset(self.__mmdb, key, value) - self.__queue.put_nowait(request) - await future + def _recovery_unset_sync(self, /) -> None: + self.__recover.unlink() + self.__backup.unlink() - def set_nowait(self, key: Any, value: Any, /) -> None: - """set value and add write-to-disk request to queue.""" - request = self.__kvfactory.free(key, value) - self.__kvfactory.dbset(self.__mmdb, key, value) + def _finish_recovery_sync(self, db: dict | None, /) -> None: + self._copy_sync(db) + self._recovery_unset_sync() + + def _recovery_set_sync(self, db: dict, /) -> None: + self.__initial_size = self.__kvfactory.db2path_sync(db, self.__backup) + self.__recover.touch() + + def build_file_sync(self, db: dict, /) -> None: + self._recovery_set_sync(db) + self._finish_recovery_sync(db) + + def _rebuild_file_sync(self, db: dict, /) -> None: + if self.__recover.exists(): + self._finish_recovery_sync(None) + self.__kvfactory.path2db_sync(self.__path, db) + self.build_file_sync(db) + + def _reload_sync(self, /) -> None: + self.__file.close_sync() + self._rebuild_file_sync({}) + self.__file._open_sync() + + def run_in_thread(self, fn, /, *args, **kwargs) -> asyncio.Future: + return self.__loop.run_in_thread(self.__path.name, fn, *args, **kwargs) + + async def _reload(self, /) -> None: + await self.run_in_thread(self._reload_sync) + + async def reload_if_oversized(self, /) -> None: + if self.__file.tell() > 2 * self.__initial_size: + await self._reload() + + def _load_mmdb_sync(self, /) -> dict: + db = {} + self._rebuild_file_sync(db) + return db + + def uninitialize(self, /) -> None: + del self.__initial_size + + +class Truncation: + __slots__ = ( + '__backup', + '__error', + '__file', + '__path', + '__truncate', + '__flag', + ) + + def __init__(self, backup: Backup, error: Errors, /) -> None: + self.__backup = backup + self.__error = error + self.__file = backup.file() + self.__path = path = self.__file.path() + self.__truncate = path.with_name(path.name + '.truncate') + self.__flag = path.with_name(path.name + '.truncate_flag') + + def backup(self, /) -> Backup: + return self.__backup + + def _write_bytes_sync(self, s: bytes, /) -> None: + # consider subclassing/rewriting to use `os.fsync` + self.__truncate.write_bytes(s) + + def _write_value_sync(self, value: int, /) -> None: + self._write_bytes_sync(value.to_bytes(16, 'little')) + + def set_sync(self, /) -> None: + self._write_value_sync(self.__file.tell()) + self.__flag.touch() + + def unset_sync(self, /) -> None: + self.__flag.unlink(missing_ok=True) + self.__truncate.unlink(missing_ok=True) + + def _target_sync(self, /) -> int: + return int.from_bytes(self.__truncate.read_bytes(), 'little') + + def _truncate_sync(self, /) -> None: + with self.__path.open('r+') as file: + self._file_truncate_sync(file, self._target_sync()) + + def assure_sync(self, /) -> None: + if self.__flag.exists(): + self._truncate_sync() + self.unset_sync() + + def _file_truncate_sync(self, file: IO[str], pos: int, /) -> None: + file.seek(pos) + self.__error.save_error_from_thread(file.read()) + file.truncate(pos) + + +class WriteableFile: + __slots__ = ( + '__truncation', + '__backup', + '__file', + ) + + def __init__(self, truncation: Truncation, /) -> None: + self.__truncation = truncation + self.__backup = truncation.backup() + self.__file = self.__backup.file() + + def truncation(self, /) -> Truncation: + return self.__truncation + + def file_write_sync(self, line: str, /) -> None: + self.__truncation.set_sync() + self.__file.write_to_disk_sync(line) + self.__truncation.unset_sync() + + +class ReceivingQueue: + __all__ = ( + '__queue', + ) + + def __init__(self, queue: asyncio.Queue[Request], /) -> None: + self.__queue: asyncio.Queue[Request] = queue + + def submit(self, request: Request, /) -> None: self.__queue.put_nowait(request) - def _clear_buffer(self, /) -> None: - self.__buffer = StringIO() - self.__buffer_future = self._create_future() - self.__buffer_requested = False - def _compress_buffer(self, /) -> StringIO: +class WriteableBuffer: + __slots__ = ( + '__buffersize', + '__writeable', + '__queue', + '__backup', + '__kvfactory', + '__loop', + '__event_loop', + '__buffer', + '__buffer_future', + '__buffer_requested', + ) + + __buffer: StringIO + __buffer_future: asyncio.Future + __buffer_requested: bool + + def __init__( + self, buffersize: int, writeable: WriteableFile, queue: ReceivingQueue, loop: Loop, / + ) -> None: + self.__buffersize = buffersize + self.__writeable = writeable + self.__queue = queue + self.__backup = writeable.truncation().backup() + self.__kvfactory = self.__backup.kvfactory() + self.__loop = loop + self.__event_loop = self.__loop.loop() + self._clear() + + def writeable(self, /) -> WriteableFile: + return self.__writeable + + def loop(self, /) -> Loop: + return self.__loop + + def _compressed(self, /) -> StringIO: self.__buffer.seek(0) bufferdb = {} self.__kvfactory.io2db(self.__buffer, bufferdb, False) @@ -416,34 +627,32 @@ class DbConnection(VirtualConnection): self.__kvfactory.db2io(bufferdb, buffer) return buffer - def _commit_compressed_buffer_sync(self, /) -> None: - self._file_write_sync(self._compress_buffer().getvalue()) + def _commit_compressed_sync(self, /) -> None: + self.__writeable.file_write_sync(self._compressed().getvalue()) - async def _commit_compressed_buffer(self, /) -> None: - await self.__loop.run_in_executor(None, self._commit_compressed_buffer_sync) + async def _commit_compressed(self, /) -> None: + await self.__event_loop.run_in_executor(None, self._commit_compressed_sync) - def _satisfy_buffer_future(self, /) -> None: + def _clear(self, /) -> None: + self.__buffer = StringIO() + self.__buffer_future = self.__loop.create_future() + self.__buffer_requested = False + + def _satisfy_future(self, /) -> None: self.__buffer_future.set_result(None) - self._clear_buffer() + self._clear() - def _fail_buffer_future(self, exception: BaseException, /) -> None: + def _fail_future(self, exception: BaseException, /) -> None: self.__buffer_future.set_exception(exception) - self._clear_buffer() + self._clear() async def _do_commit_buffer(self, /) -> None: try: - await self._commit_compressed_buffer() + await self._commit_compressed() except Exception as e: - self._fail_buffer_future(e) + self._fail_future(e) else: - self._satisfy_buffer_future() - - async def _commit_buffer(self, /) -> None: - if self.__buffer.tell(): - await self._do_commit_buffer() - await self._reload_if_oversized() - elif self.__buffer_requested: - self._satisfy_buffer_future() + self._satisfy_future() def _request_buffer(self, request: Request, /) -> None: if request.waiting(): @@ -459,82 +668,116 @@ class DbConnection(VirtualConnection): ) if not self.__buffer_requested: self.__buffer_requested = True - self.__queue.put_nowait(CommitRequest(None)) + self.__queue.submit(CommitRequest(None)) - async def _commit_buffer_or_request_so(self, request: Request, /) -> None: + async def _commit(self, /) -> None: + if self.__buffer.tell(): + await self._do_commit_buffer() + await self.__backup.reload_if_oversized() + elif self.__buffer_requested: + self._satisfy_future() + + async def _commit_or_request_so(self, request: Request, /) -> None: if self.__buffer.tell() >= self.__buffersize: - await self._commit_buffer() + await self._commit() request.set_result(None) else: self._request_buffer(request) async def _write(self, line: str, request: Request, /) -> None: self.__buffer.write(line) - await self._commit_buffer_or_request_so(request) - - def _write_truncation_bytes(self, s: bytes, /) -> None: - # consider subclassing/rewriting to use `os.fsync` - self.__path_truncate.write_bytes(s) - - def _write_truncation_value(self, value: int, /) -> None: - self._write_truncation_bytes(value.to_bytes(16, 'little')) - - def _truncation_set_sync(self, /) -> None: - self._write_truncation_value(self.__file.tell()) - self.__path_truncate_flag.touch() - - def _truncation_unset_sync(self, /) -> None: - self.__path_truncate_flag.unlink(missing_ok=True) - self.__path_truncate.unlink(missing_ok=True) - - def _file_truncate_sync(self, file: IO[str], pos: int, /) -> None: - file.seek(pos) - self._save_error_from_thread(file.read()) - file.truncate(pos) - - def _truncation_target_sync(self, /) -> int: - return int.from_bytes(self.__path_truncate.read_bytes(), 'little') - - def _truncate_sync(self, /) -> None: - with self.__path.open('r+') as file: - self._file_truncate_sync(file, self._truncation_target_sync()) - - def _assure_truncation_sync(self, /) -> None: - if self.__path_truncate_flag.exists(): - self._truncate_sync() - self._truncation_unset_sync() - - def _write_to_disk_sync(self, line: str, /) -> None: - self.__file.write(line) - self.__file.flush() - try: - os.fsync(self.__file.fileno()) - except UnsupportedOperation: - pass - - def _file_write_sync(self, line: str, /) -> None: - self._truncation_set_sync() - self._write_to_disk_sync(line) - self._truncation_unset_sync() - - async def _reload_if_oversized(self, /) -> None: - if self.__file.tell() > 2 * self.__initial_size: - await self._reload() + await self._commit_or_request_so(request) async def _handle_request(self, request: Request, /) -> None: match request: case LineRequest(): await self._write(request.line, request) case CommitRequest(): - await self._commit_buffer() + await self._commit() request.set_result(None) case _: raise UnknownRequestType + async def _close(self, /): + await self._commit() + if not self.__buffer_future.done(): + self.__buffer_future.set_exception(RequestToClosedConnection()) + if not isinstance(self.__buffer_future.exception(), RequestToClosedConnection): + raise RuntimeError + del self.__buffer_requested + del self.__buffer_future + del self.__buffer + + +class MMDB: + __slots__ = ( + '__backup', + '__truncation', + '__file', + '__kvfactory', + '__loop', + '__mmdb', + ) + + __mmdb: dict + + def __init__(self, truncation: Truncation, /) -> None: + self.__truncation = truncation + self.__backup = truncation.backup() + self.__file = self.__backup.file() + self.__kvfactory = self.__backup.kvfactory() + + def _initialize_sync(self, /) -> None: + self.__mmdb = self.__backup._load_mmdb_sync() + + def _load_from_file_sync(self, /) -> None: + self.__truncation.assure_sync() + self._initialize_sync() + self.__file._open_sync() + + async def _load_from_file(self, /) -> None: + await self.__backup.run_in_thread(self._load_from_file_sync) + + def _close_sync(self, /) -> None: + self.__file.close_sync() + self.__backup.build_file_sync(self.__mmdb) + del self.__mmdb + self.__backup.uninitialize() + + async def _close(self, /) -> None: + await self.__backup.run_in_thread(self._close_sync) + + def _transaction_buffer(self, delta: dict, /) -> StringIO: + buffer = StringIO() + self.__kvfactory.db2io(delta, buffer) + for key, value in delta.items(): + self.__kvfactory.dbset(self.__mmdb, key, value) + return buffer + + def get(self, key: Any, default: Any, /) -> None: + return self.__kvfactory.dbget(self.__mmdb, key, default) + + def set(self, key: Any, value: Any, /) -> None: + self.__kvfactory.dbset(self.__mmdb, key, value) + + +class QueueTask: + __slots__ = ( + '__queue', + '__buffer', + '__event_loop', + '__task', + ) + + def __init__(self, queue: asyncio.Queue[Request], buffer: WriteableBuffer, /) -> None: + self.__queue = queue + self.__buffer = buffer + self.__event_loop = buffer.loop().loop() + async def _background_cycle(self, /) -> None: request: Request = await self.__queue.get() try: - await self._handle_request(request) + await self.__buffer._handle_request(request) except Exception as e: request.set_exception(e) finally: @@ -544,101 +787,82 @@ class DbConnection(VirtualConnection): while True: await self._background_cycle() - def _start_task(self, /) -> None: - self.__task = self.__loop.create_task(self._background_task()) + async def close(self, /) -> None: + if not self.__task.done(): + await self.__queue.join() + self.__task.cancel() + del self.__task + if not self.__queue.empty(): + raise RuntimeError + del self.__queue + await self.__buffer._close() - def _recovery_set_sync(self, db: dict, /) -> None: - self.__initial_size = self._db2path_sync(db, self.__path_backup) - self.__path_recover.touch() + def start(self, /) -> None: + self.__task = self.__event_loop.create_task(self._background_task()) - def _recovery_unset_sync(self, /) -> None: - self.__path_recover.unlink() - self.__path_backup.unlink() - def _copy_sync(self, db: dict | None, /) -> None: - if db is None: - db = {} - self._path2db_sync(self.__path_backup, db) - self._db2path_sync(db, self.__path) +class DbConnection(VirtualConnection): + """note: unstable constructor signature.""" - def _finish_recovery_sync(self, db: dict | None, /) -> None: - self._copy_sync(db) - self._recovery_unset_sync() + __slots__ = ( + '__kvfactory', + '__buffersize', + '__path', + '__error', + '__not_running', + '__mmdb', + '__loop', + '__queue', + '__file', + '__task', + ) - def _build_file_sync(self, db: dict, /) -> None: - self._recovery_set_sync(db) - self._finish_recovery_sync(db) + __mmdb: MMDB + __loop: Loop + __queue: ReceivingQueue + __task: QueueTask - def _run_in_thread(self, fn, /, *args, **kwargs) -> asyncio.Future: - """we are using our own thread to guarantee as much of autonomy and control as possible. -intended for heavy tasks.""" - future = self._create_future() + def __init__(self, parametres: DbParameters, /) -> None: + self.__kvfactory = parametres.kvfactory + self.__buffersize = parametres.buffersize + self.__path = path = parametres.path + name = path.name + self.__not_running = True - def wrap() -> None: - try: - result = fn(*args, **kwargs) - except Exception as exception: - self.__loop.call_soon_threadsafe( - future.set_exception, exception - ) - else: - self.__loop.call_soon_threadsafe( - future.set_result, result - ) - name = getattr(fn, '__name__', '?') - threading.Thread( - target=wrap, - name=f'persistence5-{self.__path.name}-{name}' - ).start() + def kvprotocol(self, /) -> KVProtocol: + return self.__kvfactory - return future + def get(self, key: Any, default: Any, /) -> Any: + """dict-like get with mandatory default parametre.""" + return self.__mmdb.get(key, default) - def _rebuild_file_sync(self, db: dict, /) -> None: - if self.__path_recover.exists(): - self._finish_recovery_sync(None) - self._path2db_sync(self.__path, db) - self._build_file_sync(db) + async def set(self, key: Any, value: Any, /) -> None: + """set the value and wait until it's written to disk.""" + future = self.__loop.create_future() + request = self.__kvfactory.request(key, value, future=future) + self.__mmdb.set(key, value) + self.__queue.submit(request) + await future - def _file_open_sync(self, /) -> None: - self.__file = self.__path.open('a') - - def _file_close_sync(self, /) -> None: - self.__file.close() - del self.__file - - def _reload_sync(self, /) -> None: - self._file_close_sync() - self._rebuild_file_sync({}) - self._file_open_sync() - - async def _reload(self, /) -> None: - await self._run_in_thread(self._reload_sync) - - def _load_mmdb_sync(self, /) -> dict: - db = {} - self._rebuild_file_sync(db) - return db - - def _initialize_mmdb_sync(self, /) -> None: - self.__mmdb = self._load_mmdb_sync() - - def _load_from_file_sync(self, /) -> None: - self._assure_truncation_sync() - self._initialize_mmdb_sync() - self._file_open_sync() - - async def _load_from_file(self, /) -> None: - await self._run_in_thread(self._load_from_file_sync) - - def _initialize_queue(self, /) -> None: - self.__queue = asyncio.Queue() - self._clear_buffer() + def set_nowait(self, key: Any, value: Any, /) -> None: + """set value and add write-to-disk request to queue.""" + request = self.__kvfactory.free(key, value) + self.__mmdb.set(key, value) + self.__queue.submit(request) async def _initialize_running(self, /) -> None: - self.__loop = asyncio.get_running_loop() - self._initialize_queue() - await self._load_from_file() - self._start_task() + self.__loop = Loop(asyncio.get_running_loop()) + error = Errors(self.__path, self.__loop) + backup = Backup(self.__path, self.__kvfactory, self.__loop) + truncation = Truncation(backup, error) + write = WriteableFile(truncation) + queue: asyncio.Queue[Request] = asyncio.Queue() + self.__queue = ReceivingQueue(queue) + buffer = WriteableBuffer(self.__buffersize, write, self.__queue, self.__loop) + self.__mmdb = MMDB(truncation) + await self.__mmdb._load_from_file() + self.__task = QueueTask(queue, buffer) + self.__task.start() async def _initialize(self, /) -> None: if not self.__not_running: @@ -647,46 +871,21 @@ intended for heavy tasks.""" await self._initialize_running() @classmethod - async def create(cls, parametres: DbParametres, /) -> 'DbConnection': + async def create(cls, parametres: DbParameters, /) -> 'DbConnection': """connect to the factory. note: unstable signature.""" dbconnection = DbConnection(parametres) await dbconnection._initialize() return dbconnection - async def _close_buffer(self, /): - await self._commit_buffer() - if not self.__buffer_future.done(): - self.__buffer_future.set_exception(RequestToClosedConnection()) - if not isinstance(self.__buffer_future.exception(), RequestToClosedConnection): - raise RuntimeError - del self.__buffer_requested - del self.__buffer_future - del self.__buffer - - async def _close_queue(self, /) -> None: - if not self.__task.done(): - await self.__queue.join() - self.__task.cancel() - del self.__task - if not self.__queue.empty(): - raise RuntimeError - del self.__queue - await self._close_buffer() - - def _close_mmdb_sync(self, /) -> None: - self._file_close_sync() - self._build_file_sync(self.__mmdb) - del self.__mmdb - del self.__initial_size - - async def _close_mmdb(self, /) -> None: - await self._run_in_thread(self._close_mmdb_sync) - async def _close_running(self, /) -> None: - await self._close_queue() - await self._close_mmdb() + mmdb = self.__mmdb + del self.__mmdb + del self.__queue del self.__loop + await self.__task.close() + del self.__task + await mmdb._close() async def aclose(self, /) -> None: """close the connection. @@ -694,21 +893,14 @@ note: unstable signature.""" await self._close_running() self.__not_running = True - def _transaction_buffer(self, delta: dict, /) -> StringIO: - buffer = StringIO() - self.__kvfactory.db2io(delta, buffer) - for key, value in delta.items(): - self.__kvfactory.dbset(self.__mmdb, key, value) - return buffer - async def commit_transaction(self, delta: dict, /) -> None: """hybrid of set() and dict.update(). note: unstable signature.""" if not delta: return - buffer = self._transaction_buffer(delta) - future = self._create_future() - self.__queue.put_nowait(TransactionRequest(buffer, future=future)) + buffer = self.__mmdb._transaction_buffer(delta) + future = self.__loop.create_future() + self.__queue.submit(TransactionRequest(buffer, future=future)) await future def submit_transaction(self, delta: dict, /) -> None: @@ -718,8 +910,8 @@ note: this method was added only for async-sync symmetry with commit_transaction note: unstable signature.""" if not delta: return - buffer = self._transaction_buffer(delta) - self.__queue.put_nowait(TransactionRequest(buffer, future=None)) + buffer = self.__mmdb._transaction_buffer(delta) + self.__queue.submit(TransactionRequest(buffer, future=None)) def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: """low-level API. @@ -729,18 +921,18 @@ note: unstable signature.""" if future: future.set_result(None) return - buffer = self._transaction_buffer(delta) - self.__queue.put_nowait(TransactionRequest(buffer, future=future)) + buffer = self.__mmdb._transaction_buffer(delta) + self.__queue.submit(TransactionRequest(buffer, future=future)) async def commit(self, /) -> None: """wait until all requests queued before are completed.""" - future = self._create_future() - self.__queue.put_nowait(CommitRequest(future)) + future = self.__loop.create_future() + self.__queue.submit(CommitRequest(future)) await future @nightly def loop(self, /) -> asyncio.AbstractEventLoop: - return self.__loop + return self.__loop.loop() def transaction(self, /) -> 'Transaction': return super().transaction() @@ -753,7 +945,7 @@ class DbFactory: ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: - self.__parametres = DbParametres( + self.__parametres = DbParameters( path, kvfactory=kvfactory, buffersize=buffersize ) @@ -773,7 +965,7 @@ class Db(DbConnection): def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576): DbConnection.__init__( self, - DbParametres( + DbParameters( pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize ) ) @@ -876,7 +1068,7 @@ class TransactionView(VirtualConnection): def kvprotocol(self, /) -> KVProtocol: return self.__kvprotocol - def get(self, key: Any, default: Any, /): + def get(self, key: Any, default: Any, /) -> Any: """get from the delta (unsubmitted), else from the shadow (submitted), else from the connection.""" if key in self.__delta: return self.__kvprotocol.dbget(self.__delta, key, default) diff --git a/ptvp35/instrumentation.py b/ptvp35/instrumentation.py index e230b02..14f3de4 100644 --- a/ptvp35/instrumentation.py +++ b/ptvp35/instrumentation.py @@ -6,7 +6,7 @@ __all__ = ('InstrumentDiskWrites', 'NightlyInstrumentation') class InstrumentDiskWrites(Instrumentation): def __init__(self, /): - super().__init__(ptvp35.DbConnection, '_write_to_disk_sync') + super().__init__(ptvp35.File, 'write_to_disk_sync') def on_write(self, line: str, /) -> None: pass