"""Trace example for ptvp35.

Runs a short set/transaction scenario against a JSON key-value database
stored in ``trace_example.db`` next to this file, printing the visible value
of the ``"test"`` key after every step.  Disk writes are always logged; when
``all`` is passed on the command line, enter/exit/error events are also
printed for every method listed in ``_TRACED_METHODS``.
"""

import asyncio
import pathlib
import sys
import threading
from contextlib import ExitStack

from rainbowadn.instrument import Instrumentation

# The path tweak must run before the ptvp35 imports, so they are kept together
# in one statement group.  The handler only re-raises (as in the original);
# presumably the try block exists to stop import sorters from hoisting them.
try:
    sys.path.append(str((pathlib.Path(__file__).parent / "../..").absolute()))
    import ptvp35
    from ptvp35 import DbConnection, DbFactory, KVJson
    from ptvp35.instrumentation import InstrumentDiskWrites
except ImportError:
    raise


async def aprint(*args, **kwargs):
    """Coroutine wrapper around :func:`print` so it can be scheduled on a loop."""
    print(*args, **kwargs)


class LogWrites(InstrumentDiskWrites):
    """Print every line reported through ``on_write``."""

    def __init__(self, /):
        super().__init__()
        # Must be constructed while an event loop is running; the loop is
        # remembered so other threads can schedule printing on it.
        self.loop = asyncio.get_running_loop()

    def on_write(self, line: str, /) -> None:
        """Print ``line`` via the captured loop and block until it is printed.

        NOTE(review): this waits on the loop from the calling thread, so it
        presumably is only ever called from a non-loop thread -- confirm.
        """
        asyncio.run_coroutine_threadsafe(
            aprint(f"{self.methodname}[{line}]"), self.loop
        ).result()


class LogEE(Instrumentation):
    """Print ``enter`` / ``exit`` / ``error`` around a synchronous method."""

    def __init__(self, target, methodname: str):
        super().__init__(target, methodname)
        self.loop = asyncio.get_running_loop()

    def _target_id(self) -> str:
        """Return ``"<target name>.<method name>"`` for log lines."""
        if hasattr(self.target, "__name__"):
            name = self.target.__name__
        else:
            name = self.target.__class__.__name__
        return f"{name}.{self.methodname}"

    def _print(self, thread, *args) -> None:
        """Print one tab-separated record: thread tag, target id, then ``args``."""
        print(thread, self._target_id(), *args, sep="\t")

    async def aprint(self, thread, *args) -> None:
        """Coroutine form of :meth:`_print`, for scheduling on the loop."""
        self._print(thread, *args)

    def print(self, *args) -> None:
        """Print a record tagged ``main`` or ``aux`` depending on the thread.

        On the main thread the record is printed directly.  On any other
        thread printing is delegated to the captured loop and awaited
        synchronously.
        """
        if threading.current_thread() is threading.main_thread():
            self._print("main", *args)
        else:
            asyncio.run_coroutine_threadsafe(
                self.aprint("aux", *args), self.loop
            ).result()

    def instrument(self, method, *args, **kwargs):
        """Call ``method`` and log how the call ended; the outcome is unchanged."""
        self.print("enter")
        try:
            result = method(*args, **kwargs)
        except BaseException:
            # Log every kind of failure (including cancellation), then
            # re-raise untouched.
            self.print("error")
            raise
        else:
            self.print("exit")
            return result


class ALogEE(LogEE):
    """Variant of :class:`LogEE` for coroutine methods; records are tagged ``aio``."""

    async def instrument(self, method, *args, **kwargs):
        """Await ``method`` and log how the call ended; the outcome is unchanged."""
        self._print("aio", "enter")
        try:
            result = await method(*args, **kwargs)
        except BaseException:
            self._print("aio", "error")
            raise
        else:
            self._print("aio", "exit")
            return result


# Methods traced when ``run_all`` is set, as (wrapper, ptvp35 attribute name,
# method name).  Entries are entered in this exact order; names are resolved
# lazily so a missing attribute fails at the same point as a hand-written call.
_TRACED_METHODS = (
    (LogEE, "Request", "__init__"),
    (LogEE, "Request", "waiting"),
    (LogEE, "Request", "set_result"),
    (LogEE, "Request", "set_exception"),
    (ALogEE, "Request", "wait"),
    (LogEE, "LineRequest", "__init__"),
    (LogEE, "KVFactory", "run"),
    (LogEE, "KVFactory", "_dbset"),
    (LogEE, "KVFactory", "dbset"),
    (LogEE, "KVFactory", "dbget"),
    (LogEE, "KVFactory", "filter_value"),
    (LogEE, "KVFactory", "request"),
    (LogEE, "KVFactory", "free"),
    (LogEE, "KVFactory", "io2db"),
    (LogEE, "KVFactory", "db2io"),
    (LogEE, "KVFactory", "path2db_sync"),
    (LogEE, "KVFactory", "db2path_sync"),
    (LogEE, "KVRequest", "__init__"),
    (LogEE, "KVJson", "line"),
    (LogEE, "KVJson", "_load_key"),
    (LogEE, "KVJson", "fromline"),
    (LogEE, "TransactionRequest", "__init__"),
    (LogEE, "DbParameters", "__init__"),
    (LogEE, "VirtualConnection", "transaction"),
    (LogEE, "_Loop", "__init__"),
    (LogEE, "_Loop", "create_future"),
    (LogEE, "_Loop", "loop"),
    (LogEE, "_Loop", "run_in_thread"),
    (LogEE, "_Errors", "__init__"),
    (LogEE, "_Errors", "_save_sync"),
    (ALogEE, "_Errors", "_save"),
    (LogEE, "_Errors", "save_from_thread"),
    (LogEE, "_File", "__init__"),
    (LogEE, "_File", "path"),
    (LogEE, "_File", "tell"),
    (LogEE, "_File", "write_to_disk_sync"),
    (LogEE, "_File", "open_sync"),
    (LogEE, "_File", "close_sync"),
    (LogEE, "_Backup", "__init__"),
    (LogEE, "_Backup", "file"),
    (LogEE, "_Backup", "kvfactory"),
    (LogEE, "_Backup", "_copy_sync"),
    (LogEE, "_Backup", "_recovery_unset_sync"),
    (LogEE, "_Backup", "_finish_recovery_sync"),
    (LogEE, "_Backup", "_recovery_set_sync"),
    (LogEE, "_Backup", "build_file_sync"),
    (LogEE, "_Backup", "_rebuild_file_sync"),
    (LogEE, "_Backup", "_reload_sync"),
    (LogEE, "_Backup", "run_in_thread"),
    (ALogEE, "_Backup", "_reload"),
    (ALogEE, "_Backup", "reload_if_oversized"),
    (LogEE, "_Backup", "load_mmdb_sync"),
    (LogEE, "_Backup", "uninitialize"),
    (LogEE, "_Guard", "__init__"),
    (LogEE, "_Guard", "backup"),
    (LogEE, "_Guard", "_write_bytes_sync"),
    (LogEE, "_Guard", "_write_value_sync"),
    (LogEE, "_Guard", "_set_sync"),
    (LogEE, "_Guard", "_unset_sync"),
    (LogEE, "_Guard", "_read_bytes_sync"),
    (LogEE, "_Guard", "_read_value_sync"),
    (LogEE, "_Guard", "_truncate_sync"),
    (LogEE, "_Guard", "assure_sync"),
    (LogEE, "_Guard", "_file_truncate_sync"),
    (LogEE, "_Guard", "file_write_sync"),
    (LogEE, "_ReceivingQueue", "__init__"),
    (LogEE, "_ReceivingQueue", "submit"),
    (LogEE, "_WriteableBuffer", "__init__"),
    (LogEE, "_WriteableBuffer", "writeable"),
    (LogEE, "_WriteableBuffer", "loop"),
    (LogEE, "_WriteableBuffer", "_compressed"),
    (LogEE, "_WriteableBuffer", "_commit_compressed_sync"),
    (ALogEE, "_WriteableBuffer", "_commit_compressed"),
    (LogEE, "_WriteableBuffer", "_clear"),
    (LogEE, "_WriteableBuffer", "_satisfy_future"),
    (LogEE, "_WriteableBuffer", "_fail_future"),
    (ALogEE, "_WriteableBuffer", "_do_commit_buffer"),
    (LogEE, "_WriteableBuffer", "_request_buffer"),
    (ALogEE, "_WriteableBuffer", "_commit"),
    (ALogEE, "_WriteableBuffer", "_commit_or_request_so"),
    (ALogEE, "_WriteableBuffer", "_write"),
    (ALogEE, "_WriteableBuffer", "_handle_request"),
    (ALogEE, "_WriteableBuffer", "_close"),
    (LogEE, "_Memory", "__init__"),
    (LogEE, "_Memory", "_initialize_sync"),
    (LogEE, "_Memory", "_load_from_file_sync"),
    (ALogEE, "_Memory", "_load_from_file"),
    (LogEE, "_Memory", "_close_sync"),
    (ALogEE, "_Memory", "_close"),
    (LogEE, "_Memory", "_transaction_buffer"),
    (LogEE, "_Memory", "get"),
    (LogEE, "_Memory", "set"),
    (LogEE, "_QueueTask", "__init__"),
    (ALogEE, "_QueueTask", "_background_cycle"),
    (ALogEE, "_QueueTask", "_background_task"),
    (ALogEE, "_QueueTask", "close"),
    (LogEE, "_QueueTask", "start"),
    (LogEE, "_DbConnection", "__init__"),
    (LogEE, "_DbConnection", "kvprotocol"),
    (LogEE, "_DbConnection", "get"),
    (ALogEE, "_DbConnection", "set"),
    (LogEE, "_DbConnection", "set_nowait"),
    (ALogEE, "_DbConnection", "_initialize_running"),
    (ALogEE, "_DbConnection", "_initialize"),
    (ALogEE, "_DbConnection", "create"),
    (ALogEE, "_DbConnection", "_close_running"),
    (ALogEE, "_DbConnection", "aclose"),
    (ALogEE, "_DbConnection", "commit_transaction"),
    (LogEE, "_DbConnection", "submit_transaction"),
    (LogEE, "_DbConnection", "submit_transaction_request"),
    (ALogEE, "_DbConnection", "commit"),
    (LogEE, "_DbConnection", "loop"),
    (LogEE, "_DbConnection", "transaction"),
    (LogEE, "DbManager", "__init__"),
    (ALogEE, "DbManager", "__aenter__"),
    (ALogEE, "DbManager", "__aexit__"),
    (LogEE, "Db", "__init__"),
    (ALogEE, "Db", "__aenter__"),
    (ALogEE, "Db", "__aexit__"),
    (LogEE, "FutureContext", "__init__"),
    (ALogEE, "FutureContext", "__aenter__"),
    (ALogEE, "FutureContext", "__aexit__"),
    (ALogEE, "FutureContext", "wait"),
    (LogEE, "TransactionView", "__init__"),
    (LogEE, "TransactionView", "future_context"),
    (LogEE, "TransactionView", "rollback"),
    (LogEE, "TransactionView", "illuminate"),
    (ALogEE, "TransactionView", "ailluminate"),
    (LogEE, "TransactionView", "fork"),
    (ALogEE, "TransactionView", "afork"),
    (LogEE, "TransactionView", "clear"),
    (ALogEE, "TransactionView", "aclear"),
    (LogEE, "TransactionView", "reset"),
    (ALogEE, "TransactionView", "areset"),
    (LogEE, "TransactionView", "get"),
    (LogEE, "TransactionView", "set_nowait"),
    (LogEE, "TransactionView", "_delta"),
    (ALogEE, "TransactionView", "commit"),
    (LogEE, "TransactionView", "submit"),
    (LogEE, "TransactionView", "_do_gather"),
    (LogEE, "TransactionView", "_reduce_future"),
    (LogEE, "TransactionView", "_gather"),
    (ALogEE, "TransactionView", "commit_transaction"),
    (LogEE, "TransactionView", "submit_transaction"),
    (LogEE, "TransactionView", "submit_transaction_request"),
    (LogEE, "TransactionView", "loop"),
    (LogEE, "TransactionView", "transaction"),
    (LogEE, "Transaction", "__init__"),
    (ALogEE, "Transaction", "__aenter__"),
    (ALogEE, "Transaction", "__aexit__"),
    (LogEE, "Transaction", "_clean"),
    (LogEE, "Transaction", "__enter__"),
    (LogEE, "Transaction", "__exit__"),
)


def _enter_all_traces(es: ExitStack) -> None:
    """Create and enter, in table order, a logger for each traced method.

    Must be called with an event loop running, because the logger
    constructors capture the running loop.
    """
    for wrapper, owner_name, methodname in _TRACED_METHODS:
        wrapper(getattr(ptvp35, owner_name), methodname).enter(es)


async def transaction_test(db: DbConnection):
    """Drive ``db`` through plain sets and three transactions, logging each step.

    Every log line shows the value of key ``"test"`` as seen through ``db``
    and, inside a transaction, also as seen through the transaction object.
    """

    def logdb(*args):
        # Outside a transaction: pad so the "@ message" column lines up with
        # the records printed by logstate.
        if args:
            args = (" ", " ", "@") + args
        print(db.get("test", "0"), *args, sep="\t")

    def logstate(*args):
        # ``state`` is whichever transaction object is currently bound in the
        # enclosing function.
        if args:
            args = ("@",) + args
        print(db.get("test", "0"), "|", state.get("test", "0"), *args, sep="\t")

    logdb("empty db")
    db.set_nowait("test", "1")
    logdb("after set_nowait")
    await db.set("test", "2")
    logdb("after set")

    try:
        async with db.transaction() as state:
            logstate("empty transaction")
            state.set_nowait("test", "3")
            logstate("after transaction.set_nowait")
            state.submit()
            logstate("after transaction.submit")
            await state.commit()
            logstate("after transaction.commit")
            state.set_nowait("test", print)  # will throw TypeError later
            logstate()
    except TypeError:
        print("type error")
    logdb("after transaction")

    async with db.transaction() as state:
        logstate()
        state.set_nowait("test", "4")
        logstate("before implicit transaction.commit")
    logdb("after transaction with implicit commit")

    with db.transaction() as state:
        logstate()
        state.set_nowait("test", "5")
        logstate("before implicit transaction.submit")
    logdb("after transaction with implicit submit")


def print_private_db_attrs(db: DbConnection):
    """When ``run_all`` is set, print each ``_DbConnection``-prefixed attribute of ``db``."""
    if not run_all:
        return
    for attr in dir(db):
        # hasattr re-checks accessibility: a name listed by dir() may still
        # fail to resolve on the instance.
        if attr.startswith("_DbConnection") and hasattr(db, attr):
            print(attr)


# Full method tracing is enabled by passing "all" anywhere on the command line.
run_all = "all" in sys.argv


async def main():
    """Recreate the example database file and run the traced scenario."""
    path = pathlib.Path(__file__).parent / "trace_example.db"
    path.unlink(missing_ok=True)  # always start from an empty database file

    with ExitStack() as es:
        LogWrites().enter(es)
        if run_all:
            _enter_all_traces(es)
        async with DbFactory(path, kvfactory=KVJson()) as db:
            await transaction_test(db)
            print_private_db_attrs(db)
        # Listed a second time after the connection context has exited.
        print_private_db_attrs(db)


if __name__ == "__main__":
    asyncio.run(main())