better close

This commit is contained in:
AF 2022-11-21 17:30:54 +00:00
parent 0e7efbb1d5
commit b9c83d13f2
2 changed files with 60 additions and 23 deletions

View File

@ -176,6 +176,10 @@ class DbParametres:
"""note: unstable signature."""
class RequestToClosedConnection(asyncio.InvalidStateError):
pass
class DbConnection:
"""note: unstable constructor signature."""
@ -454,9 +458,6 @@ intended for heavy tasks."""
return future
async def _build_file(self, db: dict, /) -> None:
await self._run_in_thread(self._build_file_sync, db)
def _rebuild_file_sync(self, db: dict, /) -> None:
if self.__path_recover.exists():
self._finish_recovery_sync(None)
@ -466,9 +467,12 @@ intended for heavy tasks."""
def _file_open_sync(self, /) -> None:
self.__file = self.__path.open('a')
def _reload_sync(self, /) -> None:
def _file_close_sync(self, /) -> None:
self.__file.close()
del self.__file
def _reload_sync(self, /) -> None:
self._file_close_sync()
self._rebuild_file_sync({})
self._file_open_sync()
@ -513,27 +517,43 @@ note: unstable signature."""
dbconnection = DbConnection(parametres)
await dbconnection._initialize()
return dbconnection
async def _close_buffer(self, /):
await self._commit_buffer()
if not self.__buffer_future.done():
self.__buffer_future.set_exception(RequestToClosedConnection())
assert isinstance(self.__buffer_future.exception(), RequestToClosedConnection)
del self.__buffer_requested
del self.__buffer_future
del self.__buffer
async def _close_queue(self, /) -> None:
if not self.__task.done():
await self.__queue.join()
self.__task.cancel()
del self.__task
assert self.__queue.empty()
del self.__queue
await self._close_buffer()
def _close_mmdb_sync(self, /) -> None:
self._file_close_sync()
self._build_file_sync(self.__mmdb)
del self.__mmdb
del self.__initial_size
async def _close_mmdb(self, /) -> None:
await self._run_in_thread(self._close_mmdb_sync)
async def _close_running(self, /) -> None:
await self._close_queue()
await self._close_mmdb()
del self.__loop
async def aclose(self, /) -> None:
"""close the connection.
note: unstable signature."""
if not self.__task.done():
await self.__queue.join()
self.__task.cancel()
await self._commit_buffer()
if not self.__buffer_future.done():
self.__buffer_future.cancel()
self.__file.close()
await self._build_file(self.__mmdb)
del self.__task
del self.__file
del self.__initial_size
del self.__mmdb
del self.__buffer_requested
del self.__buffer_future
del self.__buffer
del self.__queue
del self.__loop
await self._close_running()
self.__not_running = True
def _transaction_buffer(self, delta: dict, /) -> StringIO:

View File

@ -125,12 +125,22 @@ async def transaction_test(db: DbConnection):
logdb('after transaction with implicit submit')
def print_private_db_attrs(db: DbConnection):
if run_all:
for attr in dir(db):
if attr.startswith('_DbConnection') and hasattr(db, attr):
print(attr)
run_all = 'all' in sys.argv
async def main():
(path := pathlib.Path('dev.db')).unlink(missing_ok=True)
with ExitStack() as es:
LogWrites().enter(es)
if 'all' in sys.argv:
if run_all:
LogEE(__import__('ptvp35').Request, '__init__').enter(es)
LogEE(__import__('ptvp35').Request, 'waiting').enter(es)
LogEE(__import__('ptvp35').Request, 'set_result').enter(es)
@ -189,9 +199,9 @@ async def main():
LogEE(__import__('ptvp35').DbConnection, '_finish_recovery_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_build_file_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_run_in_thread').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_build_file').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_rebuild_file_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_open_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_close_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_reload_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_reload').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_load_mmdb_sync').enter(es)
@ -201,6 +211,11 @@ async def main():
LogEE(__import__('ptvp35').DbConnection, '_initialize_queue').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_initialize_running').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_initialize').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_queue').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_close_mmdb_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_mmdb').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_close_running').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'aclose').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_transaction_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'commit_transaction').enter(es)
@ -231,6 +246,8 @@ async def main():
LogEE(__import__('ptvp35').Transaction, '__exit__').enter(es)
async with DbFactory(path, kvfactory=KVJson()) as db:
await transaction_test(db)
print_private_db_attrs(db)
print_private_db_attrs(db)
if __name__ == '__main__':