From b9c83d13f2d2d4718256680c6e6eb246691fd6f0 Mon Sep 17 00:00:00 2001 From: timofey Date: Mon, 21 Nov 2022 17:30:54 +0000 Subject: [PATCH] better close --- ptvp35/__init__.py | 62 ++++++++++++++++++++++++++++++---------------- traced_example.py | 21 ++++++++++++++-- 2 files changed, 60 insertions(+), 23 deletions(-) diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 0991f80..c9565ec 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -176,6 +176,10 @@ class DbParametres: """note: unstable signature.""" +class RequestToClosedConnection(asyncio.InvalidStateError): + pass + + class DbConnection: """note: unstable constructor signature.""" @@ -454,9 +458,6 @@ intended for heavy tasks.""" return future - async def _build_file(self, db: dict, /) -> None: - await self._run_in_thread(self._build_file_sync, db) - def _rebuild_file_sync(self, db: dict, /) -> None: if self.__path_recover.exists(): self._finish_recovery_sync(None) @@ -466,9 +467,12 @@ intended for heavy tasks.""" def _file_open_sync(self, /) -> None: self.__file = self.__path.open('a') - def _reload_sync(self, /) -> None: + def _file_close_sync(self, /) -> None: self.__file.close() del self.__file + + def _reload_sync(self, /) -> None: + self._file_close_sync() self._rebuild_file_sync({}) self._file_open_sync() @@ -513,27 +517,43 @@ note: unstable signature.""" dbconnection = DbConnection(parametres) await dbconnection._initialize() return dbconnection + + async def _close_buffer(self, /): + await self._commit_buffer() + if not self.__buffer_future.done(): + self.__buffer_future.set_exception(RequestToClosedConnection()) + assert isinstance(self.__buffer_future.exception(), RequestToClosedConnection) + del self.__buffer_requested + del self.__buffer_future + del self.__buffer + + async def _close_queue(self, /) -> None: + if not self.__task.done(): + await self.__queue.join() + self.__task.cancel() + del self.__task + assert self.__queue.empty() + del self.__queue + await self._close_buffer() + + def _close_mmdb_sync(self, /) -> None: + self._file_close_sync() + self._build_file_sync(self.__mmdb) + del self.__mmdb + del self.__initial_size + + async def _close_mmdb(self, /) -> None: + await self._run_in_thread(self._close_mmdb_sync) + + async def _close_running(self, /) -> None: + await self._close_queue() + await self._close_mmdb() + del self.__loop async def aclose(self, /) -> None: """close the connection. note: unstable signature.""" - if not self.__task.done(): - await self.__queue.join() - self.__task.cancel() - await self._commit_buffer() - if not self.__buffer_future.done(): - self.__buffer_future.cancel() - self.__file.close() - await self._build_file(self.__mmdb) - del self.__task - del self.__file - del self.__initial_size - del self.__mmdb - del self.__buffer_requested - del self.__buffer_future - del self.__buffer - del self.__queue - del self.__loop + await self._close_running() self.__not_running = True def _transaction_buffer(self, delta: dict, /) -> StringIO: diff --git a/traced_example.py b/traced_example.py index 3f583ff..e25840f 100644 --- a/traced_example.py +++ b/traced_example.py @@ -125,12 +125,22 @@ async def transaction_test(db: DbConnection): logdb('after transaction with implicit submit') +def print_private_db_attrs(db: DbConnection): + if run_all: + for attr in dir(db): + if attr.startswith('_DbConnection') and hasattr(db, attr): + print(attr) + + +run_all = 'all' in sys.argv + + async def main(): (path := pathlib.Path('dev.db')).unlink(missing_ok=True) with ExitStack() as es: LogWrites().enter(es) - if 'all' in sys.argv: + if run_all: LogEE(__import__('ptvp35').Request, '__init__').enter(es) LogEE(__import__('ptvp35').Request, 'waiting').enter(es) LogEE(__import__('ptvp35').Request, 'set_result').enter(es) @@ -189,9 +199,9 @@ async def main(): LogEE(__import__('ptvp35').DbConnection, '_finish_recovery_sync').enter(es) LogEE(__import__('ptvp35').DbConnection, '_build_file_sync').enter(es) LogEE(__import__('ptvp35').DbConnection, '_run_in_thread').enter(es) - ALogEE(__import__('ptvp35').DbConnection, '_build_file').enter(es) LogEE(__import__('ptvp35').DbConnection, '_rebuild_file_sync').enter(es) LogEE(__import__('ptvp35').DbConnection, '_file_open_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_file_close_sync').enter(es) LogEE(__import__('ptvp35').DbConnection, '_reload_sync').enter(es) ALogEE(__import__('ptvp35').DbConnection, '_reload').enter(es) LogEE(__import__('ptvp35').DbConnection, '_load_mmdb_sync').enter(es) @@ -201,6 +211,11 @@ async def main(): LogEE(__import__('ptvp35').DbConnection, '_initialize_queue').enter(es) ALogEE(__import__('ptvp35').DbConnection, '_initialize_running').enter(es) ALogEE(__import__('ptvp35').DbConnection, '_initialize').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_close_buffer').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_close_queue').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_close_mmdb_sync').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_close_mmdb').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_close_running').enter(es) ALogEE(__import__('ptvp35').DbConnection, 'aclose').enter(es) LogEE(__import__('ptvp35').DbConnection, '_transaction_buffer').enter(es) ALogEE(__import__('ptvp35').DbConnection, 'commit_transaction').enter(es) @@ -231,6 +246,8 @@ async def main(): LogEE(__import__('ptvp35').Transaction, '__exit__').enter(es) async with DbFactory(path, kvfactory=KVJson()) as db: await transaction_test(db) + print_private_db_attrs(db) + print_private_db_attrs(db) if __name__ == '__main__':