diff --git a/Dockerfile b/Dockerfile index 3f89f89..76f967c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,10 +6,14 @@ RUN apt-get install -y python3-sphinx node.js RUN apt-get install -y npm RUN npm install -g http-server RUN pip install pydata-sphinx-theme +RUN pip install git+https://gitea.parrrate.ru/PTV/rainbowadn.git@e9fba7b064902ceedee0dd5578cb47030665a6aa COPY docs/Makefile Makefile COPY setup.py setup.py -COPY docs/source source +COPY traced_example.py traced_example.py COPY ptvp35 ptvp35 +RUN python traced_example.py > traced_example.txt +RUN python traced_example.py all > traced_example_all.txt +COPY docs/source source RUN make html WORKDIR /app/build/html/ CMD [ "http-server", "-p", "80" ] diff --git a/docs/source/conf.py b/docs/source/conf.py index e61b737..fc95b40 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -9,7 +9,7 @@ project = 'ptvp35' copyright = '2022, PARRRATE TNV' author = 'PARRRATE TNV' -release = '1.0rc4' +release = '1.0rc5' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs/source/index.rst b/docs/source/index.rst index d21ed3b..7a492c0 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -8,6 +8,7 @@ Welcome to ptvp35's documentation! usage modules guarantees + ordering @@ -17,6 +18,7 @@ Indices and tables * :doc:`motivation` * :doc:`usage` * :doc:`guarantees` +* :doc:`ordering` * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/docs/source/ordering.rst b/docs/source/ordering.rst new file mode 100644 index 0000000..775f703 --- /dev/null +++ b/docs/source/ordering.rst @@ -0,0 +1,20 @@ +Traced example of how ordering works in persistence5 +==================================================== + +Source +------ + +.. literalinclude :: ../traced_example.py + :language: python3 + +Writes/reads log +---------------- + +.. literalinclude :: ../traced_example.txt + :language: plain + +Everything log +-------------- + +.. literalinclude :: ../traced_example_all.txt + :language: plain diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 2fd9494..342220a 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -4,7 +4,6 @@ import json import os import pathlib import threading -import traceback from io import StringIO, UnsupportedOperation from typing import Any, IO, Hashable @@ -133,14 +132,14 @@ class KVJson(KVFactory): def line(self, key: Any, value: Any, /) -> str: return json.dumps({'key': key, 'value': value}) + '\n' - @classmethod - def _load_key(cls, key: Any, /) -> Hashable: + def _load_key(self, key: Any, /) -> Hashable: + """note: unstable signature.""" if isinstance(key, Hashable): return key elif isinstance(key, list): - return tuple(map(cls._load_key, key)) + return tuple(map(self._load_key, key)) elif isinstance(key, dict): - return tuple((cls._load_key(k), cls._load_key(v)) for k, v in key.items()) + return tuple((self._load_key(k), self._load_key(v)) for k, v in key.items()) else: raise TypeError( 'unknown KVJson key type, cannot convert to hashable' @@ -294,9 +293,17 @@ class DbConnection: self.__buffer_future.set_result(None) self._clear_buffer() + def _fail_buffer_future(self, exception: BaseException, /) -> None: + self.__buffer_future.set_exception(exception) + self._clear_buffer() + async def _do_dump_buffer(self, /) -> None: - await self._dump_compressed_buffer() - self._satisfy_buffer_future() + try: + await self._dump_compressed_buffer() + except Exception as e: + self._fail_buffer_future(e) + else: + self._satisfy_buffer_future() async def _dump_buffer(self, /) -> None: if self.__buffer.tell(): @@ -307,10 +314,15 @@ class DbConnection: def _request_buffer(self, request: Request, /) -> None: if request.waiting(): + def callback(bf: asyncio.Future) -> None: + if (e := bf.exception()) is not None: + request.set_exception(e) + else: + request.set_result(None) + self.__buffer_future.exception self.__buffer_future.add_done_callback( - lambda bf: request.set_exception(e) if ( - e := bf.exception()) is not None else request.set_result(None) + callback ) if not self.__buffer_requested: self.__buffer_requested = True @@ -434,8 +446,11 @@ intended for heavy tasks.""" self.__loop.call_soon_threadsafe( future.set_result, result ) - - threading.Thread(target=wrap).start() + name = getattr(fn, '__name__', '?') + threading.Thread( + target=wrap, + name=f'persistence5-{self.__path.name}-{name}' + ).start() return future @@ -448,14 +463,14 @@ intended for heavy tasks.""" self._path2db_sync(self.__path, db) self._build_file_sync(db) - def _file_open(self, /) -> None: + def _file_open_sync(self, /) -> None: self.__file = self.__path.open('a') def _reload_sync(self, /) -> None: self.__file.close() del self.__file self._rebuild_file_sync({}) - self._file_open() + self._file_open_sync() async def _reload(self, /) -> None: await self._run_in_thread(self._reload_sync) @@ -471,7 +486,7 @@ intended for heavy tasks.""" def _load_from_file_sync(self, /) -> None: self._assure_truncation_sync() self._initialize_mmdb_sync() - self._file_open() + self._file_open_sync() async def _load_from_file(self, /) -> None: await self._run_in_thread(self._load_from_file_sync) @@ -576,7 +591,9 @@ class DbFactory: ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: - self.__parametres = DbParametres(path, kvfactory=kvfactory, buffersize=buffersize) + self.__parametres = DbParametres( + path, kvfactory=kvfactory, buffersize=buffersize + ) async def __aenter__(self) -> DbConnection: self.__db = await DbConnection.create(self.__parametres) @@ -647,7 +664,8 @@ class TransactionView: subfuture: asyncio.Future | None = self.__subfuture self.__subfuture = None delta = self._delta() - await self.__connection.commit_transaction(delta) + if delta: + await self.__connection.commit_transaction(delta) if subfuture is not None: await subfuture @@ -656,9 +674,10 @@ class TransactionView: _nowait analogue of commit(). bulk analogue of DbConnection.set_nowait().""" delta = self._delta() - future = self.__loop.create_future() - self.__connection.submit_transaction_request(delta, future) - self.__subfuture = self._gather(self.__subfuture, future) + if delta: + future = self.__loop.create_future() + self.__connection.submit_transaction_request(delta, future) + self.__subfuture = self._gather(self.__subfuture, future) def _do_gather(self, left: asyncio.Future, right: asyncio.Future) -> asyncio.Future: future = self.__loop.create_future() @@ -679,12 +698,18 @@ bulk analogue of DbConnection.set_nowait().""" return future + def _reduce_future(self, future: asyncio.Future | None) -> asyncio.Future | None: + if future is None or future.done() and future.exception() is None: + return None + else: + return future + def _gather(self, left: asyncio.Future | None, right: asyncio.Future | None) -> asyncio.Future | None: - match (left, right): - case None, _: - return right - case _, None: - return left + match (self._reduce_future(left), self._reduce_future(right)): + case None, ofr: + return ofr + case ofl, None: + return ofl case asyncio.Future() as fl, asyncio.Future() as fr: return self._do_gather(fl, fr) case _: diff --git a/setup.py b/setup.py index eab54e4..4d3bb36 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='ptvp35', - version='1.0rc4', + version='1.0rc5', packages=['ptvp35'], url='https://gitea.ongoteam.net/PTV/ptvp35', license='', diff --git a/traced_example.py b/traced_example.py new file mode 100644 index 0000000..f5fadf9 --- /dev/null +++ b/traced_example.py @@ -0,0 +1,237 @@ +import asyncio +import pathlib +import sys +import threading +from contextlib import ExitStack + +from ptvp35 import DbConnection, DbFactory, KVJson +from rainbowadn.instrument import Instrumentation + + +async def aprint(*args, **kwargs): + print(*args, **kwargs) + + +class LogWrites(Instrumentation): + def __init__(self): + super().__init__(DbConnection, '_write_to_disk_sync') + self.loop = asyncio.get_running_loop() + + def instrument(self, method, db, line, /): + asyncio.run_coroutine_threadsafe( + aprint(f'{self.methodname}[{line}]'), self.loop + ).result() + return method(db, line) + + +class LogEE(Instrumentation): + def __init__(self, target, methodname: str): + super().__init__(target, methodname) + self.loop = asyncio.get_running_loop() + + def _target_id(self) -> str: + name = ( + self.target.__name__ + if + hasattr(self.target, '__name__') + else + self.target.__class__.__name__ + ) + return f'{name}.{self.methodname}' + + def _print(self, thread, *args) -> None: + print( + thread, + self._target_id(), + *args, + sep='\t' + ) + + async def aprint(self, thread, *args) -> None: + self._print(thread, *args) + + def print(self, *args) -> None: + if (ct := threading.current_thread()) is threading.main_thread(): + self._print('main', *args) + else: + asyncio.run_coroutine_threadsafe( + self.aprint('aux', *args), self.loop + ).result() + + def instrument(self, method, *args, **kwargs): + self.print('enter') + try: + result = method(*args, **kwargs) + except: + self.print('error') + raise + else: + self.print('exit') + return result + + +class ALogEE(LogEE): + async def instrument(self, method, *args, **kwargs): + self._print('aio', 'enter') + try: + result = await method(*args, **kwargs) + except: + self._print('aio', 'error') + raise + else: + self._print('aio', 'exit') + return result + + +async def transaction_test(db: DbConnection): + def logdb(*args): + if args: + args = (' ', ' ', '@',) + args + print(db.get('test', '0'), *args, sep='\t') + + def logstate(*args): + if args: + args = ('@',) + args + print(db.get('test', '0'), '|', state.get('test', '0'), *args, sep='\t') + + logdb('empty db') + db.set_nowait('test', '1') + logdb('after set_nowait') + await db.set('test', '2') + logdb('after set') + try: + async with db.transaction() as state: + logstate('empty transaction') + state.set_nowait('test', '3') + logstate('after transaction.set_nowait') + state.submit() + logstate('after transaction.submit') + await state.commit() + logstate('after transaction.commit') + state.set_nowait('test', print) # will throw TypeError later + logstate() + except TypeError: + print('type error') + logdb('after transaction') + async with db.transaction() as state: + logstate() + state.set_nowait('test', '4') + logstate('before implicit transaction.commit') + logdb('after transaction with implicit commit') + with db.transaction() as state: + logstate() + state.set_nowait('test', '5') + logstate('before implicit transaction.submit') + logdb('after transaction with implicit submit') + + +async def main(): + (path := pathlib.Path('dev.db')).unlink(missing_ok=True) + + with ExitStack() as es: + LogWrites().enter(es) + if 'all' in sys.argv: + LogEE(__import__('ptvp35').Request, '__init__').enter(es) + LogEE(__import__('ptvp35').Request, 'waiting').enter(es) + LogEE(__import__('ptvp35').Request, 'set_result').enter(es) + LogEE(__import__('ptvp35').Request, 'set_exception').enter(es) + ALogEE(__import__('ptvp35').Request, 'wait').enter(es) + LogEE(__import__('ptvp35').LineRequest, '__init__').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'run').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'request').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'free').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'io2db').enter(es) + LogEE(__import__('ptvp35').KVFactory, 'db2io').enter(es) + LogEE(__import__('ptvp35').KVRequest, '__init__').enter(es) + LogEE(__import__('ptvp35').KVJson, 'line').enter(es) + LogEE(__import__('ptvp35').KVJson, '_load_key').enter(es) + LogEE(__import__('ptvp35').KVJson, 'fromline').enter(es) + LogEE(__import__('ptvp35').TransactionRequest, '__init__').enter(es) + LogEE(__import__('ptvp35').DbParametres, '__init__').enter(es) + LogEE(__import__('ptvp35').DbConnection, '__init__').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_create_future').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_save_error_sync').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_save_error').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_queue_error').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_save_error_from_thread').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_path2db_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_db2path_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, 'get').enter(es) + ALogEE(__import__('ptvp35').DbConnection, 'set').enter(es) + LogEE(__import__('ptvp35').DbConnection, 'set_nowait').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_clear_buffer').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_compress_buffer').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_dump_compressed_buffer_sync').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_dump_compressed_buffer').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_satisfy_buffer_future').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_fail_buffer_future').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_do_dump_buffer').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_dump_buffer').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_request_buffer').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_dump_buffer_or_request_so').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_write').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_truncation_set_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_truncation_unset_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_file_truncate_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_truncation_target_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_truncate_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_assure_truncation_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_write_to_disk_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_file_write_sync').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_reload_if_oversized').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_handle_request').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_background_cycle').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_background_task').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_start_task').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_recovery_set_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_recovery_unset_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_copy_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_finish_recovery_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_build_file_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_run_in_thread').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_build_file').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_rebuild_file_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_file_open_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_reload_sync').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_reload').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_load_mmdb_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_initialize_mmdb_sync').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_load_from_file_sync').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_load_from_file').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_initialize_queue').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_initialize_running').enter(es) + ALogEE(__import__('ptvp35').DbConnection, '_initialize').enter(es) + ALogEE(__import__('ptvp35').DbConnection, 'aclose').enter(es) + LogEE(__import__('ptvp35').DbConnection, '_transaction_buffer').enter(es) + ALogEE(__import__('ptvp35').DbConnection, 'commit_transaction').enter(es) + LogEE(__import__('ptvp35').DbConnection, 'submit_transaction').enter(es) + LogEE(__import__('ptvp35').DbConnection, 'submit_transaction_request').enter(es) + ALogEE(__import__('ptvp35').DbConnection, 'commit').enter(es) + LogEE(__import__('ptvp35').DbConnection, 'transaction').enter(es) + LogEE(__import__('ptvp35').DbFactory, '__init__').enter(es) + ALogEE(__import__('ptvp35').DbFactory, '__aenter__').enter(es) + ALogEE(__import__('ptvp35').DbFactory, '__aexit__').enter(es) + LogEE(__import__('ptvp35').Db, '__init__').enter(es) + ALogEE(__import__('ptvp35').Db, '__aenter__').enter(es) + ALogEE(__import__('ptvp35').Db, '__aexit__').enter(es) + LogEE(__import__('ptvp35').TransactionView, '__init__').enter(es) + LogEE(__import__('ptvp35').TransactionView, 'get').enter(es) + LogEE(__import__('ptvp35').TransactionView, 'set_nowait').enter(es) + LogEE(__import__('ptvp35').TransactionView, '_delta').enter(es) + ALogEE(__import__('ptvp35').TransactionView, 'commit').enter(es) + LogEE(__import__('ptvp35').TransactionView, 'submit').enter(es) + LogEE(__import__('ptvp35').TransactionView, '_do_gather').enter(es) + LogEE(__import__('ptvp35').TransactionView, '_reduce_future').enter(es) + LogEE(__import__('ptvp35').TransactionView, '_gather').enter(es) + LogEE(__import__('ptvp35').Transaction, '__init__').enter(es) + ALogEE(__import__('ptvp35').Transaction, '__aenter__').enter(es) + ALogEE(__import__('ptvp35').Transaction, '__aexit__').enter(es) + LogEE(__import__('ptvp35').Transaction, '_clean').enter(es) + LogEE(__import__('ptvp35').Transaction, '__enter__').enter(es) + LogEE(__import__('ptvp35').Transaction, '__exit__').enter(es) + async with DbFactory(path, kvfactory=KVJson()) as db: + await transaction_test(db) + + +if __name__ == '__main__': + asyncio.run(main())