ptvp35/docs/scripts/traced_example.py
2023-05-05 16:13:58 +00:00

321 lines
16 KiB
Python

import asyncio
import pathlib
import sys
import threading
from contextlib import ExitStack
from rainbowadn.instrument import Instrumentation
try:
sys.path.append(str((pathlib.Path(__file__).parent / "../..").absolute()))
from ptvp35 import DbConnection, DbFactory, KVJson
from ptvp35.instrumentation import InstrumentDiskWrites
except:
raise
async def aprint(*args, **kwargs) -> None:
    """Coroutine wrapper around the builtin ``print``.

    All positional and keyword arguments are forwarded unchanged.  Being a
    coroutine, it can be handed to ``asyncio.run_coroutine_threadsafe`` so
    that the actual printing happens on the event loop's thread.
    """
    print(*args, **kwargs)
class LogWrites(InstrumentDiskWrites):
    """Disk-write instrumentation that echoes every written line to stdout.

    The running event loop is captured at construction time, so an instance
    must be created from inside a running loop.
    """

    def __init__(self, /):
        super().__init__()
        # Remember the loop so on_write() can schedule printing onto it.
        self.loop = asyncio.get_running_loop()

    def on_write(self, line: str, /) -> None:
        """Print ``<methodname>[<line>]`` via the captured loop and wait for it.

        NOTE(review): blocking on ``.result()`` only works when this hook is
        called from a thread other than the loop's own -- presumably the
        writer thread; confirm against InstrumentDiskWrites.
        """
        message = f"{self.methodname}[{line}]"
        pending = asyncio.run_coroutine_threadsafe(aprint(message), self.loop)
        pending.result()
class LogEE(Instrumentation):
    """Instrumentation that logs enter/exit/error around a synchronous method.

    Every log line is tab-separated: a thread tag (``main``/``aux``), the
    ``<target name>.<method name>`` identifier, then the event name.

    The running event loop is captured at construction time, so an instance
    must be created from inside a running loop.
    """

    def __init__(self, target, methodname: str):
        super().__init__(target, methodname)
        # Used by print() to hop back onto the loop from other threads.
        self.loop = asyncio.get_running_loop()

    def _target_id(self) -> str:
        """Return ``<name>.<methodname>`` for the instrumented target.

        Uses the target's own ``__name__`` when it has one (e.g. a class),
        otherwise the name of the target's class.
        """
        name = getattr(self.target, "__name__", self.target.__class__.__name__)
        return f"{name}.{self.methodname}"

    def _print(self, thread, *args) -> None:
        """Write one tab-separated log line directly to stdout."""
        print(thread, self._target_id(), *args, sep="\t")

    async def aprint(self, thread, *args) -> None:
        """Coroutine form of ``_print`` so it can be scheduled on the loop."""
        self._print(thread, *args)

    def print(self, *args) -> None:
        """Log ``args``, tagging the line by the calling thread.

        On the main thread the line is printed directly with the ``main``
        tag.  On any other thread the print is scheduled on the captured
        loop with the ``aux`` tag, and this call blocks until it is done.
        """
        if threading.current_thread() is threading.main_thread():
            self._print("main", *args)
        else:
            asyncio.run_coroutine_threadsafe(self.aprint("aux", *args), self.loop).result()

    def instrument(self, method, *args, **kwargs):
        """Call ``method(*args, **kwargs)`` wrapped in enter/exit/error logs.

        Returns whatever ``method`` returns; any exception is logged as
        ``error`` and re-raised unchanged.
        """
        self.print("enter")
        try:
            result = method(*args, **kwargs)
        except BaseException:
            # Deliberately broad: log every failure kind, then propagate it.
            self.print("error")
            raise
        self.print("exit")
        return result
class ALogEE(LogEE):
    """Variant of LogEE for coroutine methods.

    The wrapped call is awaited, and log lines are written directly with
    the ``aio`` thread tag instead of going through ``LogEE.print``.
    """

    async def instrument(self, method, *args, **kwargs):
        """Await ``method(*args, **kwargs)`` wrapped in enter/exit/error logs.

        Returns the awaited result; any exception is logged as ``error`` and
        re-raised unchanged.
        """
        self._print("aio", "enter")
        try:
            outcome = await method(*args, **kwargs)
        except BaseException:
            # Broad on purpose: every failure kind is logged, then propagated.
            self._print("aio", "error")
            raise
        self._print("aio", "exit")
        return outcome
async def transaction_test(db: DbConnection):
    """Walk ``db`` through plain sets and three transactions, printing state.

    After each step a tab-separated line is printed with the value of key
    ``"test"`` as seen through ``db`` (and, inside a transaction, through
    the transaction object), optionally followed by ``@`` and a note.
    """

    def logdb(*args):
        # Value seen through the connection; a note, if given, is pushed to
        # the right by two blank columns so it lines up with logstate().
        note = (" ", " ", "@", *args) if args else ()
        print(db.get("test", "0"), *note, sep="\t")

    def logstate(*args):
        # Connection value | transaction value, then the optional note.
        # `state` is whichever transaction the enclosing code bound last.
        note = ("@", *args) if args else ()
        print(db.get("test", "0"), "|", state.get("test", "0"), *note, sep="\t")

    # --- direct writes on the connection ---
    logdb("empty db")
    db.set_nowait("test", "1")
    logdb("after set_nowait")
    await db.set("test", "2")
    logdb("after set")

    # --- async transaction with explicit submit/commit, then a bad value ---
    try:
        async with db.transaction() as state:
            logstate("empty transaction")
            state.set_nowait("test", "3")
            logstate("after transaction.set_nowait")
            state.submit()
            logstate("after transaction.submit")
            await state.commit()
            logstate("after transaction.commit")
            state.set_nowait("test", print)  # will throw TypeError later
            logstate()
    except TypeError:
        print("type error")
    logdb("after transaction")

    # --- async transaction left to finish on block exit ---
    async with db.transaction() as state:
        logstate()
        state.set_nowait("test", "4")
        logstate("before implicit transaction.commit")
    logdb("after transaction with implicit commit")

    # --- same, but using the synchronous context-manager form ---
    with db.transaction() as state:
        logstate()
        state.set_nowait("test", "5")
        logstate("before implicit transaction.submit")
    logdb("after transaction with implicit submit")
def print_private_db_attrs(db: DbConnection):
    """Print the names of ``db``'s ``_DbConnection``-prefixed attributes.

    Does nothing unless the module-level ``run_all`` flag is set.  A name
    is printed only if it is listed by ``dir(db)``, starts with
    ``"_DbConnection"`` and ``hasattr(db, name)`` is true at that moment.
    """
    if not run_all:
        return
    for name in dir(db):
        if not name.startswith("_DbConnection"):
            continue
        if hasattr(db, name):
            print(name)
# Verbose-mode switch: true when some element of sys.argv is exactly "all".
# Read by print_private_db_attrs() and main() to enable the extra output.
run_all = "all" in sys.argv
# Methods traced when ``run_all`` is set, in the order they are instrumented.
# Each row is (wrapper class, attribute name on the ptvp35 module, method name);
# LogEE wraps plain methods, ALogEE wraps coroutine methods.
_TRACED_METHODS = (
    (LogEE, "Request", "__init__"),
    (LogEE, "Request", "waiting"),
    (LogEE, "Request", "set_result"),
    (LogEE, "Request", "set_exception"),
    (ALogEE, "Request", "wait"),
    (LogEE, "LineRequest", "__init__"),
    (LogEE, "KVFactory", "run"),
    (LogEE, "KVFactory", "_dbset"),
    (LogEE, "KVFactory", "dbset"),
    (LogEE, "KVFactory", "dbget"),
    (LogEE, "KVFactory", "filter_value"),
    (LogEE, "KVFactory", "request"),
    (LogEE, "KVFactory", "free"),
    (LogEE, "KVFactory", "io2db"),
    (LogEE, "KVFactory", "db2io"),
    (LogEE, "KVFactory", "path2db_sync"),
    (LogEE, "KVFactory", "db2path_sync"),
    (LogEE, "KVRequest", "__init__"),
    (LogEE, "KVJson", "line"),
    (LogEE, "KVJson", "_load_key"),
    (LogEE, "KVJson", "fromline"),
    (LogEE, "TransactionRequest", "__init__"),
    (LogEE, "DbParameters", "__init__"),
    (LogEE, "VirtualConnection", "transaction"),
    (LogEE, "_Loop", "__init__"),
    (LogEE, "_Loop", "create_future"),
    (LogEE, "_Loop", "loop"),
    (LogEE, "_Loop", "run_in_thread"),
    (LogEE, "_Errors", "__init__"),
    (LogEE, "_Errors", "_save_sync"),
    (ALogEE, "_Errors", "_save"),
    (LogEE, "_Errors", "save_from_thread"),
    (LogEE, "_File", "__init__"),
    (LogEE, "_File", "path"),
    (LogEE, "_File", "tell"),
    (LogEE, "_File", "write_to_disk_sync"),
    (LogEE, "_File", "open_sync"),
    (LogEE, "_File", "close_sync"),
    (LogEE, "_Backup", "__init__"),
    (LogEE, "_Backup", "file"),
    (LogEE, "_Backup", "kvfactory"),
    (LogEE, "_Backup", "_copy_sync"),
    (LogEE, "_Backup", "_recovery_unset_sync"),
    (LogEE, "_Backup", "_finish_recovery_sync"),
    (LogEE, "_Backup", "_recovery_set_sync"),
    (LogEE, "_Backup", "build_file_sync"),
    (LogEE, "_Backup", "_rebuild_file_sync"),
    (LogEE, "_Backup", "_reload_sync"),
    (LogEE, "_Backup", "run_in_thread"),
    (ALogEE, "_Backup", "_reload"),
    (ALogEE, "_Backup", "reload_if_oversized"),
    (LogEE, "_Backup", "load_mmdb_sync"),
    (LogEE, "_Backup", "uninitialize"),
    (LogEE, "_Guard", "__init__"),
    (LogEE, "_Guard", "backup"),
    (LogEE, "_Guard", "_write_bytes_sync"),
    (LogEE, "_Guard", "_write_value_sync"),
    (LogEE, "_Guard", "_set_sync"),
    (LogEE, "_Guard", "_unset_sync"),
    (LogEE, "_Guard", "_read_bytes_sync"),
    (LogEE, "_Guard", "_read_value_sync"),
    (LogEE, "_Guard", "_truncate_sync"),
    (LogEE, "_Guard", "assure_sync"),
    (LogEE, "_Guard", "_file_truncate_sync"),
    (LogEE, "_Guard", "file_write_sync"),
    (LogEE, "_ReceivingQueue", "__init__"),
    (LogEE, "_ReceivingQueue", "submit"),
    (LogEE, "_WriteableBuffer", "__init__"),
    (LogEE, "_WriteableBuffer", "writeable"),
    (LogEE, "_WriteableBuffer", "loop"),
    (LogEE, "_WriteableBuffer", "_compressed"),
    (LogEE, "_WriteableBuffer", "_commit_compressed_sync"),
    (ALogEE, "_WriteableBuffer", "_commit_compressed"),
    (LogEE, "_WriteableBuffer", "_clear"),
    (LogEE, "_WriteableBuffer", "_satisfy_future"),
    (LogEE, "_WriteableBuffer", "_fail_future"),
    (ALogEE, "_WriteableBuffer", "_do_commit_buffer"),
    (LogEE, "_WriteableBuffer", "_request_buffer"),
    (ALogEE, "_WriteableBuffer", "_commit"),
    (ALogEE, "_WriteableBuffer", "_commit_or_request_so"),
    (ALogEE, "_WriteableBuffer", "_write"),
    (ALogEE, "_WriteableBuffer", "_handle_request"),
    (ALogEE, "_WriteableBuffer", "_close"),
    (LogEE, "_Memory", "__init__"),
    (LogEE, "_Memory", "_initialize_sync"),
    (LogEE, "_Memory", "_load_from_file_sync"),
    (ALogEE, "_Memory", "_load_from_file"),
    (LogEE, "_Memory", "_close_sync"),
    (ALogEE, "_Memory", "_close"),
    (LogEE, "_Memory", "_transaction_buffer"),
    (LogEE, "_Memory", "get"),
    (LogEE, "_Memory", "set"),
    (LogEE, "_QueueTask", "__init__"),
    (ALogEE, "_QueueTask", "_background_cycle"),
    (ALogEE, "_QueueTask", "_background_task"),
    (ALogEE, "_QueueTask", "close"),
    (LogEE, "_QueueTask", "start"),
    (LogEE, "_DbConnection", "__init__"),
    (LogEE, "_DbConnection", "kvprotocol"),
    (LogEE, "_DbConnection", "get"),
    (ALogEE, "_DbConnection", "set"),
    (LogEE, "_DbConnection", "set_nowait"),
    (ALogEE, "_DbConnection", "_initialize_running"),
    (ALogEE, "_DbConnection", "_initialize"),
    (ALogEE, "_DbConnection", "create"),
    (ALogEE, "_DbConnection", "_close_running"),
    (ALogEE, "_DbConnection", "aclose"),
    (ALogEE, "_DbConnection", "commit_transaction"),
    (LogEE, "_DbConnection", "submit_transaction"),
    (LogEE, "_DbConnection", "submit_transaction_request"),
    (ALogEE, "_DbConnection", "commit"),
    (LogEE, "_DbConnection", "loop"),
    (LogEE, "_DbConnection", "transaction"),
    (LogEE, "DbManager", "__init__"),
    (ALogEE, "DbManager", "__aenter__"),
    (ALogEE, "DbManager", "__aexit__"),
    (LogEE, "Db", "__init__"),
    (ALogEE, "Db", "__aenter__"),
    (ALogEE, "Db", "__aexit__"),
    (LogEE, "FutureContext", "__init__"),
    (ALogEE, "FutureContext", "__aenter__"),
    (ALogEE, "FutureContext", "__aexit__"),
    (ALogEE, "FutureContext", "wait"),
    (LogEE, "TransactionView", "__init__"),
    (LogEE, "TransactionView", "future_context"),
    (LogEE, "TransactionView", "rollback"),
    (LogEE, "TransactionView", "illuminate"),
    (ALogEE, "TransactionView", "ailluminate"),
    (LogEE, "TransactionView", "fork"),
    (ALogEE, "TransactionView", "afork"),
    (LogEE, "TransactionView", "clear"),
    (ALogEE, "TransactionView", "aclear"),
    (LogEE, "TransactionView", "reset"),
    (ALogEE, "TransactionView", "areset"),
    (LogEE, "TransactionView", "get"),
    (LogEE, "TransactionView", "set_nowait"),
    (LogEE, "TransactionView", "_delta"),
    (ALogEE, "TransactionView", "commit"),
    (LogEE, "TransactionView", "submit"),
    (LogEE, "TransactionView", "_do_gather"),
    (LogEE, "TransactionView", "_reduce_future"),
    (LogEE, "TransactionView", "_gather"),
    (ALogEE, "TransactionView", "commit_transaction"),
    (LogEE, "TransactionView", "submit_transaction"),
    (LogEE, "TransactionView", "submit_transaction_request"),
    (LogEE, "TransactionView", "loop"),
    (LogEE, "TransactionView", "transaction"),
    (LogEE, "Transaction", "__init__"),
    (ALogEE, "Transaction", "__aenter__"),
    (ALogEE, "Transaction", "__aexit__"),
    (LogEE, "Transaction", "_clean"),
    (LogEE, "Transaction", "__enter__"),
    (LogEE, "Transaction", "__exit__"),
)


async def main():
    """Run the traced example against a fresh ``trace_example.db``.

    Any database file left next to this script by a previous run is deleted
    first.  Disk writes are always logged; when ``run_all`` is set, every
    method listed in ``_TRACED_METHODS`` is additionally wrapped with
    enter/exit logging.  All instrumentation is registered on one
    ``ExitStack``.
    """
    path = pathlib.Path(__file__).parent / "trace_example.db"
    path.unlink(missing_ok=True)

    with ExitStack() as es:
        LogWrites().enter(es)
        if run_all:
            traced_module = __import__("ptvp35")
            # Table order is instrumentation order; keep it as listed.
            for wrapper, owner, methodname in _TRACED_METHODS:
                wrapper(getattr(traced_module, owner), methodname).enter(es)

        async with DbFactory(path, kvfactory=KVJson()) as db:
            await transaction_test(db)
            # Attribute dump while still inside the connection context...
            print_private_db_attrs(db)
        # ...and once more after leaving it.
        print_private_db_attrs(db)
# Script entry point: run the example on a new event loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())