truncation

This commit is contained in:
AF 2022-11-11 17:36:29 +00:00
parent 8be67bf834
commit b2b326fc55

View File

@ -113,11 +113,12 @@ class DbConnection:
factory: 'DbFactory', factory: 'DbFactory',
) -> None: ) -> None:
self.factory = factory self.factory = factory
path = self.factory.path self.__path = path = self.factory.path
self.__path = pathlib.Path(path) name = self.__path.name
self.__path_backup = pathlib.Path(path + '.backup') self.__path_backup = path.with_name(name + '.backup')
self.__path_recover = pathlib.Path(path + '.recover') self.__path_recover = path.with_name(name + '.recover')
self.__path_error = pathlib.Path(path + '.error') self.__path_error = path.with_name(name + '.error')
self.__path_truncate = path.with_name(name + '.truncate')
self.not_running = True self.not_running = True
def _queue_error(self, line: str): def _queue_error(self, line: str):
@ -179,13 +180,17 @@ class DbConnection:
return buffer return buffer
def _file_write(self, line: str): def _file_write(self, line: str):
self.__path_truncate.write_bytes(self.__file.tell().to_bytes(16, 'little'))
self.__file.write(line) self.__file.write(line)
self.__file.flush() self.__file.flush()
os.fsync(self.__file.fileno()) os.fsync(self.__file.fileno())
self.__path_truncate.unlink(missing_ok=True)
def _dump_compressed_buffer_sync(self):
self._file_write(self._compress_buffer().getvalue())
async def _dump_compressed_buffer(self): async def _dump_compressed_buffer(self):
buffer = self._compress_buffer() await self.__loop.run_in_executor(None, self._dump_compressed_buffer_sync)
await self.__loop.run_in_executor(None, self._file_write, buffer.getvalue())
async def _do_dump_buffer(self): async def _do_dump_buffer(self):
await self._dump_compressed_buffer() await self._dump_compressed_buffer()
@ -200,9 +205,12 @@ class DbConnection:
await self._do_dump_buffer() await self._do_dump_buffer()
await self._reload_if_oversized() await self._reload_if_oversized()
async def _save_error(self, line: str): def _save_error_sync(self, line: str):
with open(self.__path_error, 'a') as file: with open(self.__path_error, 'a') as file:
await self.__loop.run_in_executor(None, file.write, line.strip() + '\n') file.write(line.strip() + '\n')
async def _save_error(self, line: str):
await self.__loop.run_in_executor(None, self._save_error_sync, line)
async def _handle_request(self, request: Request): async def _handle_request(self, request: Request):
if isinstance(request, self.factory.kvrequest_type): if isinstance(request, self.factory.kvrequest_type):
@ -285,8 +293,18 @@ class DbConnection:
async def _load_from_file(self): async def _load_from_file(self):
await self._rebuild_file(self.__mmdb) await self._rebuild_file(self.__mmdb)
def _assure_truncation(self):
if self.__path_truncate.exists():
pos = int.from_bytes(self.__path_truncate.read_bytes(), 'little')
with open(self.__path, 'r+') as file:
file.seek(pos)
asyncio.run_coroutine_threadsafe(self._save_error(file.read()), self.__loop).result()
file.truncate(pos)
self.__path_truncate.unlink(missing_ok=True)
async def _initialize_mmdb(self): async def _initialize_mmdb(self):
self.__mmdb = {} self.__mmdb = {}
await self.__loop.run_in_executor(None, self._assure_truncation)
await self._load_from_file() await self._load_from_file()
self.__file = open(self.__path, "a") self.__file = open(self.__path, "a")
@ -346,7 +364,7 @@ class DbConnection:
class DbFactory: class DbFactory:
def __init__(self, path: str | pathlib.Path, *, kvrequest_type: Type[KVRequest], buffersize=1048576): def __init__(self, path: str | pathlib.Path, *, kvrequest_type: Type[KVRequest], buffersize=1048576):
self.path = path = str(path) self.path = pathlib.Path(path)
self.kvrequest_type = kvrequest_type self.kvrequest_type = kvrequest_type
self.buffersize = buffersize self.buffersize = buffersize