diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index eccacbe..767ab41 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -102,7 +102,6 @@ class TransactionRequest(Request): class DbConnection: __mmdb: dict __loop: asyncio.AbstractEventLoop - __pool: concurrent.futures.Executor __queue: asyncio.Queue[Request] __file: IO[str] __buffer: StringIO @@ -181,7 +180,7 @@ class DbConnection: async def _dump_compressed_buffer(self): buffer = self._compress_buffer() - await self.__loop.run_in_executor(self.__pool, self.__file.write, buffer.getvalue()) + await self.__loop.run_in_executor(None, self.__file.write, buffer.getvalue()) async def _do_dump_buffer(self): await self._dump_compressed_buffer() @@ -198,7 +197,7 @@ class DbConnection: async def _save_error(self, line: str): with open(self.__path_error, 'a') as file: - await self.__loop.run_in_executor(self.__pool, file.write, line.strip() + '\n') + await self.__loop.run_in_executor(None, file.write, line.strip() + '\n') async def _handle_request(self, request: Request): if isinstance(request, self.factory.kvrequest_type): @@ -227,13 +226,14 @@ class DbConnection: await self._background_cycle() async def _finish_recovery(self): - await self.__loop.run_in_executor(self.__pool, shutil.copy, self.__path_backup, self.__path) + with concurrent.futures.ThreadPoolExecutor() as pool: + await self.__loop.run_in_executor(pool, shutil.copy, self.__path_backup, self.__path) self.__path_recover.unlink() self.__path_backup.unlink() async def _build_file(self, db: dict): - with open(self.__path_backup, "w") as file: - self.__initial_size = await self.__loop.run_in_executor(self.__pool, self.db2io, db, file) + with open(self.__path_backup, "w") as file, concurrent.futures.ThreadPoolExecutor() as pool: + self.__initial_size = await self.__loop.run_in_executor(pool, self.db2io, db, file) self.__path_recover.touch() await self._finish_recovery() @@ -242,7 +242,7 @@ class DbConnection: await self._finish_recovery() self.__path.touch() with open(self.__path) as file: - await self.__loop.run_in_executor(self.__pool, self.io2db, file, db) + await self.__loop.run_in_executor(None, self.io2db, file, db) await self._build_file(db) async def _reload(self): @@ -268,7 +268,6 @@ class DbConnection: async def _initialize(self): assert self.not_running self.__loop = asyncio.get_event_loop() - self.__pool = concurrent.futures.ThreadPoolExecutor() await self._initialize_queue() await self._initialize_mmdb() await self._start_task() @@ -286,14 +285,10 @@ class DbConnection: self.__task.cancel() await self._dump_buffer() self.__file.close() - self.__pool.shutdown() - self.__pool = concurrent.futures.ThreadPoolExecutor() await self._build_file(self.__mmdb) self.not_running = True - self.__pool.shutdown() del self.__mmdb del self.__loop - del self.__pool del self.__queue del self.__file del self.__buffer