diff --git a/docs/source/conf.py b/docs/source/conf.py index 447077b..d4771f1 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -9,7 +9,7 @@ project = 'ptvp35' copyright = '2022, PARRRATE TNV' author = 'PARRRATE TNV' -release = '1.0rc2' +release = '1.0rc3' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs/source/guarantees.rst b/docs/source/guarantees.rst new file mode 100644 index 0000000..4b3a7a9 --- /dev/null +++ b/docs/source/guarantees.rst @@ -0,0 +1,36 @@ +Guarantees +========== + +MMDB-level guarantees +--------------- + +* All 0L writes change MMDB instantly. +* Transaction write works as one atomic write. + +Queue-level guarantees +---------------------- + +This level is the mediator between MMDB and filesystem levels. This level includes the pre-write compression buffer. + +* Queue requests occur in the same orders as they were acted upon at MMDB level and in the same order as they will be acted upon at filesystem level. +* DB can't close until all requests are done. +* No request can crash the task. +* Every request eventually succeedes or fails (except for the case of runtime termination). + +Buffer-specific guarantees: + +* If buffer dump was ever requested, then this request (queued or indirect) will eventually be satisfied. + +Filesystem-level guarantees +--------------------------- + +* If main file exists without .recover or .truncate_flag, then it's valid. +* If .recover file is present, then .backup is valid. +* If .truncate_flag is present, then .truncate is valid and first :code:`.truncate_target()` (contents of .truncate) characters of main file are valid. +* Every write is final and irreversible (can't be reversed unintentionally due to termination), otherwise it's not considered done. That's achieved using :code:`os.fsync`. + +Performance guarantees +---------------------- + +* No normal sync methods (except for :code:`io2db`/:code:`db2io` if they're supplied with blocking :code:`IO`) block on IO. Other methods are explicitly marked with postfix :code:`_sync`. +* All requests are resolved as soon as their conditions are met. diff --git a/docs/source/index.rst b/docs/source/index.rst index acfa18c..d21ed3b 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,8 +1,3 @@ -.. ptvp35 documentation master file, created by - sphinx-quickstart on Sat Nov 19 20:02:24 2022. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. - Welcome to ptvp35's documentation! ================================== @@ -12,6 +7,7 @@ Welcome to ptvp35's documentation! motivation usage modules + guarantees @@ -20,6 +16,7 @@ Indices and tables * :doc:`motivation` * :doc:`usage` +* :doc:`guarantees` * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/docs/source/motivation.rst b/docs/source/motivation.rst index 04245e0..f3f5ab0 100644 --- a/docs/source/motivation.rst +++ b/docs/source/motivation.rst @@ -1,6 +1,8 @@ Motivation ========== +This page describes reasons for certain design decisions. + General structure ----------------- diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index e770896..cdae838 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -16,7 +16,7 @@ __all__ = ( 'DbFactory', 'Db', 'Transaction', - 'FallbackMapping', + 'TransactionView', ) @@ -44,19 +44,38 @@ class Request: await self.__future +class LineRequest(Request): + __slots__ = ( + 'line', + ) + + def __init__(self, line: str, /, *, future: Optional[asyncio.Future]) -> None: + super().__init__(future) + self.line = line + + class KVFactory: """note: unstable signature.""" __slots__ = () def line(self, key: Any, value: Any, /) -> str: - """line must contain exactly one '\\n' at exactly the end if the line is not empty.""" + """line must contain exactly one '\\n' at exactly the end if the line is not empty. +note: other forms of requests will later be represented by different methods or by instances of Action class.""" raise NotImplementedError - def fromline(self, line: str, /) -> 'KVRequest': - """inverse of line(). should use free() method to construct the request.""" + def fromline(self, line: str, /) -> tuple[Any, Any]: + """inverse of line(). should use free() method to construct the request. +note: unstable signature.""" raise NotImplementedError + def run(self, line: str, db: dict, /) -> None: + """run request against the db. +extensible to allow forms of requests other than set. +note: unstable signature.""" + key, value = self.fromline(line) + db[key] = value + def request(self, key: Any, value: Any, /, *, future: Optional[asyncio.Future]) -> 'KVRequest': """form request with Future. low-level API. @@ -69,7 +88,7 @@ note: unstable signature.""" return self.request(key, value, future=None) -class KVRequest(Request): +class KVRequest(LineRequest): __slots__ = ( '__factory', 'key', @@ -77,24 +96,18 @@ class KVRequest(Request): ) def __init__(self, key: Any, value: Any, /, *, future: Optional[asyncio.Future], factory: KVFactory): - super().__init__(future) + super().__init__(factory.line(key, value), future=future) self.__factory = factory self.key = key self.value = value - def free(self, /): - return self.__factory.free(self.key, self.value) - - def line(self, /) -> str: - return self.__factory.line(self.key, self.value) - class DumpRequest(Request): __slots__ = () class UnknownRequestType(TypeError): - pass + __slots__ = () class KVJson(KVFactory): @@ -118,23 +131,20 @@ class KVJson(KVFactory): 'unknown KVJson key type, cannot convert to hashable' ) - def fromline(self, line: str, /) -> 'KVRequest': + def fromline(self, line: str, /) -> tuple[Any, Any]: d = json.loads(line) - return self.free(self._load_key(d['key']), d['value']) + return self._load_key(d['key']), d['value'] -class TransactionRequest(Request): +class TransactionRequest(LineRequest): __slots__ = ( 'buffer', ) def __init__(self, buffer: StringIO, /, *, future: Optional[asyncio.Future]): - super().__init__(future) + super().__init__(buffer.getvalue(), future=future) self.buffer = buffer - def line(self, /) -> str: - return self.buffer.getvalue() - class DbConnection: """note: unstable constructor signature.""" @@ -208,8 +218,7 @@ note: unstable signature.""" size = 0 for line in io: try: - request = self.__factory.kvfactory.fromline(line) - db[request.key] = request.value + self.__factory.kvfactory.run(line, db) size += len(line) except (json.JSONDecodeError, EOFError): traceback.print_exc() @@ -222,7 +231,7 @@ note: unstable signature.""" note: unstable signature.""" size = 0 for key, value in db.items(): - size += io.write(self.__factory.kvfactory.free(key, value).line()) + size += io.write(self.__factory.kvfactory.line(key, value)) return size def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: @@ -240,10 +249,10 @@ note: unstable signature.""" async def set(self, key: Any, value: Any, /) -> None: """set the value and wait until it's written to disk.""" - self.__mmdb[key] = value future = self._create_future() - self.__queue.put_nowait( - self.__factory.kvfactory.request(key, value, future=future)) + request = self.__factory.kvfactory.request(key, value, future=future) + self.__mmdb[key] = value + self.__queue.put_nowait(request) await future def set_nowait(self, key: Any, value: Any, /) -> None: @@ -352,13 +361,11 @@ note: unstable signature.""" await self._reload() async def _handle_request(self, request: Request, /) -> None: - if isinstance(request, KVRequest): - await self._write(request.line(), request) + if isinstance(request, LineRequest): + await self._write(request.line, request) elif isinstance(request, DumpRequest): await self._dump_buffer() request.set_result(None) - elif isinstance(request, TransactionRequest): - await self._write(request.line(), request) else: raise UnknownRequestType @@ -513,10 +520,11 @@ note: unstable signature.""" future = self._create_future() self.__queue.put_nowait(TransactionRequest(buffer, future=future)) await future - - def submit_transaction(self, delta: dict, /) -> asyncio.Future | None: + + def submit_transaction(self, delta: dict, future: asyncio.Future | None, /) -> None: """not implemented. -low-level API.""" +low-level API. +note: unstable signature.""" raise NotImplementedError async def commit(self, /) -> None: @@ -575,7 +583,7 @@ class Db(DbConnection): await self.aclose() -class FallbackMapping: +class TransactionView: """note: unstable constructor signature.""" __slots__ = ( @@ -620,7 +628,8 @@ bulk analog of DbConnection.set_nowait method.""" class Transaction: - """note: unstable signature.""" + """note: unstable signature. +note: synchronous with is not implemented.""" __slots__ = ( '__connection', @@ -632,11 +641,20 @@ class Transaction: def __init__(self, connection: DbConnection, /) -> None: self.__connection = connection - async def __aenter__(self) -> FallbackMapping: + async def __aenter__(self) -> TransactionView: self.__delta = {} - return FallbackMapping(self.__delta, self.__connection) + return TransactionView(self.__delta, self.__connection) async def __aexit__(self, exc_type, exc_val, exc_tb): if exc_type is None: await self.__connection.complete_transaction(self.__delta) del self.__delta + + def __enter__(self) -> TransactionView: + self.__delta = {} + return TransactionView(self.__delta, self.__connection) + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + self.__connection.submit_transaction(self.__delta, None) + del self.__delta diff --git a/setup.py b/setup.py index 01a1372..befcfa9 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='ptvp35', - version='1.0rc1', + version='1.0rc3', packages=['ptvp35'], url='https://gitea.ongoteam.net/PTV/ptvp35', license='',