diff --git a/docs/source/conf.py b/docs/source/conf.py index d4771f1..e61b737 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -9,7 +9,7 @@ project = 'ptvp35' copyright = '2022, PARRRATE TNV' author = 'PARRRATE TNV' -release = '1.0rc3' +release = '1.0rc4' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 4aa87ae..8812bc3 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -8,7 +8,7 @@ Default installation option is to use pip+git .. code-block:: console - (.venv) $ pip install git+https://gitea.parrrate.ru/PTV/ptvp35.git + (venv) $ pip install git+https://gitea.parrrate.ru/PTV/ptvp35.git Basic functionality ------------------- diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 4debb14..2fd9494 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -65,7 +65,7 @@ note: other forms of requests will later be represented by different methods or raise NotImplementedError def fromline(self, line: str, /) -> tuple[Any, Any]: - """inverse of line(). should use free() method to construct the request. + """inverse of line(). note: unstable signature.""" raise NotImplementedError @@ -87,6 +87,21 @@ note: unstable signature.""" note: unstable signature.""" return self.request(key, value, future=None) + def io2db(self, io: IO[str], db: dict, /) -> int: + """note: unstable signature.""" + size = 0 + for line in io: + self.run(line, db) + size += len(line) + return size + + def db2io(self, db: dict, io: IO[str], /) -> int: + """note: unstable signature.""" + size = 0 + for key, value in db.items(): + size += io.write(self.line(key, value)) + return size + class KVRequest(LineRequest): __slots__ = ( @@ -146,11 +161,28 @@ class TransactionRequest(LineRequest): self.buffer = buffer +class DbParametres: + __slots__ = ( + 'path', + 'kvfactory', + 'buffersize', + ) + + def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize: int) -> None: + self.path = path + """note: unstable signature.""" + self.kvfactory = kvfactory + """note: unstable signature.""" + self.buffersize = buffersize + """note: unstable signature.""" + + class DbConnection: """note: unstable constructor signature.""" __slots__ = ( - '__factory', + '__kvfactory', + '__buffersize', '__path', '__path_backup', '__path_recover', @@ -181,11 +213,12 @@ class DbConnection: def __init__( self, - factory: 'DbFactory', + parametres: 'DbParametres', / ) -> None: - self.__factory = factory - self.__path = path = self.__factory.path + self.__kvfactory = parametres.kvfactory + self.__buffersize = parametres.buffersize + self.__path = path = parametres.path name = self.__path.name self.__path_backup = path.with_name(name + '.backup') self.__path_recover = path.with_name(name + '.recover') @@ -212,36 +245,14 @@ class DbConnection: def _save_error_from_thread(self, line: str, /) -> None: self._queue_error(line).result() - def io2db(self, io: IO[str], db: dict, /) -> int: - """there are no guarantees about .error file if an error occurs here. -note: unstable signature.""" - size = 0 - for line in io: - try: - self.__factory.kvfactory.run(line, db) - size += len(line) - except (json.JSONDecodeError, EOFError): - traceback.print_exc() - # this condition should never occur, but we should be able to handle this UB as best as we can - self._queue_error(line) - return size - - def db2io(self, db: dict, io: IO[str], /) -> int: - """does not handle any errors. -note: unstable signature.""" - size = 0 - for key, value in db.items(): - size += io.write(self.__factory.kvfactory.line(key, value)) - return size - def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: path.touch() with path.open('r') as file: - return self.io2db(file, db) + return self.__kvfactory.io2db(file, db) def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: with path.open('w') as file: - return self.db2io(db, file) + return self.__kvfactory.db2io(db, file) def get(self, key: Any, default: Any, /): """dict-like get with mandatory default parametre.""" @@ -250,7 +261,7 @@ note: unstable signature.""" async def set(self, key: Any, value: Any, /) -> None: """set the value and wait until it's written to disk.""" future = self._create_future() - request = self.__factory.kvfactory.request(key, value, future=future) + request = self.__kvfactory.request(key, value, future=future) self.__mmdb[key] = value self.__queue.put_nowait(request) await future @@ -258,7 +269,7 @@ note: unstable signature.""" def set_nowait(self, key: Any, value: Any, /) -> None: """set value and add write-to-disk request to queue.""" self.__mmdb[key] = value - self.__queue.put_nowait(self.__factory.kvfactory.free(key, value)) + self.__queue.put_nowait(self.__kvfactory.free(key, value)) def _clear_buffer(self, /) -> None: self.__buffer = StringIO() @@ -268,9 +279,9 @@ note: unstable signature.""" def _compress_buffer(self, /) -> StringIO: self.__buffer.seek(0) bufferdb = {} - self.io2db(self.__buffer, bufferdb) + self.__kvfactory.io2db(self.__buffer, bufferdb) buffer = StringIO() - self.db2io(bufferdb, buffer) + self.__kvfactory.db2io(bufferdb, buffer) return buffer def _dump_compressed_buffer_sync(self, /) -> None: @@ -306,7 +317,7 @@ note: unstable signature.""" self.__queue.put_nowait(DumpRequest(None)) async def _dump_buffer_or_request_so(self, request: Request, /) -> None: - if self.__buffer.tell() >= self.__factory.buffersize: + if self.__buffer.tell() >= self.__buffersize: await self._dump_buffer() request.set_result(None) else: @@ -481,10 +492,10 @@ intended for heavy tasks.""" await self._initialize_running() @classmethod - async def create(cls, factory: 'DbFactory', /) -> 'DbConnection': + async def create(cls, parametres: 'DbParametres', /) -> 'DbConnection': """connect to the factory. note: unstable signature.""" - dbconnection = DbConnection(factory) + dbconnection = DbConnection(parametres) await dbconnection._initialize() return dbconnection @@ -510,22 +521,42 @@ note: unstable signature.""" del self.__loop self.__not_running = True - async def complete_transaction(self, delta: dict, /) -> None: - """hybrid of set() and dict.update().""" + def _transaction_buffer(self, delta: dict, /) -> StringIO: + buffer = StringIO() + self.__kvfactory.db2io(delta, buffer) + self.__mmdb.update(delta) + return buffer + + async def commit_transaction(self, delta: dict, /) -> None: + """hybrid of set() and dict.update(). +note: unstable signature.""" if not delta: return - buffer = StringIO() - self.db2io(delta, buffer) - self.__mmdb.update(delta) + buffer = self._transaction_buffer(delta) future = self._create_future() self.__queue.put_nowait(TransactionRequest(buffer, future=future)) await future - def submit_transaction(self, delta: dict, future: asyncio.Future | None, /) -> None: - """not implemented. -low-level API. -note: unstable (undefined) signature.""" - raise NotImplementedError + def submit_transaction(self, delta: dict, /) -> None: + """hybrid of set_nowait() and dict.update(). +_nowait analogue of commit_transaction(). +note: this method was added only for async-sync symmetry with commit_transaction(). +note: unstable signature.""" + if not delta: + return + buffer = self._transaction_buffer(delta) + self.__queue.put_nowait(TransactionRequest(buffer, future=None)) + + def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: + """low-level API. +for high-level synchronisation use transaction() instead. +note: unstable signature.""" + if not delta: + if future: + future.set_result(None) + return + buffer = self._transaction_buffer(delta) + self.__queue.put_nowait(TransactionRequest(buffer, future=future)) async def commit(self, /) -> None: """wait until all requests queued before are completed.""" @@ -535,27 +566,20 @@ note: unstable (undefined) signature.""" def transaction(self, /) -> 'Transaction': """open new transaction.""" - return Transaction(self) + return Transaction(self, self.__loop) class DbFactory: __slots__ = ( - 'path', - 'kvfactory', - 'buffersize', + '__parametres', '__db', ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: - self.path = path - """note: unstable signature.""" - self.kvfactory = kvfactory - """note: unstable signature.""" - self.buffersize = buffersize - """note: unstable signature.""" + self.__parametres = DbParametres(path, kvfactory=kvfactory, buffersize=buffersize) async def __aenter__(self) -> DbConnection: - self.__db = await DbConnection.create(self) + self.__db = await DbConnection.create(self.__parametres) return self.__db async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -570,7 +594,7 @@ class Db(DbConnection): def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576): DbConnection.__init__( self, - DbFactory( + DbParametres( pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize ) ) @@ -590,12 +614,16 @@ class TransactionView: '__delta', '__shadow', '__connection', + '__loop', + '__subfuture', ) - def __init__(self, delta: dict, connection: DbConnection, /) -> None: + def __init__(self, delta: dict, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None: self.__delta = delta self.__shadow = {} self.__connection = connection + self.__loop = loop + self.__subfuture: asyncio.Future | None = None def get(self, key: Any, default: Any, /): if key in self.__delta: @@ -605,56 +633,100 @@ class TransactionView: return self.__connection.get(key, default) def set_nowait(self, key: Any, value: Any, /) -> None: + """note: unlike the corresponding db method, this one does not catch serialisation errors early.""" self.__delta[key] = value - async def commit(self, /) -> None: - """bulk analog of DbConnection.set method.""" + def _delta(self, /) -> dict: delta = self.__delta.copy() self.__shadow |= delta self.__delta.clear() - await self.__connection.complete_transaction(delta) + return delta - async def commit_submitted(self, /) -> None: - """not implemented. -commit previously submitted changes.""" - raise NotImplementedError + async def commit(self, /) -> None: + """bulk analogue of DbConnection.set().""" + subfuture: asyncio.Future | None = self.__subfuture + self.__subfuture = None + delta = self._delta() + await self.__connection.commit_transaction(delta) + if subfuture is not None: + await subfuture def submit(self, /) -> None: - """not implemented. -submit changes. -_nowait analog of commit. -bulk analog of DbConnection.set_nowait method.""" - raise NotImplementedError + """submit changes. +_nowait analogue of commit(). +bulk analogue of DbConnection.set_nowait().""" + delta = self._delta() + future = self.__loop.create_future() + self.__connection.submit_transaction_request(delta, future) + self.__subfuture = self._gather(self.__subfuture, future) + + def _do_gather(self, left: asyncio.Future, right: asyncio.Future) -> asyncio.Future: + future = self.__loop.create_future() + + def rcallback(fr: asyncio.Future) -> None: + if (e := fr.exception()) is not None: + future.set_exception(e) + else: + future.set_result(None) + + def lcallback(fl: asyncio.Future) -> None: + if (e := fl.exception()) is not None: + future.set_exception(e) + else: + right.add_done_callback(rcallback) + + left.add_done_callback(lcallback) + + return future + + def _gather(self, left: asyncio.Future | None, right: asyncio.Future | None) -> asyncio.Future | None: + match (left, right): + case None, _: + return right + case _, None: + return left + case asyncio.Future() as fl, asyncio.Future() as fr: + return self._do_gather(fl, fr) + case _: + raise TypeError class Transaction: - """note: unstable signature. -note: synchronous with is not implemented.""" + """note: unstable signature.""" __slots__ = ( '__connection', - '__delta', + '__loop', + '__view', + '__running', ) - __delta: dict + __view: TransactionView - def __init__(self, connection: DbConnection, /) -> None: + def __init__(self, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None: self.__connection = connection + self.__loop = loop + self.__running = False async def __aenter__(self) -> TransactionView: - self.__delta = {} - return TransactionView(self.__delta, self.__connection) + return self.__enter__() async def __aexit__(self, exc_type, exc_val, exc_tb): if exc_type is None: - await self.__connection.complete_transaction(self.__delta) - del self.__delta + await self.__view.commit() + self._clean() + + def _clean(self, /) -> None: + del self.__view + self.__running = False def __enter__(self) -> TransactionView: - self.__delta = {} - return TransactionView(self.__delta, self.__connection) + assert not self.__running + self.__running = True + self.__view = TransactionView({}, self.__connection, self.__loop) + return self.__view def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: - self.__connection.submit_transaction(self.__delta, None) - del self.__delta + self.__view.submit() + self._clean() diff --git a/setup.py b/setup.py index befcfa9..eab54e4 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='ptvp35', - version='1.0rc3', + version='1.0rc4', packages=['ptvp35'], url='https://gitea.ongoteam.net/PTV/ptvp35', license='',