1.0rc5: traced_example + minor fixes

This commit is contained in:
AF 2022-11-21 15:39:54 +00:00
parent ef3bf09cdc
commit e1a2cb59b1
7 changed files with 315 additions and 27 deletions

View File

@ -6,10 +6,14 @@ RUN apt-get install -y python3-sphinx node.js
RUN apt-get install -y npm RUN apt-get install -y npm
RUN npm install -g http-server RUN npm install -g http-server
RUN pip install pydata-sphinx-theme RUN pip install pydata-sphinx-theme
RUN pip install git+https://gitea.parrrate.ru/PTV/rainbowadn.git@e9fba7b064902ceedee0dd5578cb47030665a6aa
COPY docs/Makefile Makefile COPY docs/Makefile Makefile
COPY setup.py setup.py COPY setup.py setup.py
COPY docs/source source COPY traced_example.py traced_example.py
COPY ptvp35 ptvp35 COPY ptvp35 ptvp35
RUN python traced_example.py > traced_example.txt
RUN python traced_example.py all > traced_example_all.txt
COPY docs/source source
RUN make html RUN make html
WORKDIR /app/build/html/ WORKDIR /app/build/html/
CMD [ "http-server", "-p", "80" ] CMD [ "http-server", "-p", "80" ]

View File

@ -9,7 +9,7 @@
project = 'ptvp35' project = 'ptvp35'
copyright = '2022, PARRRATE TNV' copyright = '2022, PARRRATE TNV'
author = 'PARRRATE TNV' author = 'PARRRATE TNV'
release = '1.0rc4' release = '1.0rc5'
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

View File

@ -8,6 +8,7 @@ Welcome to ptvp35's documentation!
usage usage
modules modules
guarantees guarantees
ordering
@ -17,6 +18,7 @@ Indices and tables
* :doc:`motivation` * :doc:`motivation`
* :doc:`usage` * :doc:`usage`
* :doc:`guarantees` * :doc:`guarantees`
* :doc:`ordering`
* :ref:`genindex` * :ref:`genindex`
* :ref:`modindex` * :ref:`modindex`
* :ref:`search` * :ref:`search`

20
docs/source/ordering.rst Normal file
View File

@ -0,0 +1,20 @@
Traced example of how ordering works in persistence5
====================================================
Source
------
.. literalinclude :: ../traced_example.py
:language: python3
Writes/reads log
----------------
.. literalinclude :: ../traced_example.txt
:language: plain
Everything log
--------------
.. literalinclude :: ../traced_example_all.txt
:language: plain

View File

@ -4,7 +4,6 @@ import json
import os import os
import pathlib import pathlib
import threading import threading
import traceback
from io import StringIO, UnsupportedOperation from io import StringIO, UnsupportedOperation
from typing import Any, IO, Hashable from typing import Any, IO, Hashable
@ -133,14 +132,14 @@ class KVJson(KVFactory):
def line(self, key: Any, value: Any, /) -> str: def line(self, key: Any, value: Any, /) -> str:
return json.dumps({'key': key, 'value': value}) + '\n' return json.dumps({'key': key, 'value': value}) + '\n'
@classmethod def _load_key(self, key: Any, /) -> Hashable:
def _load_key(cls, key: Any, /) -> Hashable: """note: unstable signature."""
if isinstance(key, Hashable): if isinstance(key, Hashable):
return key return key
elif isinstance(key, list): elif isinstance(key, list):
return tuple(map(cls._load_key, key)) return tuple(map(self._load_key, key))
elif isinstance(key, dict): elif isinstance(key, dict):
return tuple((cls._load_key(k), cls._load_key(v)) for k, v in key.items()) return tuple((self._load_key(k), self._load_key(v)) for k, v in key.items())
else: else:
raise TypeError( raise TypeError(
'unknown KVJson key type, cannot convert to hashable' 'unknown KVJson key type, cannot convert to hashable'
@ -294,8 +293,16 @@ class DbConnection:
self.__buffer_future.set_result(None) self.__buffer_future.set_result(None)
self._clear_buffer() self._clear_buffer()
def _fail_buffer_future(self, exception: BaseException, /) -> None:
self.__buffer_future.set_exception(exception)
self._clear_buffer()
async def _do_dump_buffer(self, /) -> None: async def _do_dump_buffer(self, /) -> None:
try:
await self._dump_compressed_buffer() await self._dump_compressed_buffer()
except Exception as e:
self._fail_buffer_future(e)
else:
self._satisfy_buffer_future() self._satisfy_buffer_future()
async def _dump_buffer(self, /) -> None: async def _dump_buffer(self, /) -> None:
@ -307,10 +314,15 @@ class DbConnection:
def _request_buffer(self, request: Request, /) -> None: def _request_buffer(self, request: Request, /) -> None:
if request.waiting(): if request.waiting():
def callback(bf: asyncio.Future) -> None:
if (e := bf.exception()) is not None:
request.set_exception(e)
else:
request.set_result(None)
self.__buffer_future.exception self.__buffer_future.exception
self.__buffer_future.add_done_callback( self.__buffer_future.add_done_callback(
lambda bf: request.set_exception(e) if ( callback
e := bf.exception()) is not None else request.set_result(None)
) )
if not self.__buffer_requested: if not self.__buffer_requested:
self.__buffer_requested = True self.__buffer_requested = True
@ -434,8 +446,11 @@ intended for heavy tasks."""
self.__loop.call_soon_threadsafe( self.__loop.call_soon_threadsafe(
future.set_result, result future.set_result, result
) )
name = getattr(fn, '__name__', '?')
threading.Thread(target=wrap).start() threading.Thread(
target=wrap,
name=f'persistence5-{self.__path.name}-{name}'
).start()
return future return future
@ -448,14 +463,14 @@ intended for heavy tasks."""
self._path2db_sync(self.__path, db) self._path2db_sync(self.__path, db)
self._build_file_sync(db) self._build_file_sync(db)
def _file_open(self, /) -> None: def _file_open_sync(self, /) -> None:
self.__file = self.__path.open('a') self.__file = self.__path.open('a')
def _reload_sync(self, /) -> None: def _reload_sync(self, /) -> None:
self.__file.close() self.__file.close()
del self.__file del self.__file
self._rebuild_file_sync({}) self._rebuild_file_sync({})
self._file_open() self._file_open_sync()
async def _reload(self, /) -> None: async def _reload(self, /) -> None:
await self._run_in_thread(self._reload_sync) await self._run_in_thread(self._reload_sync)
@ -471,7 +486,7 @@ intended for heavy tasks."""
def _load_from_file_sync(self, /) -> None: def _load_from_file_sync(self, /) -> None:
self._assure_truncation_sync() self._assure_truncation_sync()
self._initialize_mmdb_sync() self._initialize_mmdb_sync()
self._file_open() self._file_open_sync()
async def _load_from_file(self, /) -> None: async def _load_from_file(self, /) -> None:
await self._run_in_thread(self._load_from_file_sync) await self._run_in_thread(self._load_from_file_sync)
@ -576,7 +591,9 @@ class DbFactory:
) )
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
self.__parametres = DbParametres(path, kvfactory=kvfactory, buffersize=buffersize) self.__parametres = DbParametres(
path, kvfactory=kvfactory, buffersize=buffersize
)
async def __aenter__(self) -> DbConnection: async def __aenter__(self) -> DbConnection:
self.__db = await DbConnection.create(self.__parametres) self.__db = await DbConnection.create(self.__parametres)
@ -647,6 +664,7 @@ class TransactionView:
subfuture: asyncio.Future | None = self.__subfuture subfuture: asyncio.Future | None = self.__subfuture
self.__subfuture = None self.__subfuture = None
delta = self._delta() delta = self._delta()
if delta:
await self.__connection.commit_transaction(delta) await self.__connection.commit_transaction(delta)
if subfuture is not None: if subfuture is not None:
await subfuture await subfuture
@ -656,6 +674,7 @@ class TransactionView:
_nowait analogue of commit(). _nowait analogue of commit().
bulk analogue of DbConnection.set_nowait().""" bulk analogue of DbConnection.set_nowait()."""
delta = self._delta() delta = self._delta()
if delta:
future = self.__loop.create_future() future = self.__loop.create_future()
self.__connection.submit_transaction_request(delta, future) self.__connection.submit_transaction_request(delta, future)
self.__subfuture = self._gather(self.__subfuture, future) self.__subfuture = self._gather(self.__subfuture, future)
@ -679,12 +698,18 @@ bulk analogue of DbConnection.set_nowait()."""
return future return future
def _reduce_future(self, future: asyncio.Future | None) -> asyncio.Future | None:
if future is None or future.done() and future.exception() is None:
return None
else:
return future
def _gather(self, left: asyncio.Future | None, right: asyncio.Future | None) -> asyncio.Future | None: def _gather(self, left: asyncio.Future | None, right: asyncio.Future | None) -> asyncio.Future | None:
match (left, right): match (self._reduce_future(left), self._reduce_future(right)):
case None, _: case None, ofr:
return right return ofr
case _, None: case ofl, None:
return left return ofl
case asyncio.Future() as fl, asyncio.Future() as fr: case asyncio.Future() as fl, asyncio.Future() as fr:
return self._do_gather(fl, fr) return self._do_gather(fl, fr)
case _: case _:

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name='ptvp35', name='ptvp35',
version='1.0rc4', version='1.0rc5',
packages=['ptvp35'], packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35', url='https://gitea.ongoteam.net/PTV/ptvp35',
license='', license='',

237
traced_example.py Normal file
View File

@ -0,0 +1,237 @@
import asyncio
import pathlib
import sys
import threading
from contextlib import ExitStack
from ptvp35 import DbConnection, DbFactory, KVJson
from rainbowadn.instrument import Instrumentation
async def aprint(*args, **kwargs):
print(*args, **kwargs)
class LogWrites(Instrumentation):
def __init__(self):
super().__init__(DbConnection, '_write_to_disk_sync')
self.loop = asyncio.get_running_loop()
def instrument(self, method, db, line, /):
asyncio.run_coroutine_threadsafe(
aprint(f'{self.methodname}[{line}]'), self.loop
).result()
return method(db, line)
class LogEE(Instrumentation):
def __init__(self, target, methodname: str):
super().__init__(target, methodname)
self.loop = asyncio.get_running_loop()
def _target_id(self) -> str:
name = (
self.target.__name__
if
hasattr(self.target, '__name__')
else
self.target.__class__.__name__
)
return f'{name}.{self.methodname}'
def _print(self, thread, *args) -> None:
print(
thread,
self._target_id(),
*args,
sep='\t'
)
async def aprint(self, thread, *args) -> None:
self._print(thread, *args)
def print(self, *args) -> None:
if (ct := threading.current_thread()) is threading.main_thread():
self._print('main', *args)
else:
asyncio.run_coroutine_threadsafe(
self.aprint('aux', *args), self.loop
).result()
def instrument(self, method, *args, **kwargs):
self.print('enter')
try:
result = method(*args, **kwargs)
except:
self.print('error')
raise
else:
self.print('exit')
return result
class ALogEE(LogEE):
async def instrument(self, method, *args, **kwargs):
self._print('aio', 'enter')
try:
result = await method(*args, **kwargs)
except:
self._print('aio', 'error')
raise
else:
self._print('aio', 'exit')
return result
async def transaction_test(db: DbConnection):
def logdb(*args):
if args:
args = (' ', ' ', '@',) + args
print(db.get('test', '0'), *args, sep='\t')
def logstate(*args):
if args:
args = ('@',) + args
print(db.get('test', '0'), '|', state.get('test', '0'), *args, sep='\t')
logdb('empty db')
db.set_nowait('test', '1')
logdb('after set_nowait')
await db.set('test', '2')
logdb('after set')
try:
async with db.transaction() as state:
logstate('empty transaction')
state.set_nowait('test', '3')
logstate('after transaction.set_nowait')
state.submit()
logstate('after transaction.submit')
await state.commit()
logstate('after transaction.commit')
state.set_nowait('test', print) # will throw TypeError later
logstate()
except TypeError:
print('type error')
logdb('after transaction')
async with db.transaction() as state:
logstate()
state.set_nowait('test', '4')
logstate('before implicit transaction.commit')
logdb('after transaction with implicit commit')
with db.transaction() as state:
logstate()
state.set_nowait('test', '5')
logstate('before implicit transaction.submit')
logdb('after transaction with implicit submit')
async def main():
(path := pathlib.Path('dev.db')).unlink(missing_ok=True)
with ExitStack() as es:
LogWrites().enter(es)
if 'all' in sys.argv:
LogEE(__import__('ptvp35').Request, '__init__').enter(es)
LogEE(__import__('ptvp35').Request, 'waiting').enter(es)
LogEE(__import__('ptvp35').Request, 'set_result').enter(es)
LogEE(__import__('ptvp35').Request, 'set_exception').enter(es)
ALogEE(__import__('ptvp35').Request, 'wait').enter(es)
LogEE(__import__('ptvp35').LineRequest, '__init__').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'run').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'request').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'free').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'io2db').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'db2io').enter(es)
LogEE(__import__('ptvp35').KVRequest, '__init__').enter(es)
LogEE(__import__('ptvp35').KVJson, 'line').enter(es)
LogEE(__import__('ptvp35').KVJson, '_load_key').enter(es)
LogEE(__import__('ptvp35').KVJson, 'fromline').enter(es)
LogEE(__import__('ptvp35').TransactionRequest, '__init__').enter(es)
LogEE(__import__('ptvp35').DbParametres, '__init__').enter(es)
LogEE(__import__('ptvp35').DbConnection, '__init__').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_create_future').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_save_error_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_save_error').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_queue_error').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_save_error_from_thread').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_path2db_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_db2path_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'get').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'set').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'set_nowait').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_clear_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_compress_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_dump_compressed_buffer_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_dump_compressed_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_satisfy_buffer_future').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_fail_buffer_future').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_do_dump_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_dump_buffer').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_request_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_dump_buffer_or_request_so').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_write').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncation_set_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncation_unset_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_truncate_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncation_target_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_truncate_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_assure_truncation_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_write_to_disk_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_write_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_reload_if_oversized').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_handle_request').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_background_cycle').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_background_task').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_start_task').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_recovery_set_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_recovery_unset_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_copy_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_finish_recovery_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_build_file_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_run_in_thread').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_build_file').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_rebuild_file_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_file_open_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_reload_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_reload').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_load_mmdb_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_initialize_mmdb_sync').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_load_from_file_sync').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_load_from_file').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_initialize_queue').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_initialize_running').enter(es)
ALogEE(__import__('ptvp35').DbConnection, '_initialize').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'aclose').enter(es)
LogEE(__import__('ptvp35').DbConnection, '_transaction_buffer').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'commit_transaction').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'submit_transaction').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'submit_transaction_request').enter(es)
ALogEE(__import__('ptvp35').DbConnection, 'commit').enter(es)
LogEE(__import__('ptvp35').DbConnection, 'transaction').enter(es)
LogEE(__import__('ptvp35').DbFactory, '__init__').enter(es)
ALogEE(__import__('ptvp35').DbFactory, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').DbFactory, '__aexit__').enter(es)
LogEE(__import__('ptvp35').Db, '__init__').enter(es)
ALogEE(__import__('ptvp35').Db, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').Db, '__aexit__').enter(es)
LogEE(__import__('ptvp35').TransactionView, '__init__').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'get').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'set_nowait').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_delta').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'commit').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'submit').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_do_gather').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_reduce_future').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_gather').enter(es)
LogEE(__import__('ptvp35').Transaction, '__init__').enter(es)
ALogEE(__import__('ptvp35').Transaction, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').Transaction, '__aexit__').enter(es)
LogEE(__import__('ptvp35').Transaction, '_clean').enter(es)
LogEE(__import__('ptvp35').Transaction, '__enter__').enter(es)
LogEE(__import__('ptvp35').Transaction, '__exit__').enter(es)
async with DbFactory(path, kvfactory=KVJson()) as db:
await transaction_test(db)
if __name__ == '__main__':
asyncio.run(main())