1.0rc3: LineRequest

This commit is contained in:
AF 2022-11-20 22:23:52 +00:00
parent 04e8ba559e
commit af90b9c9c6
6 changed files with 97 additions and 44 deletions

View File

@ -9,7 +9,7 @@
project = 'ptvp35'
copyright = '2022, PARRRATE TNV'
author = 'PARRRATE TNV'
release = '1.0rc2'
release = '1.0rc3'
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

View File

@ -0,0 +1,36 @@
Guarantees
==========
MMDB-level guarantees
---------------
* All 0L writes change MMDB instantly.
* Transaction write works as one atomic write.
Queue-level guarantees
----------------------
This level is the mediator between MMDB and filesystem levels. This level includes the pre-write compression buffer.
* Queue requests occur in the same orders as they were acted upon at MMDB level and in the same order as they will be acted upon at filesystem level.
* DB can't close until all requests are done.
* No request can crash the task.
* Every request eventually succeedes or fails (except for the case of runtime termination).
Buffer-specific guarantees:
* If buffer dump was ever requested, then this request (queued or indirect) will eventually be satisfied.
Filesystem-level guarantees
---------------------------
* If main file exists without .recover or .truncate_flag, then it's valid.
* If .recover file is present, then .backup is valid.
* If .truncate_flag is present, then .truncate is valid and first :code:`.truncate_target()` (contents of .truncate) characters of main file are valid.
* Every write is final and irreversible (can't be reversed unintentionally due to termination), otherwise it's not considered done. That's achieved using :code:`os.fsync`.
Performance guarantees
----------------------
* No normal sync methods (except for :code:`io2db`/:code:`db2io` if they're supplied with blocking :code:`IO`) block on IO. Other methods are explicitly marked with postfix :code:`_sync`.
* All requests are resolved as soon as their conditions are met.

View File

@ -1,8 +1,3 @@
.. ptvp35 documentation master file, created by
sphinx-quickstart on Sat Nov 19 20:02:24 2022.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to ptvp35's documentation!
==================================
@ -12,6 +7,7 @@ Welcome to ptvp35's documentation!
motivation
usage
modules
guarantees
@ -20,6 +16,7 @@ Indices and tables
* :doc:`motivation`
* :doc:`usage`
* :doc:`guarantees`
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@ -1,6 +1,8 @@
Motivation
==========
This page describes reasons for certain design decisions.
General structure
-----------------

View File

@ -16,7 +16,7 @@ __all__ = (
'DbFactory',
'Db',
'Transaction',
'FallbackMapping',
'TransactionView',
)
@ -44,19 +44,38 @@ class Request:
await self.__future
class LineRequest(Request):
__slots__ = (
'line',
)
def __init__(self, line: str, /, *, future: Optional[asyncio.Future]) -> None:
super().__init__(future)
self.line = line
class KVFactory:
"""note: unstable signature."""
__slots__ = ()
def line(self, key: Any, value: Any, /) -> str:
"""line must contain exactly one '\\n' at exactly the end if the line is not empty."""
"""line must contain exactly one '\\n' at exactly the end if the line is not empty.
note: other forms of requests will later be represented by different methods or by instances of Action class."""
raise NotImplementedError
def fromline(self, line: str, /) -> 'KVRequest':
"""inverse of line(). should use free() method to construct the request."""
def fromline(self, line: str, /) -> tuple[Any, Any]:
"""inverse of line(). should use free() method to construct the request.
note: unstable signature."""
raise NotImplementedError
def run(self, line: str, db: dict, /) -> None:
"""run request against the db.
extensible to allow forms of requests other than set.
note: unstable signature."""
key, value = self.fromline(line)
db[key] = value
def request(self, key: Any, value: Any, /, *, future: Optional[asyncio.Future]) -> 'KVRequest':
"""form request with Future.
low-level API.
@ -69,7 +88,7 @@ note: unstable signature."""
return self.request(key, value, future=None)
class KVRequest(Request):
class KVRequest(LineRequest):
__slots__ = (
'__factory',
'key',
@ -77,24 +96,18 @@ class KVRequest(Request):
)
def __init__(self, key: Any, value: Any, /, *, future: Optional[asyncio.Future], factory: KVFactory):
super().__init__(future)
super().__init__(factory.line(key, value), future=future)
self.__factory = factory
self.key = key
self.value = value
def free(self, /):
return self.__factory.free(self.key, self.value)
def line(self, /) -> str:
return self.__factory.line(self.key, self.value)
class DumpRequest(Request):
__slots__ = ()
class UnknownRequestType(TypeError):
pass
__slots__ = ()
class KVJson(KVFactory):
@ -118,23 +131,20 @@ class KVJson(KVFactory):
'unknown KVJson key type, cannot convert to hashable'
)
def fromline(self, line: str, /) -> 'KVRequest':
def fromline(self, line: str, /) -> tuple[Any, Any]:
d = json.loads(line)
return self.free(self._load_key(d['key']), d['value'])
return self._load_key(d['key']), d['value']
class TransactionRequest(Request):
class TransactionRequest(LineRequest):
__slots__ = (
'buffer',
)
def __init__(self, buffer: StringIO, /, *, future: Optional[asyncio.Future]):
super().__init__(future)
super().__init__(buffer.getvalue(), future=future)
self.buffer = buffer
def line(self, /) -> str:
return self.buffer.getvalue()
class DbConnection:
"""note: unstable constructor signature."""
@ -208,8 +218,7 @@ note: unstable signature."""
size = 0
for line in io:
try:
request = self.__factory.kvfactory.fromline(line)
db[request.key] = request.value
self.__factory.kvfactory.run(line, db)
size += len(line)
except (json.JSONDecodeError, EOFError):
traceback.print_exc()
@ -222,7 +231,7 @@ note: unstable signature."""
note: unstable signature."""
size = 0
for key, value in db.items():
size += io.write(self.__factory.kvfactory.free(key, value).line())
size += io.write(self.__factory.kvfactory.line(key, value))
return size
def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int:
@ -240,10 +249,10 @@ note: unstable signature."""
async def set(self, key: Any, value: Any, /) -> None:
"""set the value and wait until it's written to disk."""
self.__mmdb[key] = value
future = self._create_future()
self.__queue.put_nowait(
self.__factory.kvfactory.request(key, value, future=future))
request = self.__factory.kvfactory.request(key, value, future=future)
self.__mmdb[key] = value
self.__queue.put_nowait(request)
await future
def set_nowait(self, key: Any, value: Any, /) -> None:
@ -352,13 +361,11 @@ note: unstable signature."""
await self._reload()
async def _handle_request(self, request: Request, /) -> None:
if isinstance(request, KVRequest):
await self._write(request.line(), request)
if isinstance(request, LineRequest):
await self._write(request.line, request)
elif isinstance(request, DumpRequest):
await self._dump_buffer()
request.set_result(None)
elif isinstance(request, TransactionRequest):
await self._write(request.line(), request)
else:
raise UnknownRequestType
@ -514,9 +521,10 @@ note: unstable signature."""
self.__queue.put_nowait(TransactionRequest(buffer, future=future))
await future
def submit_transaction(self, delta: dict, /) -> asyncio.Future | None:
def submit_transaction(self, delta: dict, future: asyncio.Future | None, /) -> None:
"""not implemented.
low-level API."""
low-level API.
note: unstable signature."""
raise NotImplementedError
async def commit(self, /) -> None:
@ -575,7 +583,7 @@ class Db(DbConnection):
await self.aclose()
class FallbackMapping:
class TransactionView:
"""note: unstable constructor signature."""
__slots__ = (
@ -620,7 +628,8 @@ bulk analog of DbConnection.set_nowait method."""
class Transaction:
"""note: unstable signature."""
"""note: unstable signature.
note: synchronous with is not implemented."""
__slots__ = (
'__connection',
@ -632,11 +641,20 @@ class Transaction:
def __init__(self, connection: DbConnection, /) -> None:
self.__connection = connection
async def __aenter__(self) -> FallbackMapping:
async def __aenter__(self) -> TransactionView:
self.__delta = {}
return FallbackMapping(self.__delta, self.__connection)
return TransactionView(self.__delta, self.__connection)
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
await self.__connection.complete_transaction(self.__delta)
del self.__delta
def __enter__(self) -> TransactionView:
self.__delta = {}
return TransactionView(self.__delta, self.__connection)
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.__connection.submit_transaction(self.__delta, None)
del self.__delta

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='ptvp35',
version='1.0rc1',
version='1.0rc3',
packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35',
license='',