"""Manual tracing harness for a ``ptvp35`` database.

Runs a small transaction scenario against ``dev.db`` while logging every
disk write.  Passing ``all`` on the command line additionally logs
enter/exit/error events for a long list of ``ptvp35`` methods.
"""

import asyncio
import pathlib
import sys
import threading
from contextlib import ExitStack

import ptvp35
from ptvp35 import DbConnection, DbFactory, KVJson
from rainbowadn.instrument import Instrumentation


async def aprint(*args, **kwargs):
    """Coroutine wrapper around the builtin ``print``."""
    print(*args, **kwargs)


class LogWrites(Instrumentation):
    """Log each ``DbConnection._write_to_disk_sync`` call with its line.

    NOTE(review): presumably ``Instrumentation`` routes calls of the target
    method through ``instrument`` -- confirm against ``rainbowadn``.
    """

    def __init__(self):
        super().__init__(DbConnection, '_write_to_disk_sync')
        # Must be constructed while an event loop is running.
        self.loop = asyncio.get_running_loop()

    def instrument(self, method, db, line, /):
        """Print ``methodname[line]`` on the captured loop, then call through.

        The print is scheduled on ``self.loop`` and waited for, so this blocks
        the calling thread until the loop has executed it.
        """
        asyncio.run_coroutine_threadsafe(
            aprint(f'{self.methodname}[{line}]'), self.loop
        ).result()
        return method(db, line)


class LogEE(Instrumentation):
    """Log enter / exit / error events of a synchronous method."""

    def __init__(self, target, methodname: str):
        super().__init__(target, methodname)
        # Must be constructed while an event loop is running.
        self.loop = asyncio.get_running_loop()

    def _target_id(self) -> str:
        """Return ``<target name>.<method name>`` for log output."""
        name = (
            self.target.__name__
            if hasattr(self.target, '__name__')
            else self.target.__class__.__name__
        )
        return f'{name}.{self.methodname}'

    def _print(self, thread, *args) -> None:
        """Print one tab-separated log record tagged with ``thread``."""
        print(thread, self._target_id(), *args, sep='\t')

    async def aprint(self, thread, *args) -> None:
        """Coroutine form of ``_print`` for scheduling on the event loop."""
        self._print(thread, *args)

    def print(self, *args) -> None:
        """Log a record directly from the main thread, via the loop otherwise.

        From any non-main thread the record is tagged ``aux``, scheduled on
        ``self.loop`` and waited for.
        """
        if threading.current_thread() is threading.main_thread():
            self._print('main', *args)
        else:
            asyncio.run_coroutine_threadsafe(
                self.aprint('aux', *args), self.loop
            ).result()

    def instrument(self, method, *args, **kwargs):
        """Call ``method`` and log ``enter``, then ``exit`` or ``error``."""
        self.print('enter')
        try:
            result = method(*args, **kwargs)
        except BaseException:
            # Log every failure (including cancellation) and re-raise as is.
            self.print('error')
            raise
        else:
            self.print('exit')
            return result


class ALogEE(LogEE):
    """Log enter / exit / error events of a coroutine method."""

    async def instrument(self, method, *args, **kwargs):
        """Await ``method`` and log ``enter``, then ``exit`` or ``error``.

        Records are printed directly and tagged ``aio``.
        """
        self._print('aio', 'enter')
        try:
            result = await method(*args, **kwargs)
        except BaseException:
            # Log every failure (including cancellation) and re-raise as is.
            self._print('aio', 'error')
            raise
        else:
            self._print('aio', 'exit')
            return result


async def transaction_test(db: DbConnection):
    """Exercise plain sets and three transaction styles, printing the state.

    Each log line shows the value of key ``test`` in the database and, inside
    a transaction, also the value seen by the transaction view.
    """

    def logdb(*args):
        # Pad the columns that ``logstate`` fills with '|' and the view value.
        if args:
            args = (' ', ' ', '@',) + args
        print(db.get('test', '0'), *args, sep='\t')

    def logstate(*args):
        # ``state`` is the transaction bound by the enclosing ``with`` blocks.
        if args:
            args = ('@',) + args
        print(
            db.get('test', '0'),
            '|',
            state.get('test', '0'),
            *args,
            sep='\t'
        )

    logdb('empty db')
    db.set_nowait('test', '1')
    logdb('after set_nowait')
    await db.set('test', '2')
    logdb('after set')

    try:
        async with db.transaction() as state:
            logstate('empty transaction')
            state.set_nowait('test', '3')
            logstate('after transaction.set_nowait')
            state.submit()
            logstate('after transaction.submit')
            await state.commit()
            logstate('after transaction.commit')
            state.set_nowait('test', print)  # will throw TypeError later
            logstate()
    except TypeError:
        print('type error')
    logdb('after transaction')

    async with db.transaction() as state:
        logstate()
        state.set_nowait('test', '4')
        logstate('before implicit transaction.commit')
    logdb('after transaction with implicit commit')

    with db.transaction() as state:
        logstate()
        state.set_nowait('test', '5')
        logstate('before implicit transaction.submit')
    logdb('after transaction with implicit submit')


def _instrument_everything(es: ExitStack) -> None:
    """Enter enter/exit/error logging for the full list of ptvp35 methods.

    The plan is ordered: instrumentations are entered on ``es`` exactly in the
    order listed, class by class and method by method.
    """
    sync, aio = LogEE, ALogEE
    plan = (
        ('Request', (
            (sync, '__init__'),
            (sync, 'waiting'),
            (sync, 'set_result'),
            (sync, 'set_exception'),
            (aio, 'wait'),
        )),
        ('LineRequest', (
            (sync, '__init__'),
        )),
        ('KVFactory', (
            (sync, 'run'),
            (sync, 'request'),
            (sync, 'free'),
            (sync, 'io2db'),
            (sync, 'db2io'),
        )),
        ('KVRequest', (
            (sync, '__init__'),
        )),
        ('KVJson', (
            (sync, 'line'),
            (sync, '_load_key'),
            (sync, 'fromline'),
        )),
        ('TransactionRequest', (
            (sync, '__init__'),
        )),
        ('DbParametres', (
            (sync, '__init__'),
        )),
        ('DbConnection', (
            (sync, '__init__'),
            (sync, '_create_future'),
            (sync, '_save_error_sync'),
            (aio, '_save_error'),
            (sync, '_queue_error'),
            (sync, '_save_error_from_thread'),
            (sync, '_path2db_sync'),
            (sync, '_db2path_sync'),
            (sync, 'get'),
            (aio, 'set'),
            (sync, 'set_nowait'),
            (sync, '_clear_buffer'),
            (sync, '_compress_buffer'),
            (sync, '_commit_compressed_buffer_sync'),
            (aio, '_commit_compressed_buffer'),
            (sync, '_satisfy_buffer_future'),
            (sync, '_fail_buffer_future'),
            (aio, '_do_commit_buffer'),
            (aio, '_commit_buffer'),
            (sync, '_request_buffer'),
            (aio, '_commit_buffer_or_request_so'),
            (aio, '_write'),
            (sync, '_truncation_set_sync'),
            (sync, '_truncation_unset_sync'),
            (sync, '_file_truncate_sync'),
            (sync, '_truncation_target_sync'),
            (sync, '_truncate_sync'),
            (sync, '_assure_truncation_sync'),
            (sync, '_write_to_disk_sync'),
            (sync, '_file_write_sync'),
            (aio, '_reload_if_oversized'),
            (aio, '_handle_request'),
            (aio, '_background_cycle'),
            (aio, '_background_task'),
            (sync, '_start_task'),
            (sync, '_recovery_set_sync'),
            (sync, '_recovery_unset_sync'),
            (sync, '_copy_sync'),
            (sync, '_finish_recovery_sync'),
            (sync, '_build_file_sync'),
            (sync, '_run_in_thread'),
            (aio, '_build_file'),
            (sync, '_rebuild_file_sync'),
            (sync, '_file_open_sync'),
            (sync, '_reload_sync'),
            (aio, '_reload'),
            (sync, '_load_mmdb_sync'),
            (sync, '_initialize_mmdb_sync'),
            (sync, '_load_from_file_sync'),
            (aio, '_load_from_file'),
            (sync, '_initialize_queue'),
            (aio, '_initialize_running'),
            (aio, '_initialize'),
            (aio, 'aclose'),
            (sync, '_transaction_buffer'),
            (aio, 'commit_transaction'),
            (sync, 'submit_transaction'),
            (sync, 'submit_transaction_request'),
            (aio, 'commit'),
            (sync, 'transaction'),
        )),
        ('DbFactory', (
            (sync, '__init__'),
            (aio, '__aenter__'),
            (aio, '__aexit__'),
        )),
        ('Db', (
            (sync, '__init__'),
            (aio, '__aenter__'),
            (aio, '__aexit__'),
        )),
        ('TransactionView', (
            (sync, '__init__'),
            (sync, 'get'),
            (sync, 'set_nowait'),
            (sync, '_delta'),
            (aio, 'commit'),
            (sync, 'submit'),
            (sync, '_do_gather'),
            (sync, '_reduce_future'),
            (sync, '_gather'),
        )),
        ('Transaction', (
            (sync, '__init__'),
            (aio, '__aenter__'),
            (aio, '__aexit__'),
            (sync, '_clean'),
            (sync, '__enter__'),
            (sync, '__exit__'),
        )),
    )
    for class_name, methods in plan:
        # Resolved lazily so a missing class fails at its own position,
        # after everything listed before it has already been entered.
        target = getattr(ptvp35, class_name)
        for wrapper, method_name in methods:
            wrapper(target, method_name).enter(es)


async def main():
    """Recreate ``dev.db`` and run ``transaction_test`` with logging enabled.

    Disk writes are always logged.  If ``all`` appears in ``sys.argv``, the
    full enter/exit/error instrumentation is enabled as well.
    """
    path = pathlib.Path('dev.db')
    path.unlink(missing_ok=True)
    with ExitStack() as es:
        LogWrites().enter(es)
        if 'all' in sys.argv:
            _instrument_everything(es)
        async with DbFactory(path, kvfactory=KVJson()) as db:
            await transaction_test(db)


if __name__ == '__main__':
    asyncio.run(main())