1.0rc4: submit implementation + dbparametres

+ move io2db/db2io to kvfactory
This commit is contained in:
AF 2022-11-21 12:22:42 +00:00
parent 09bb45e867
commit 20b7c31f0a
4 changed files with 159 additions and 87 deletions

View File

@ -9,7 +9,7 @@
project = 'ptvp35' project = 'ptvp35'
copyright = '2022, PARRRATE TNV' copyright = '2022, PARRRATE TNV'
author = 'PARRRATE TNV' author = 'PARRRATE TNV'
release = '1.0rc3' release = '1.0rc4'
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

View File

@ -8,7 +8,7 @@ Default installation option is to use pip+git
.. code-block:: console .. code-block:: console
(.venv) $ pip install git+https://gitea.parrrate.ru/PTV/ptvp35.git (venv) $ pip install git+https://gitea.parrrate.ru/PTV/ptvp35.git
Basic functionality Basic functionality
------------------- -------------------

View File

@ -65,7 +65,7 @@ note: other forms of requests will later be represented by different methods or
raise NotImplementedError raise NotImplementedError
def fromline(self, line: str, /) -> tuple[Any, Any]: def fromline(self, line: str, /) -> tuple[Any, Any]:
"""inverse of line(). should use free() method to construct the request. """inverse of line().
note: unstable signature.""" note: unstable signature."""
raise NotImplementedError raise NotImplementedError
@ -87,6 +87,21 @@ note: unstable signature."""
note: unstable signature.""" note: unstable signature."""
return self.request(key, value, future=None) return self.request(key, value, future=None)
def io2db(self, io: IO[str], db: dict, /) -> int:
"""note: unstable signature."""
size = 0
for line in io:
self.run(line, db)
size += len(line)
return size
def db2io(self, db: dict, io: IO[str], /) -> int:
"""note: unstable signature."""
size = 0
for key, value in db.items():
size += io.write(self.line(key, value))
return size
class KVRequest(LineRequest): class KVRequest(LineRequest):
__slots__ = ( __slots__ = (
@ -146,11 +161,28 @@ class TransactionRequest(LineRequest):
self.buffer = buffer self.buffer = buffer
class DbParametres:
__slots__ = (
'path',
'kvfactory',
'buffersize',
)
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize: int) -> None:
self.path = path
"""note: unstable signature."""
self.kvfactory = kvfactory
"""note: unstable signature."""
self.buffersize = buffersize
"""note: unstable signature."""
class DbConnection: class DbConnection:
"""note: unstable constructor signature.""" """note: unstable constructor signature."""
__slots__ = ( __slots__ = (
'__factory', '__kvfactory',
'__buffersize',
'__path', '__path',
'__path_backup', '__path_backup',
'__path_recover', '__path_recover',
@ -181,11 +213,12 @@ class DbConnection:
def __init__( def __init__(
self, self,
factory: 'DbFactory', parametres: 'DbParametres',
/ /
) -> None: ) -> None:
self.__factory = factory self.__kvfactory = parametres.kvfactory
self.__path = path = self.__factory.path self.__buffersize = parametres.buffersize
self.__path = path = parametres.path
name = self.__path.name name = self.__path.name
self.__path_backup = path.with_name(name + '.backup') self.__path_backup = path.with_name(name + '.backup')
self.__path_recover = path.with_name(name + '.recover') self.__path_recover = path.with_name(name + '.recover')
@ -212,36 +245,14 @@ class DbConnection:
def _save_error_from_thread(self, line: str, /) -> None: def _save_error_from_thread(self, line: str, /) -> None:
self._queue_error(line).result() self._queue_error(line).result()
def io2db(self, io: IO[str], db: dict, /) -> int:
"""there are no guarantees about .error file if an error occurs here.
note: unstable signature."""
size = 0
for line in io:
try:
self.__factory.kvfactory.run(line, db)
size += len(line)
except (json.JSONDecodeError, EOFError):
traceback.print_exc()
# this condition should never occur, but we should be able to handle this UB as best as we can
self._queue_error(line)
return size
def db2io(self, db: dict, io: IO[str], /) -> int:
"""does not handle any errors.
note: unstable signature."""
size = 0
for key, value in db.items():
size += io.write(self.__factory.kvfactory.line(key, value))
return size
def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int:
path.touch() path.touch()
with path.open('r') as file: with path.open('r') as file:
return self.io2db(file, db) return self.__kvfactory.io2db(file, db)
def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int:
with path.open('w') as file: with path.open('w') as file:
return self.db2io(db, file) return self.__kvfactory.db2io(db, file)
def get(self, key: Any, default: Any, /): def get(self, key: Any, default: Any, /):
"""dict-like get with mandatory default parametre.""" """dict-like get with mandatory default parametre."""
@ -250,7 +261,7 @@ note: unstable signature."""
async def set(self, key: Any, value: Any, /) -> None: async def set(self, key: Any, value: Any, /) -> None:
"""set the value and wait until it's written to disk.""" """set the value and wait until it's written to disk."""
future = self._create_future() future = self._create_future()
request = self.__factory.kvfactory.request(key, value, future=future) request = self.__kvfactory.request(key, value, future=future)
self.__mmdb[key] = value self.__mmdb[key] = value
self.__queue.put_nowait(request) self.__queue.put_nowait(request)
await future await future
@ -258,7 +269,7 @@ note: unstable signature."""
def set_nowait(self, key: Any, value: Any, /) -> None: def set_nowait(self, key: Any, value: Any, /) -> None:
"""set value and add write-to-disk request to queue.""" """set value and add write-to-disk request to queue."""
self.__mmdb[key] = value self.__mmdb[key] = value
self.__queue.put_nowait(self.__factory.kvfactory.free(key, value)) self.__queue.put_nowait(self.__kvfactory.free(key, value))
def _clear_buffer(self, /) -> None: def _clear_buffer(self, /) -> None:
self.__buffer = StringIO() self.__buffer = StringIO()
@ -268,9 +279,9 @@ note: unstable signature."""
def _compress_buffer(self, /) -> StringIO: def _compress_buffer(self, /) -> StringIO:
self.__buffer.seek(0) self.__buffer.seek(0)
bufferdb = {} bufferdb = {}
self.io2db(self.__buffer, bufferdb) self.__kvfactory.io2db(self.__buffer, bufferdb)
buffer = StringIO() buffer = StringIO()
self.db2io(bufferdb, buffer) self.__kvfactory.db2io(bufferdb, buffer)
return buffer return buffer
def _dump_compressed_buffer_sync(self, /) -> None: def _dump_compressed_buffer_sync(self, /) -> None:
@ -306,7 +317,7 @@ note: unstable signature."""
self.__queue.put_nowait(DumpRequest(None)) self.__queue.put_nowait(DumpRequest(None))
async def _dump_buffer_or_request_so(self, request: Request, /) -> None: async def _dump_buffer_or_request_so(self, request: Request, /) -> None:
if self.__buffer.tell() >= self.__factory.buffersize: if self.__buffer.tell() >= self.__buffersize:
await self._dump_buffer() await self._dump_buffer()
request.set_result(None) request.set_result(None)
else: else:
@ -481,10 +492,10 @@ intended for heavy tasks."""
await self._initialize_running() await self._initialize_running()
@classmethod @classmethod
async def create(cls, factory: 'DbFactory', /) -> 'DbConnection': async def create(cls, parametres: 'DbParametres', /) -> 'DbConnection':
"""connect to the factory. """connect to the factory.
note: unstable signature.""" note: unstable signature."""
dbconnection = DbConnection(factory) dbconnection = DbConnection(parametres)
await dbconnection._initialize() await dbconnection._initialize()
return dbconnection return dbconnection
@ -510,22 +521,42 @@ note: unstable signature."""
del self.__loop del self.__loop
self.__not_running = True self.__not_running = True
async def complete_transaction(self, delta: dict, /) -> None: def _transaction_buffer(self, delta: dict, /) -> StringIO:
"""hybrid of set() and dict.update().""" buffer = StringIO()
self.__kvfactory.db2io(delta, buffer)
self.__mmdb.update(delta)
return buffer
async def commit_transaction(self, delta: dict, /) -> None:
"""hybrid of set() and dict.update().
note: unstable signature."""
if not delta: if not delta:
return return
buffer = StringIO() buffer = self._transaction_buffer(delta)
self.db2io(delta, buffer)
self.__mmdb.update(delta)
future = self._create_future() future = self._create_future()
self.__queue.put_nowait(TransactionRequest(buffer, future=future)) self.__queue.put_nowait(TransactionRequest(buffer, future=future))
await future await future
def submit_transaction(self, delta: dict, future: asyncio.Future | None, /) -> None: def submit_transaction(self, delta: dict, /) -> None:
"""not implemented. """hybrid of set_nowait() and dict.update().
low-level API. _nowait analogue of commit_transaction().
note: unstable (undefined) signature.""" note: this method was added only for async-sync symmetry with commit_transaction().
raise NotImplementedError note: unstable signature."""
if not delta:
return
buffer = self._transaction_buffer(delta)
self.__queue.put_nowait(TransactionRequest(buffer, future=None))
def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None:
"""low-level API.
for high-level synchronisation use transaction() instead.
note: unstable signature."""
if not delta:
if future:
future.set_result(None)
return
buffer = self._transaction_buffer(delta)
self.__queue.put_nowait(TransactionRequest(buffer, future=future))
async def commit(self, /) -> None: async def commit(self, /) -> None:
"""wait until all requests queued before are completed.""" """wait until all requests queued before are completed."""
@ -535,27 +566,20 @@ note: unstable (undefined) signature."""
def transaction(self, /) -> 'Transaction': def transaction(self, /) -> 'Transaction':
"""open new transaction.""" """open new transaction."""
return Transaction(self) return Transaction(self, self.__loop)
class DbFactory: class DbFactory:
__slots__ = ( __slots__ = (
'path', '__parametres',
'kvfactory',
'buffersize',
'__db', '__db',
) )
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
self.path = path self.__parametres = DbParametres(path, kvfactory=kvfactory, buffersize=buffersize)
"""note: unstable signature."""
self.kvfactory = kvfactory
"""note: unstable signature."""
self.buffersize = buffersize
"""note: unstable signature."""
async def __aenter__(self) -> DbConnection: async def __aenter__(self) -> DbConnection:
self.__db = await DbConnection.create(self) self.__db = await DbConnection.create(self.__parametres)
return self.__db return self.__db
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
@ -570,7 +594,7 @@ class Db(DbConnection):
def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576): def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576):
DbConnection.__init__( DbConnection.__init__(
self, self,
DbFactory( DbParametres(
pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize
) )
) )
@ -590,12 +614,16 @@ class TransactionView:
'__delta', '__delta',
'__shadow', '__shadow',
'__connection', '__connection',
'__loop',
'__subfuture',
) )
def __init__(self, delta: dict, connection: DbConnection, /) -> None: def __init__(self, delta: dict, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None:
self.__delta = delta self.__delta = delta
self.__shadow = {} self.__shadow = {}
self.__connection = connection self.__connection = connection
self.__loop = loop
self.__subfuture: asyncio.Future | None = None
def get(self, key: Any, default: Any, /): def get(self, key: Any, default: Any, /):
if key in self.__delta: if key in self.__delta:
@ -605,56 +633,100 @@ class TransactionView:
return self.__connection.get(key, default) return self.__connection.get(key, default)
def set_nowait(self, key: Any, value: Any, /) -> None: def set_nowait(self, key: Any, value: Any, /) -> None:
"""note: unlike the corresponding db method, this one does not catch serialisation errors early."""
self.__delta[key] = value self.__delta[key] = value
async def commit(self, /) -> None: def _delta(self, /) -> dict:
"""bulk analog of DbConnection.set method."""
delta = self.__delta.copy() delta = self.__delta.copy()
self.__shadow |= delta self.__shadow |= delta
self.__delta.clear() self.__delta.clear()
await self.__connection.complete_transaction(delta) return delta
async def commit_submitted(self, /) -> None: async def commit(self, /) -> None:
"""not implemented. """bulk analogue of DbConnection.set()."""
commit previously submitted changes.""" subfuture: asyncio.Future | None = self.__subfuture
raise NotImplementedError self.__subfuture = None
delta = self._delta()
await self.__connection.commit_transaction(delta)
if subfuture is not None:
await subfuture
def submit(self, /) -> None: def submit(self, /) -> None:
"""not implemented. """submit changes.
submit changes. _nowait analogue of commit().
_nowait analog of commit. bulk analogue of DbConnection.set_nowait()."""
bulk analog of DbConnection.set_nowait method.""" delta = self._delta()
raise NotImplementedError future = self.__loop.create_future()
self.__connection.submit_transaction_request(delta, future)
self.__subfuture = self._gather(self.__subfuture, future)
def _do_gather(self, left: asyncio.Future, right: asyncio.Future) -> asyncio.Future:
future = self.__loop.create_future()
def rcallback(fr: asyncio.Future) -> None:
if (e := fr.exception()) is not None:
future.set_exception(e)
else:
future.set_result(None)
def lcallback(fl: asyncio.Future) -> None:
if (e := fl.exception()) is not None:
future.set_exception(e)
else:
right.add_done_callback(rcallback)
left.add_done_callback(lcallback)
return future
def _gather(self, left: asyncio.Future | None, right: asyncio.Future | None) -> asyncio.Future | None:
match (left, right):
case None, _:
return right
case _, None:
return left
case asyncio.Future() as fl, asyncio.Future() as fr:
return self._do_gather(fl, fr)
case _:
raise TypeError
class Transaction: class Transaction:
"""note: unstable signature. """note: unstable signature."""
note: synchronous with is not implemented."""
__slots__ = ( __slots__ = (
'__connection', '__connection',
'__delta', '__loop',
'__view',
'__running',
) )
__delta: dict __view: TransactionView
def __init__(self, connection: DbConnection, /) -> None: def __init__(self, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None:
self.__connection = connection self.__connection = connection
self.__loop = loop
self.__running = False
async def __aenter__(self) -> TransactionView: async def __aenter__(self) -> TransactionView:
self.__delta = {} return self.__enter__()
return TransactionView(self.__delta, self.__connection)
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is None: if exc_type is None:
await self.__connection.complete_transaction(self.__delta) await self.__view.commit()
del self.__delta self._clean()
def _clean(self, /) -> None:
del self.__view
self.__running = False
def __enter__(self) -> TransactionView: def __enter__(self) -> TransactionView:
self.__delta = {} assert not self.__running
return TransactionView(self.__delta, self.__connection) self.__running = True
self.__view = TransactionView({}, self.__connection, self.__loop)
return self.__view
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None: if exc_type is None:
self.__connection.submit_transaction(self.__delta, None) self.__view.submit()
del self.__delta self._clean()

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name='ptvp35', name='ptvp35',
version='1.0rc3', version='1.0rc4',
packages=['ptvp35'], packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35', url='https://gitea.ongoteam.net/PTV/ptvp35',
license='', license='',