pool isolation
This commit is contained in:
		
							parent
							
								
									46d585c762
								
							
						
					
					
						commit
						da8db42dae
					
				| @ -102,7 +102,6 @@ class TransactionRequest(Request): | |||||||
| class DbConnection: | class DbConnection: | ||||||
|     __mmdb: dict |     __mmdb: dict | ||||||
|     __loop: asyncio.AbstractEventLoop |     __loop: asyncio.AbstractEventLoop | ||||||
|     __pool: concurrent.futures.Executor |  | ||||||
|     __queue: asyncio.Queue[Request] |     __queue: asyncio.Queue[Request] | ||||||
|     __file: IO[str] |     __file: IO[str] | ||||||
|     __buffer: StringIO |     __buffer: StringIO | ||||||
| @ -181,7 +180,7 @@ class DbConnection: | |||||||
| 
 | 
 | ||||||
|     async def _dump_compressed_buffer(self): |     async def _dump_compressed_buffer(self): | ||||||
|         buffer = self._compress_buffer() |         buffer = self._compress_buffer() | ||||||
|         await self.__loop.run_in_executor(self.__pool, self.__file.write, buffer.getvalue()) |         await self.__loop.run_in_executor(None, self.__file.write, buffer.getvalue()) | ||||||
| 
 | 
 | ||||||
|     async def _do_dump_buffer(self): |     async def _do_dump_buffer(self): | ||||||
|         await self._dump_compressed_buffer() |         await self._dump_compressed_buffer() | ||||||
| @ -198,7 +197,7 @@ class DbConnection: | |||||||
| 
 | 
 | ||||||
|     async def _save_error(self, line: str): |     async def _save_error(self, line: str): | ||||||
|         with open(self.__path_error, 'a') as file: |         with open(self.__path_error, 'a') as file: | ||||||
|             await self.__loop.run_in_executor(self.__pool, file.write, line.strip() + '\n') |             await self.__loop.run_in_executor(None, file.write, line.strip() + '\n') | ||||||
| 
 | 
 | ||||||
|     async def _handle_request(self, request: Request): |     async def _handle_request(self, request: Request): | ||||||
|         if isinstance(request, self.factory.kvrequest_type): |         if isinstance(request, self.factory.kvrequest_type): | ||||||
| @ -227,13 +226,14 @@ class DbConnection: | |||||||
|             await self._background_cycle() |             await self._background_cycle() | ||||||
| 
 | 
 | ||||||
|     async def _finish_recovery(self): |     async def _finish_recovery(self): | ||||||
|         await self.__loop.run_in_executor(self.__pool, shutil.copy, self.__path_backup, self.__path) |         with concurrent.futures.ThreadPoolExecutor() as pool: | ||||||
|  |             await self.__loop.run_in_executor(pool, shutil.copy, self.__path_backup, self.__path) | ||||||
|         self.__path_recover.unlink() |         self.__path_recover.unlink() | ||||||
|         self.__path_backup.unlink() |         self.__path_backup.unlink() | ||||||
| 
 | 
 | ||||||
|     async def _build_file(self, db: dict): |     async def _build_file(self, db: dict): | ||||||
|         with open(self.__path_backup, "w") as file: |         with open(self.__path_backup, "w") as file, concurrent.futures.ThreadPoolExecutor() as pool: | ||||||
|             self.__initial_size = await self.__loop.run_in_executor(self.__pool, self.db2io, db, file) |             self.__initial_size = await self.__loop.run_in_executor(pool, self.db2io, db, file) | ||||||
|         self.__path_recover.touch() |         self.__path_recover.touch() | ||||||
|         await self._finish_recovery() |         await self._finish_recovery() | ||||||
| 
 | 
 | ||||||
| @ -242,7 +242,7 @@ class DbConnection: | |||||||
|             await self._finish_recovery() |             await self._finish_recovery() | ||||||
|         self.__path.touch() |         self.__path.touch() | ||||||
|         with open(self.__path) as file: |         with open(self.__path) as file: | ||||||
|             await self.__loop.run_in_executor(self.__pool, self.io2db, file, db) |             await self.__loop.run_in_executor(None, self.io2db, file, db) | ||||||
|         await self._build_file(db) |         await self._build_file(db) | ||||||
| 
 | 
 | ||||||
|     async def _reload(self): |     async def _reload(self): | ||||||
| @ -268,7 +268,6 @@ class DbConnection: | |||||||
|     async def _initialize(self): |     async def _initialize(self): | ||||||
|         assert self.not_running |         assert self.not_running | ||||||
|         self.__loop = asyncio.get_event_loop() |         self.__loop = asyncio.get_event_loop() | ||||||
|         self.__pool = concurrent.futures.ThreadPoolExecutor() |  | ||||||
|         await self._initialize_queue() |         await self._initialize_queue() | ||||||
|         await self._initialize_mmdb() |         await self._initialize_mmdb() | ||||||
|         await self._start_task() |         await self._start_task() | ||||||
| @ -286,14 +285,10 @@ class DbConnection: | |||||||
|             self.__task.cancel() |             self.__task.cancel() | ||||||
|         await self._dump_buffer() |         await self._dump_buffer() | ||||||
|         self.__file.close() |         self.__file.close() | ||||||
|         self.__pool.shutdown() |  | ||||||
|         self.__pool = concurrent.futures.ThreadPoolExecutor() |  | ||||||
|         await self._build_file(self.__mmdb) |         await self._build_file(self.__mmdb) | ||||||
|         self.not_running = True |         self.not_running = True | ||||||
|         self.__pool.shutdown() |  | ||||||
|         del self.__mmdb |         del self.__mmdb | ||||||
|         del self.__loop |         del self.__loop | ||||||
|         del self.__pool |  | ||||||
|         del self.__queue |         del self.__queue | ||||||
|         del self.__file |         del self.__file | ||||||
|         del self.__buffer |         del self.__buffer | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user