From b2b326fc553502de8f82c479f115514315a05b88 Mon Sep 17 00:00:00 2001 From: timofey Date: Fri, 11 Nov 2022 17:36:29 +0000 Subject: [PATCH] truncation --- ptvp35/__init__.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index f94666f..befc7e1 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -113,11 +113,12 @@ class DbConnection: factory: 'DbFactory', ) -> None: self.factory = factory - path = self.factory.path - self.__path = pathlib.Path(path) - self.__path_backup = pathlib.Path(path + '.backup') - self.__path_recover = pathlib.Path(path + '.recover') - self.__path_error = pathlib.Path(path + '.error') + self.__path = path = self.factory.path + name = self.__path.name + self.__path_backup = path.with_name(name + '.backup') + self.__path_recover = path.with_name(name + '.recover') + self.__path_error = path.with_name(name + '.error') + self.__path_truncate = path.with_name(name + '.truncate') self.not_running = True def _queue_error(self, line: str): @@ -179,13 +180,17 @@ class DbConnection: return buffer def _file_write(self, line: str): + self.__path_truncate.write_bytes(self.__file.tell().to_bytes(16, 'little')) self.__file.write(line) self.__file.flush() os.fsync(self.__file.fileno()) + self.__path_truncate.unlink(missing_ok=True) + + def _dump_compressed_buffer_sync(self): + self._file_write(self._compress_buffer().getvalue()) async def _dump_compressed_buffer(self): - buffer = self._compress_buffer() - await self.__loop.run_in_executor(None, self._file_write, buffer.getvalue()) + await self.__loop.run_in_executor(None, self._dump_compressed_buffer_sync) async def _do_dump_buffer(self): await self._dump_compressed_buffer() @@ -200,9 +205,12 @@ class DbConnection: await self._do_dump_buffer() await self._reload_if_oversized() - async def _save_error(self, line: str): + def _save_error_sync(self, line: str): with open(self.__path_error, 'a') as file: - await self.__loop.run_in_executor(None, file.write, line.strip() + '\n') + file.write(line.strip() + '\n') + + async def _save_error(self, line: str): + await self.__loop.run_in_executor(None, self._save_error_sync, line) async def _handle_request(self, request: Request): if isinstance(request, self.factory.kvrequest_type): @@ -285,8 +293,18 @@ class DbConnection: async def _load_from_file(self): await self._rebuild_file(self.__mmdb) + def _assure_truncation(self): + if self.__path_truncate.exists(): + pos = int.from_bytes(self.__path_truncate.read_bytes(), 'little') + with open(self.__path, 'r+') as file: + file.seek(pos) + asyncio.run_coroutine_threadsafe(self._save_error(file.read()), self.__loop).result() + file.truncate(pos) + self.__path_truncate.unlink(missing_ok=True) + async def _initialize_mmdb(self): self.__mmdb = {} + await self.__loop.run_in_executor(None, self._assure_truncation) await self._load_from_file() self.__file = open(self.__path, "a") @@ -346,7 +364,7 @@ class DbConnection: class DbFactory: def __init__(self, path: str | pathlib.Path, *, kvrequest_type: Type[KVRequest], buffersize=1048576): - self.path = path = str(path) + self.path = pathlib.Path(path) self.kvrequest_type = kvrequest_type self.buffersize = buffersize