Compare commits

...

7 Commits

Author SHA1 Message Date
0551c0bb0b formatting 2023-10-02 16:55:33 +00:00
3ffd0adc21 fix get type 2023-08-28 09:56:59 +00:00
edbb207735 correct copyright 2023-08-27 01:19:58 +00:00
f8ee5d20f4 conf.py fix quotes 2023-06-18 10:57:10 +00:00
a103364f1a style fix 2023-06-16 12:23:43 +00:00
56e6160e6a -NightlyInstrumentation 2023-05-05 16:13:58 +00:00
360462287f .dockerignore compilation artifacts 2023-04-28 09:52:20 +00:00
7 changed files with 387 additions and 425 deletions

View File

@ -1 +1,4 @@
.git*
__pycache__
*.egg-info
build

View File

@ -4,10 +4,12 @@ import sys
import threading
from contextlib import ExitStack
from rainbowadn.instrument import Instrumentation
try:
sys.path.append(str((pathlib.Path(__file__).parent / '../..').absolute()))
from ptvp35 import DbFactory, DbConnection, KVJson
from ptvp35.instrumentation import InstrumentDiskWrites, NightlyInstrumentation
sys.path.append(str((pathlib.Path(__file__).parent / "../..").absolute()))
from ptvp35 import DbConnection, DbFactory, KVJson
from ptvp35.instrumentation import InstrumentDiskWrites
except:
raise
@ -22,308 +24,297 @@ class LogWrites(InstrumentDiskWrites):
self.loop = asyncio.get_running_loop()
def on_write(self, line: str, /) -> None:
asyncio.run_coroutine_threadsafe(
aprint(f'{self.methodname}[{line}]'), self.loop
).result()
asyncio.run_coroutine_threadsafe(aprint(f"{self.methodname}[{line}]"), self.loop).result()
class LogEE(NightlyInstrumentation):
class LogEE(Instrumentation):
def __init__(self, target, methodname: str):
super().__init__(target, methodname)
self.loop = asyncio.get_running_loop()
def _target_id(self) -> str:
name = (
self.target.__name__
if
hasattr(self.target, '__name__')
else
self.target.__class__.__name__
)
return f'{name}.{self.methodname}'
name = self.target.__name__ if hasattr(self.target, "__name__") else self.target.__class__.__name__
return f"{name}.{self.methodname}"
def _print(self, thread, *args) -> None:
print(
thread,
self._target_id(),
*args,
sep='\t'
)
print(thread, self._target_id(), *args, sep="\t")
async def aprint(self, thread, *args) -> None:
self._print(thread, *args)
def print(self, *args) -> None:
if (ct := threading.current_thread()) is threading.main_thread():
self._print('main', *args)
self._print("main", *args)
else:
asyncio.run_coroutine_threadsafe(
self.aprint('aux', *args), self.loop
).result()
asyncio.run_coroutine_threadsafe(self.aprint("aux", *args), self.loop).result()
def instrument(self, method, *args, **kwargs):
self.print('enter')
self.print("enter")
try:
result = method(*args, **kwargs)
except:
self.print('error')
self.print("error")
raise
else:
self.print('exit')
self.print("exit")
return result
class ALogEE(LogEE):
async def instrument(self, method, *args, **kwargs):
self._print('aio', 'enter')
self._print("aio", "enter")
try:
result = await method(*args, **kwargs)
except:
self._print('aio', 'error')
self._print("aio", "error")
raise
else:
self._print('aio', 'exit')
self._print("aio", "exit")
return result
async def transaction_test(db: DbConnection):
def logdb(*args):
if args:
args = (' ', ' ', '@',) + args
print(db.get('test', '0'), *args, sep='\t')
args = (
" ",
" ",
"@",
) + args
print(db.get("test", "0"), *args, sep="\t")
def logstate(*args):
if args:
args = ('@',) + args
print(db.get('test', '0'), '|', state.get('test', '0'), *args, sep='\t')
args = ("@",) + args
print(db.get("test", "0"), "|", state.get("test", "0"), *args, sep="\t")
logdb('empty db')
db.set_nowait('test', '1')
logdb('after set_nowait')
await db.set('test', '2')
logdb('after set')
logdb("empty db")
db.set_nowait("test", "1")
logdb("after set_nowait")
await db.set("test", "2")
logdb("after set")
try:
async with db.transaction() as state:
logstate('empty transaction')
state.set_nowait('test', '3')
logstate('after transaction.set_nowait')
logstate("empty transaction")
state.set_nowait("test", "3")
logstate("after transaction.set_nowait")
state.submit()
logstate('after transaction.submit')
logstate("after transaction.submit")
await state.commit()
logstate('after transaction.commit')
state.set_nowait('test', print) # will throw TypeError later
logstate("after transaction.commit")
state.set_nowait("test", print) # will throw TypeError later
logstate()
except TypeError:
print('type error')
logdb('after transaction')
print("type error")
logdb("after transaction")
async with db.transaction() as state:
logstate()
state.set_nowait('test', '4')
logstate('before implicit transaction.commit')
logdb('after transaction with implicit commit')
state.set_nowait("test", "4")
logstate("before implicit transaction.commit")
logdb("after transaction with implicit commit")
with db.transaction() as state:
logstate()
state.set_nowait('test', '5')
logstate('before implicit transaction.submit')
logdb('after transaction with implicit submit')
state.set_nowait("test", "5")
logstate("before implicit transaction.submit")
logdb("after transaction with implicit submit")
def print_private_db_attrs(db: DbConnection):
if run_all:
for attr in dir(db):
if attr.startswith('_DbConnection') and hasattr(db, attr):
if attr.startswith("_DbConnection") and hasattr(db, attr):
print(attr)
run_all = 'all' in sys.argv
run_all = "all" in sys.argv
async def main():
(path := pathlib.Path(__file__).parent / 'trace_example.db').unlink(missing_ok=True)
(path := pathlib.Path(__file__).parent / "trace_example.db").unlink(missing_ok=True)
with ExitStack() as es:
LogWrites().enter(es)
if run_all:
LogEE(__import__('ptvp35').Request, '__init__').enter(es)
LogEE(__import__('ptvp35').Request, 'waiting').enter(es)
LogEE(__import__('ptvp35').Request, 'set_result').enter(es)
LogEE(__import__('ptvp35').Request, 'set_exception').enter(es)
ALogEE(__import__('ptvp35').Request, 'wait').enter(es)
LogEE(__import__("ptvp35").Request, "__init__").enter(es)
LogEE(__import__("ptvp35").Request, "waiting").enter(es)
LogEE(__import__("ptvp35").Request, "set_result").enter(es)
LogEE(__import__("ptvp35").Request, "set_exception").enter(es)
ALogEE(__import__("ptvp35").Request, "wait").enter(es)
LogEE(__import__('ptvp35').LineRequest, '__init__').enter(es)
LogEE(__import__("ptvp35").LineRequest, "__init__").enter(es)
LogEE(__import__('ptvp35').KVFactory, 'run').enter(es)
LogEE(__import__('ptvp35').KVFactory, '_dbset').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'dbset').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'dbget').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'filter_value').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'request').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'free').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'io2db').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'db2io').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'path2db_sync').enter(es)
LogEE(__import__('ptvp35').KVFactory, 'db2path_sync').enter(es)
LogEE(__import__("ptvp35").KVFactory, "run").enter(es)
LogEE(__import__("ptvp35").KVFactory, "_dbset").enter(es)
LogEE(__import__("ptvp35").KVFactory, "dbset").enter(es)
LogEE(__import__("ptvp35").KVFactory, "dbget").enter(es)
LogEE(__import__("ptvp35").KVFactory, "filter_value").enter(es)
LogEE(__import__("ptvp35").KVFactory, "request").enter(es)
LogEE(__import__("ptvp35").KVFactory, "free").enter(es)
LogEE(__import__("ptvp35").KVFactory, "io2db").enter(es)
LogEE(__import__("ptvp35").KVFactory, "db2io").enter(es)
LogEE(__import__("ptvp35").KVFactory, "path2db_sync").enter(es)
LogEE(__import__("ptvp35").KVFactory, "db2path_sync").enter(es)
LogEE(__import__('ptvp35').KVRequest, '__init__').enter(es)
LogEE(__import__("ptvp35").KVRequest, "__init__").enter(es)
LogEE(__import__('ptvp35').KVJson, 'line').enter(es)
LogEE(__import__('ptvp35').KVJson, '_load_key').enter(es)
LogEE(__import__('ptvp35').KVJson, 'fromline').enter(es)
LogEE(__import__("ptvp35").KVJson, "line").enter(es)
LogEE(__import__("ptvp35").KVJson, "_load_key").enter(es)
LogEE(__import__("ptvp35").KVJson, "fromline").enter(es)
LogEE(__import__('ptvp35').TransactionRequest, '__init__').enter(es)
LogEE(__import__("ptvp35").TransactionRequest, "__init__").enter(es)
LogEE(__import__('ptvp35').DbParameters, '__init__').enter(es)
LogEE(__import__("ptvp35").DbParameters, "__init__").enter(es)
LogEE(__import__('ptvp35').VirtualConnection, 'transaction').enter(es)
LogEE(__import__("ptvp35").VirtualConnection, "transaction").enter(es)
LogEE(__import__('ptvp35')._Loop, '__init__').enter(es)
LogEE(__import__('ptvp35')._Loop, 'create_future').enter(es)
LogEE(__import__('ptvp35')._Loop, 'loop').enter(es)
LogEE(__import__('ptvp35')._Loop, 'run_in_thread').enter(es)
LogEE(__import__("ptvp35")._Loop, "__init__").enter(es)
LogEE(__import__("ptvp35")._Loop, "create_future").enter(es)
LogEE(__import__("ptvp35")._Loop, "loop").enter(es)
LogEE(__import__("ptvp35")._Loop, "run_in_thread").enter(es)
LogEE(__import__('ptvp35')._Errors, '__init__').enter(es)
LogEE(__import__('ptvp35')._Errors, '_save_sync').enter(es)
ALogEE(__import__('ptvp35')._Errors, '_save').enter(es)
LogEE(__import__('ptvp35')._Errors, 'save_from_thread').enter(es)
LogEE(__import__("ptvp35")._Errors, "__init__").enter(es)
LogEE(__import__("ptvp35")._Errors, "_save_sync").enter(es)
ALogEE(__import__("ptvp35")._Errors, "_save").enter(es)
LogEE(__import__("ptvp35")._Errors, "save_from_thread").enter(es)
LogEE(__import__('ptvp35')._File, '__init__').enter(es)
LogEE(__import__('ptvp35')._File, 'path').enter(es)
LogEE(__import__('ptvp35')._File, 'tell').enter(es)
LogEE(__import__('ptvp35')._File, 'write_to_disk_sync').enter(es)
LogEE(__import__('ptvp35')._File, 'open_sync').enter(es)
LogEE(__import__('ptvp35')._File, 'close_sync').enter(es)
LogEE(__import__("ptvp35")._File, "__init__").enter(es)
LogEE(__import__("ptvp35")._File, "path").enter(es)
LogEE(__import__("ptvp35")._File, "tell").enter(es)
LogEE(__import__("ptvp35")._File, "write_to_disk_sync").enter(es)
LogEE(__import__("ptvp35")._File, "open_sync").enter(es)
LogEE(__import__("ptvp35")._File, "close_sync").enter(es)
LogEE(__import__('ptvp35')._Backup, '__init__').enter(es)
LogEE(__import__('ptvp35')._Backup, 'file').enter(es)
LogEE(__import__('ptvp35')._Backup, 'kvfactory').enter(es)
LogEE(__import__('ptvp35')._Backup, '_copy_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_recovery_unset_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_finish_recovery_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_recovery_set_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, 'build_file_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_rebuild_file_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, '_reload_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, 'run_in_thread').enter(es)
ALogEE(__import__('ptvp35')._Backup, '_reload').enter(es)
ALogEE(__import__('ptvp35')._Backup, 'reload_if_oversized').enter(es)
LogEE(__import__('ptvp35')._Backup, 'load_mmdb_sync').enter(es)
LogEE(__import__('ptvp35')._Backup, 'uninitialize').enter(es)
LogEE(__import__("ptvp35")._Backup, "__init__").enter(es)
LogEE(__import__("ptvp35")._Backup, "file").enter(es)
LogEE(__import__("ptvp35")._Backup, "kvfactory").enter(es)
LogEE(__import__("ptvp35")._Backup, "_copy_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "_recovery_unset_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "_finish_recovery_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "_recovery_set_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "build_file_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "_rebuild_file_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "_reload_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "run_in_thread").enter(es)
ALogEE(__import__("ptvp35")._Backup, "_reload").enter(es)
ALogEE(__import__("ptvp35")._Backup, "reload_if_oversized").enter(es)
LogEE(__import__("ptvp35")._Backup, "load_mmdb_sync").enter(es)
LogEE(__import__("ptvp35")._Backup, "uninitialize").enter(es)
LogEE(__import__('ptvp35')._Guard, '__init__').enter(es)
LogEE(__import__('ptvp35')._Guard, 'backup').enter(es)
LogEE(__import__('ptvp35')._Guard, '_write_bytes_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_write_value_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_set_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_unset_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_read_bytes_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_read_value_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_truncate_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, 'assure_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, '_file_truncate_sync').enter(es)
LogEE(__import__('ptvp35')._Guard, 'file_write_sync').enter(es)
LogEE(__import__("ptvp35")._Guard, "__init__").enter(es)
LogEE(__import__("ptvp35")._Guard, "backup").enter(es)
LogEE(__import__("ptvp35")._Guard, "_write_bytes_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_write_value_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_set_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_unset_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_read_bytes_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_read_value_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_truncate_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "assure_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "_file_truncate_sync").enter(es)
LogEE(__import__("ptvp35")._Guard, "file_write_sync").enter(es)
LogEE(__import__('ptvp35')._ReceivingQueue, '__init__').enter(es)
LogEE(__import__('ptvp35')._ReceivingQueue, 'submit').enter(es)
LogEE(__import__("ptvp35")._ReceivingQueue, "__init__").enter(es)
LogEE(__import__("ptvp35")._ReceivingQueue, "submit").enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '__init__').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, 'writeable').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, 'loop').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_compressed').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_commit_compressed_sync').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit_compressed').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_clear').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_satisfy_future').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_fail_future').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_do_commit_buffer').enter(es)
LogEE(__import__('ptvp35')._WriteableBuffer, '_request_buffer').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_commit_or_request_so').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_write').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_handle_request').enter(es)
ALogEE(__import__('ptvp35')._WriteableBuffer, '_close').enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "__init__").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "writeable").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "loop").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "_compressed").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "_commit_compressed_sync").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_commit_compressed").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "_clear").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "_satisfy_future").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "_fail_future").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_do_commit_buffer").enter(es)
LogEE(__import__("ptvp35")._WriteableBuffer, "_request_buffer").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_commit").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_commit_or_request_so").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_write").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_handle_request").enter(es)
ALogEE(__import__("ptvp35")._WriteableBuffer, "_close").enter(es)
LogEE(__import__('ptvp35')._Memory, '__init__').enter(es)
LogEE(__import__('ptvp35')._Memory, '_initialize_sync').enter(es)
LogEE(__import__('ptvp35')._Memory, '_load_from_file_sync').enter(es)
ALogEE(__import__('ptvp35')._Memory, '_load_from_file').enter(es)
LogEE(__import__('ptvp35')._Memory, '_close_sync').enter(es)
ALogEE(__import__('ptvp35')._Memory, '_close').enter(es)
LogEE(__import__('ptvp35')._Memory, '_transaction_buffer').enter(es)
LogEE(__import__('ptvp35')._Memory, 'get').enter(es)
LogEE(__import__('ptvp35')._Memory, 'set').enter(es)
LogEE(__import__("ptvp35")._Memory, "__init__").enter(es)
LogEE(__import__("ptvp35")._Memory, "_initialize_sync").enter(es)
LogEE(__import__("ptvp35")._Memory, "_load_from_file_sync").enter(es)
ALogEE(__import__("ptvp35")._Memory, "_load_from_file").enter(es)
LogEE(__import__("ptvp35")._Memory, "_close_sync").enter(es)
ALogEE(__import__("ptvp35")._Memory, "_close").enter(es)
LogEE(__import__("ptvp35")._Memory, "_transaction_buffer").enter(es)
LogEE(__import__("ptvp35")._Memory, "get").enter(es)
LogEE(__import__("ptvp35")._Memory, "set").enter(es)
LogEE(__import__('ptvp35')._QueueTask, '__init__').enter(es)
ALogEE(__import__('ptvp35')._QueueTask, '_background_cycle').enter(es)
ALogEE(__import__('ptvp35')._QueueTask, '_background_task').enter(es)
ALogEE(__import__('ptvp35')._QueueTask, 'close').enter(es)
LogEE(__import__('ptvp35')._QueueTask, 'start').enter(es)
LogEE(__import__("ptvp35")._QueueTask, "__init__").enter(es)
ALogEE(__import__("ptvp35")._QueueTask, "_background_cycle").enter(es)
ALogEE(__import__("ptvp35")._QueueTask, "_background_task").enter(es)
ALogEE(__import__("ptvp35")._QueueTask, "close").enter(es)
LogEE(__import__("ptvp35")._QueueTask, "start").enter(es)
LogEE(__import__('ptvp35')._DbConnection, '__init__').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'kvprotocol').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'get').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'set').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'set_nowait').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, '_initialize_running').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, '_initialize').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'create').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, '_close_running').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'aclose').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'commit_transaction').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'submit_transaction').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'submit_transaction_request').enter(es)
ALogEE(__import__('ptvp35')._DbConnection, 'commit').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'loop').enter(es)
LogEE(__import__('ptvp35')._DbConnection, 'transaction').enter(es)
LogEE(__import__("ptvp35")._DbConnection, "__init__").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "kvprotocol").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "get").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "set").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "set_nowait").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "_initialize_running").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "_initialize").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "create").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "_close_running").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "aclose").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "commit_transaction").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "submit_transaction").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "submit_transaction_request").enter(es)
ALogEE(__import__("ptvp35")._DbConnection, "commit").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "loop").enter(es)
LogEE(__import__("ptvp35")._DbConnection, "transaction").enter(es)
LogEE(__import__('ptvp35').DbManager, '__init__').enter(es)
ALogEE(__import__('ptvp35').DbManager, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').DbManager, '__aexit__').enter(es)
LogEE(__import__("ptvp35").DbManager, "__init__").enter(es)
ALogEE(__import__("ptvp35").DbManager, "__aenter__").enter(es)
ALogEE(__import__("ptvp35").DbManager, "__aexit__").enter(es)
LogEE(__import__('ptvp35').Db, '__init__').enter(es)
ALogEE(__import__('ptvp35').Db, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').Db, '__aexit__').enter(es)
LogEE(__import__("ptvp35").Db, "__init__").enter(es)
ALogEE(__import__("ptvp35").Db, "__aenter__").enter(es)
ALogEE(__import__("ptvp35").Db, "__aexit__").enter(es)
LogEE(__import__('ptvp35').FutureContext, '__init__').enter(es)
ALogEE(__import__('ptvp35').FutureContext, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').FutureContext, '__aexit__').enter(es)
ALogEE(__import__('ptvp35').FutureContext, 'wait').enter(es)
LogEE(__import__("ptvp35").FutureContext, "__init__").enter(es)
ALogEE(__import__("ptvp35").FutureContext, "__aenter__").enter(es)
ALogEE(__import__("ptvp35").FutureContext, "__aexit__").enter(es)
ALogEE(__import__("ptvp35").FutureContext, "wait").enter(es)
LogEE(__import__('ptvp35').TransactionView, '__init__').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'future_context').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'rollback').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'illuminate').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'ailluminate').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'fork').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'afork').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'clear').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'aclear').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'reset').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'areset').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'get').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'set_nowait').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_delta').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'commit').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'submit').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_do_gather').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_reduce_future').enter(es)
LogEE(__import__('ptvp35').TransactionView, '_gather').enter(es)
ALogEE(__import__('ptvp35').TransactionView, 'commit_transaction').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'submit_transaction').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'submit_transaction_request').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'loop').enter(es)
LogEE(__import__('ptvp35').TransactionView, 'transaction').enter(es)
LogEE(__import__("ptvp35").TransactionView, "__init__").enter(es)
LogEE(__import__("ptvp35").TransactionView, "future_context").enter(es)
LogEE(__import__("ptvp35").TransactionView, "rollback").enter(es)
LogEE(__import__("ptvp35").TransactionView, "illuminate").enter(es)
ALogEE(__import__("ptvp35").TransactionView, "ailluminate").enter(es)
LogEE(__import__("ptvp35").TransactionView, "fork").enter(es)
ALogEE(__import__("ptvp35").TransactionView, "afork").enter(es)
LogEE(__import__("ptvp35").TransactionView, "clear").enter(es)
ALogEE(__import__("ptvp35").TransactionView, "aclear").enter(es)
LogEE(__import__("ptvp35").TransactionView, "reset").enter(es)
ALogEE(__import__("ptvp35").TransactionView, "areset").enter(es)
LogEE(__import__("ptvp35").TransactionView, "get").enter(es)
LogEE(__import__("ptvp35").TransactionView, "set_nowait").enter(es)
LogEE(__import__("ptvp35").TransactionView, "_delta").enter(es)
ALogEE(__import__("ptvp35").TransactionView, "commit").enter(es)
LogEE(__import__("ptvp35").TransactionView, "submit").enter(es)
LogEE(__import__("ptvp35").TransactionView, "_do_gather").enter(es)
LogEE(__import__("ptvp35").TransactionView, "_reduce_future").enter(es)
LogEE(__import__("ptvp35").TransactionView, "_gather").enter(es)
ALogEE(__import__("ptvp35").TransactionView, "commit_transaction").enter(es)
LogEE(__import__("ptvp35").TransactionView, "submit_transaction").enter(es)
LogEE(__import__("ptvp35").TransactionView, "submit_transaction_request").enter(es)
LogEE(__import__("ptvp35").TransactionView, "loop").enter(es)
LogEE(__import__("ptvp35").TransactionView, "transaction").enter(es)
LogEE(__import__('ptvp35').Transaction, '__init__').enter(es)
ALogEE(__import__('ptvp35').Transaction, '__aenter__').enter(es)
ALogEE(__import__('ptvp35').Transaction, '__aexit__').enter(es)
LogEE(__import__('ptvp35').Transaction, '_clean').enter(es)
LogEE(__import__('ptvp35').Transaction, '__enter__').enter(es)
LogEE(__import__('ptvp35').Transaction, '__exit__').enter(es)
LogEE(__import__("ptvp35").Transaction, "__init__").enter(es)
ALogEE(__import__("ptvp35").Transaction, "__aenter__").enter(es)
ALogEE(__import__("ptvp35").Transaction, "__aexit__").enter(es)
LogEE(__import__("ptvp35").Transaction, "_clean").enter(es)
LogEE(__import__("ptvp35").Transaction, "__enter__").enter(es)
LogEE(__import__("ptvp35").Transaction, "__exit__").enter(es)
async with DbFactory(path, kvfactory=KVJson()) as db:
await transaction_test(db)
print_private_db_attrs(db)
print_private_db_attrs(db)
if __name__ == '__main__':
if __name__ == "__main__":
asyncio.run(main())

View File

@ -9,34 +9,34 @@
import os.path
import sys
project = 'ptvp35'
copyright = '2022, PARRRATE TNV'
author = 'PARRRATE TNV'
with open('../../setup.py') as f:
project = "ptvp35"
copyright = "2022, PARRRATE TNV"
author = "PARRRATE TNV"
with open("../../setup.py") as f:
_src = f.read()
_src = _src[_src.index('version=\'') + 9:]
_src = _src[:_src.index('\'')]
_src = _src[_src.index('version="') + 9 :]
_src = _src[: _src.index('"')]
release = _src
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
'sphinx.ext.autodoc',
"sphinx.ext.autodoc",
]
templates_path = ['_templates']
templates_path = ["_templates"]
exclude_patterns = []
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'pydata_sphinx_theme'
html_theme = "pydata_sphinx_theme"
html_theme_options = {
"navbar_center": [],
}
html_static_path = ['_static']
html_static_path = ["_static"]
sys.path.insert(0, os.path.abspath('../..'))
sys.path.insert(0, os.path.abspath("../.."))

View File

@ -1,23 +1,23 @@
# Licensed under MIT License. Copyright: 2022-2023 Alisa Feistel, PARRRATE TNV.
# Licensed under MIT License. Copyright: 2021-2023 Alisa Feistel, PARRRATE TNV.
from __future__ import annotations
__all__ = (
'VDELETE',
'KVProtocol',
'KVFactory',
'KVJson',
'VirtualConnection',
'ExtendedVirtualConnection',
'DbInterface',
'AbstractDbConnection',
'DbConnection',
'DbManager',
'DbFactory',
'Db',
'Transaction',
'TransactionView',
'FutureContext',
"VDELETE",
"KVProtocol",
"KVFactory",
"KVJson",
"VirtualConnection",
"ExtendedVirtualConnection",
"DbInterface",
"AbstractDbConnection",
"DbConnection",
"DbManager",
"DbFactory",
"Db",
"Transaction",
"TransactionView",
"FutureContext",
)
import abc
@ -33,9 +33,7 @@ from typing import IO, Any, Protocol, TypeAlias
class Request:
__slots__ = (
'__future',
)
__slots__ = ("__future",)
def __init__(self, future: asyncio.Future | None, /) -> None:
self.__future = future
@ -57,9 +55,7 @@ class Request:
class LineRequest(Request):
__slots__ = (
'line',
)
__slots__ = ("line",)
def __init__(self, line: str, /, *, future: asyncio.Future | None) -> None:
super().__init__(future)
@ -76,7 +72,8 @@ VDELETE = object()
class KVFactory(KVProtocol):
"""this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form).
"""\
this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form).
that functionality may be added in the future, though, probably, only for custom DbConnection implementations.
note: unstable signature."""
@ -84,18 +81,21 @@ note: unstable signature."""
@abc.abstractmethod
def line(self, key: Any, value: Any, /) -> str:
"""line must contain exactly one '\\n' at exactly the end if the line is not empty.
"""\
line must contain exactly one '\\n' at exactly the end if the line is not empty.
note: other forms of requests will later be represented by different methods or by instances of Action class."""
raise NotImplementedError
@abc.abstractmethod
def fromline(self, line: str, /) -> tuple[Any, Any]:
"""inverse of line().
"""\
inverse of line().
note: unstable signature."""
raise NotImplementedError
def run(self, line: str, db: dict, reduce: bool, /) -> None:
"""run request against the db.
"""\
run request against the db.
extensible to allow forms of requests other than set.
note: unstable signature."""
key, value = self.fromline(line)
@ -121,18 +121,21 @@ note: unstable signature."""
return value
def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> KVRequest:
"""form request with Future.
"""\
form request with Future.
low-level API.
note: unstable signature."""
return KVRequest(key, value, future=future, factory=self)
def free(self, key: Any, value: Any, /) -> KVRequest:
"""result free from Future.
"""\
result free from Future.
note: unstable signature."""
return self.request(key, value, future=None)
def io2db(self, io: IO[str], db: dict, reduce: bool, /) -> int:
"""note: unstable signature."""
"""\
note: unstable signature."""
size = 0
for line in io:
self.run(line, db, reduce)
@ -140,7 +143,8 @@ note: unstable signature."""
return size
def db2io(self, db: dict, io: IO[str], /) -> int:
"""note: unstable signature."""
"""\
note: unstable signature."""
size = 0
for key, value in db.items():
size += io.write(self.line(key, value))
@ -148,11 +152,11 @@ note: unstable signature."""
def path2db_sync(self, path: pathlib.Path, db: dict, /) -> int:
path.touch()
with path.open('r') as file:
with path.open("r") as file:
return self.io2db(file, db, True)
def db2path_sync(self, db: dict, path: pathlib.Path, /) -> int:
with path.open('w') as file:
with path.open("w") as file:
initial_size = self.db2io(db, file)
os.fsync(file.fileno())
return initial_size
@ -160,9 +164,9 @@ note: unstable signature."""
class KVRequest(LineRequest):
__slots__ = (
'__factory',
'key',
'value',
"__factory",
"key",
"value",
)
def __init__(self, key: Any, value: Any, /, *, future: asyncio.Future | None, factory: KVFactory) -> None:
@ -187,10 +191,10 @@ class KVJson(KVFactory):
def line(self, key: Any, value: Any, /) -> str:
if value is VDELETE:
obj = {'key': key}
obj = {"key": key}
else:
obj = {'key': key, 'value': value}
return json.dumps(obj) + '\n'
obj = {"key": key, "value": value}
return json.dumps(obj) + "\n"
def _load_key(self, key: Any, /) -> Hashable:
"""note: unstable signature."""
@ -202,17 +206,15 @@ class KVJson(KVFactory):
case dict():
return tuple((self._load_key(k), self._load_key(v)) for k, v in key.items())
case _:
raise TypeError('unknown KVJson key type, cannot convert to hashable')
raise TypeError("unknown KVJson key type, cannot convert to hashable")
def fromline(self, line: str, /) -> tuple[Any, Any]:
d = json.loads(line)
return self._load_key(d['key']), d.get('value', VDELETE)
return self._load_key(d["key"]), d.get("value", VDELETE)
class TransactionRequest(LineRequest):
__slots__ = (
'buffer',
)
__slots__ = ("buffer",)
def __init__(self, buffer: StringIO, /, *, future: asyncio.Future | None) -> None:
super().__init__(buffer.getvalue(), future=future)
@ -221,9 +223,9 @@ class TransactionRequest(LineRequest):
class DbParameters:
__slots__ = (
'path',
'kvfactory',
'buffersize',
"path",
"kvfactory",
"buffersize",
)
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize: int) -> None:
@ -239,9 +241,7 @@ class RequestToClosedConnection(asyncio.InvalidStateError):
pass
class VirtualConnection(
abc.ABC
):
class VirtualConnection(abc.ABC):
"""minimal intersection of DbConnection and TransactionView functionality"""
__slots__ = ()
@ -270,9 +270,7 @@ class VirtualConnection(
return Transaction(self)
class ExtendedVirtualConnection(
VirtualConnection, abc.ABC
):
class ExtendedVirtualConnection(VirtualConnection, abc.ABC):
"""maximal intersection of DbConnection and TransactionView functionality"""
@abc.abstractmethod
@ -288,9 +286,7 @@ class ExtendedVirtualConnection(
raise NotImplementedError
class DbInterface(
ExtendedVirtualConnection, abc.ABC
):
class DbInterface(ExtendedVirtualConnection, abc.ABC):
@abc.abstractmethod
async def set(self, key: Any, value: Any, /) -> None:
raise NotImplementedError
@ -302,17 +298,20 @@ class AbstractDbConnection(Protocol):
raise NotImplementedError
async def set(self, key: Any, value: Any, /) -> None:
"""this method may take time to run.
"""\
this method may take time to run.
ordering may not be guaranteed (depends on event loop implementation)."""
raise NotImplementedError
def set_nowait(self, key: Any, value: Any, /) -> None:
"""this method is instant.
"""\
this method is instant.
ordering is guaranteed."""
raise NotImplementedError
async def commit(self, /) -> None:
"""this method may take time to run.
"""\
this method may take time to run.
respects the ordering of previously called :meth:`~ptvp35.AbstractDbConnection.set_nowait` methods.
will, depending on event loop implementation, also execute later changes."""
raise NotImplementedError
@ -322,9 +321,7 @@ will, depending on event loop implementation, also execute later changes."""
class _Loop:
__slots__ = (
'__loop',
)
__slots__ = ("__loop",)
def __init__(self, loop: asyncio.AbstractEventLoop, /) -> None:
self.__loop = loop
@ -336,7 +333,8 @@ class _Loop:
return self.__loop
def run_in_thread(self, name: str, fn, /, *args, **kwargs) -> asyncio.Future:
"""we are using our own thread to guarantee as much of autonomy and control as possible.
"""\
we are using our own thread to guarantee as much of autonomy and control as possible.
intended for heavy tasks."""
future = self.create_future()
@ -344,45 +342,37 @@ intended for heavy tasks."""
try:
result = fn(*args, **kwargs)
except Exception as exception:
self.__loop.call_soon_threadsafe(
future.set_exception, exception
)
self.__loop.call_soon_threadsafe(future.set_exception, exception)
else:
self.__loop.call_soon_threadsafe(
future.set_result, result
)
fname = getattr(fn, '__name__', '?')
threading.Thread(
target=wrap,
name=f'persistence5-{name}-{fname}'
).start()
self.__loop.call_soon_threadsafe(future.set_result, result)
fname = getattr(fn, "__name__", "?")
threading.Thread(target=wrap, name=f"persistence5-{name}-{fname}").start()
return future
class _Errors:
__slots__ = (
'__path',
'__loop',
'__event_loop',
"__path",
"__loop",
"__event_loop",
)
def __init__(self, path: pathlib.Path, loop: _Loop, /) -> None:
self.__path = path.with_name(path.name + '.error')
self.__path = path.with_name(path.name + ".error")
self.__loop = loop
self.__event_loop = loop.loop()
def _save_sync(self, line: str, /) -> None:
with self.__path.open('a') as file:
file.write(line.strip() + '\n')
with self.__path.open("a") as file:
file.write(line.strip() + "\n")
async def _save(self, line: str, /) -> None:
await self.__event_loop.run_in_executor(None, self._save_sync, line)
def _schedule(self, line: str, /) -> concurrent.futures.Future:
return asyncio.run_coroutine_threadsafe(
self._save(line), self.__event_loop
)
return asyncio.run_coroutine_threadsafe(self._save(line), self.__event_loop)
def save_from_thread(self, line: str, /) -> None:
self._schedule(line).result()
@ -390,8 +380,8 @@ class _Errors:
class _File:
__slots__ = (
'__path',
'__file',
"__path",
"__file",
)
__file: IO[str]
@ -414,7 +404,7 @@ class _File:
pass
def open_sync(self, /) -> None:
self.__file = self.__path.open('a')
self.__file = self.__path.open("a")
def close_sync(self, /) -> None:
self.__file.close()
@ -423,13 +413,13 @@ class _File:
class _Backup:
__slots__ = (
'__file',
'__kvfactory',
'__loop',
'__path',
'__backup',
'__recover',
'__initial_size',
"__file",
"__kvfactory",
"__loop",
"__path",
"__backup",
"__recover",
"__initial_size",
)
__initial_size: int
@ -439,8 +429,8 @@ class _Backup:
self.__kvfactory = kvfactory
self.__loop = loop
self.__path = path
self.__backup = path.with_name(path.name + '.backup')
self.__recover = path.with_name(path.name + '.recover')
self.__backup = path.with_name(path.name + ".backup")
self.__recover = path.with_name(path.name + ".recover")
def file(self, /) -> _File:
return self.__file
@ -502,12 +492,12 @@ class _Backup:
class _Guard:
__slots__ = (
'__backup',
'__error',
'__file',
'__path',
'__truncate',
'__flag',
"__backup",
"__error",
"__file",
"__path",
"__truncate",
"__flag",
)
def __init__(self, backup: _Backup, error: _Errors, /) -> None:
@ -515,8 +505,8 @@ class _Guard:
self.__error = error
self.__file = backup.file()
self.__path = path = self.__file.path()
self.__truncate = path.with_name(path.name + '.truncate')
self.__flag = path.with_name(path.name + '.truncate_flag')
self.__truncate = path.with_name(path.name + ".truncate")
self.__flag = path.with_name(path.name + ".truncate_flag")
def backup(self, /) -> _Backup:
return self.__backup
@ -526,13 +516,13 @@ class _Guard:
self.__truncate.write_bytes(s)
def _write_value_sync(self, value: int, /) -> None:
self._write_bytes_sync(value.to_bytes(16, 'little'))
self._write_bytes_sync(value.to_bytes(16, "little"))
def _read_bytes_sync(self, /) -> bytes:
return self.__truncate.read_bytes()
def _read_value_sync(self, /) -> int:
return int.from_bytes(self._read_bytes_sync(), 'little')
return int.from_bytes(self._read_bytes_sync(), "little")
def _set_sync(self, /) -> None:
self._write_value_sync(self.__file.tell())
@ -543,7 +533,7 @@ class _Guard:
self.__truncate.unlink(missing_ok=True)
def _truncate_sync(self, /) -> None:
with self.__path.open('r+') as file:
with self.__path.open("r+") as file:
self._file_truncate_sync(file, self._read_value_sync())
def assure_sync(self, /) -> None:
@ -563,9 +553,7 @@ class _Guard:
class _ReceivingQueue:
__all__ = (
'__queue',
)
__all__ = ("__queue",)
def __init__(self, queue: asyncio.Queue[Request], /) -> None:
self.__queue: asyncio.Queue[Request] = queue
@ -576,25 +564,23 @@ class _ReceivingQueue:
class _WriteableBuffer:
__slots__ = (
'__buffersize',
'__guard',
'__queue',
'__backup',
'__kvfactory',
'__loop',
'__event_loop',
'__buffer',
'__buffer_future',
'__buffer_requested',
"__buffersize",
"__guard",
"__queue",
"__backup",
"__kvfactory",
"__loop",
"__event_loop",
"__buffer",
"__buffer_future",
"__buffer_requested",
)
__buffer: StringIO
__buffer_future: asyncio.Future
__buffer_requested: bool
def __init__(
self, buffersize: int, guard: _Guard, queue: _ReceivingQueue, loop: _Loop, /
) -> None:
def __init__(self, buffersize: int, guard: _Guard, queue: _ReceivingQueue, loop: _Loop, /) -> None:
self.__buffersize = buffersize
self.__guard = guard
self.__queue = queue
@ -647,16 +633,14 @@ class _WriteableBuffer:
def _request_buffer(self, request: Request, /) -> None:
if request.waiting():
def callback(bf: asyncio.Future) -> None:
if (e := bf.exception()) is not None:
request.set_exception(e)
else:
request.set_result(None)
self.__buffer_future.exception
self.__buffer_future.add_done_callback(
callback
)
self.__buffer_future.add_done_callback(callback)
if not self.__buffer_requested:
self.__buffer_requested = True
self.__queue.submit(CommitRequest(None))
@ -702,12 +686,12 @@ class _WriteableBuffer:
class _Memory:
__slots__ = (
'__backup',
'__guard',
'__file',
'__kvfactory',
'__loop',
'__mmdb',
"__backup",
"__guard",
"__file",
"__kvfactory",
"__loop",
"__mmdb",
)
__mmdb: dict
@ -745,7 +729,7 @@ class _Memory:
self.__kvfactory.dbset(self.__mmdb, key, value)
return buffer
def get(self, key: Any, default: Any, /) -> None:
def get(self, key: Any, default: Any, /) -> Any:
return self.__kvfactory.dbget(self.__mmdb, key, default)
def set(self, key: Any, value: Any, /) -> None:
@ -754,10 +738,10 @@ class _Memory:
class _QueueTask:
__slots__ = (
'__queue',
'__buffer',
'__event_loop',
'__task',
"__queue",
"__buffer",
"__event_loop",
"__task",
)
def __init__(self, queue: asyncio.Queue[Request], buffer: _WriteableBuffer, /) -> None:
@ -792,22 +776,20 @@ class _QueueTask:
self.__task = self.__event_loop.create_task(self._background_task())
class _DbConnection(
DbInterface
):
class _DbConnection(DbInterface):
"""note: unstable constructor signature."""
__slots__ = (
'__kvfactory',
'__buffersize',
'__path',
'__error',
'__not_running',
'__mmdb',
'__loop',
'__queue',
'__file',
'__task',
"__kvfactory",
"__buffersize",
"__path",
"__error",
"__not_running",
"__mmdb",
"__loop",
"__queue",
"__file",
"__task",
)
__mmdb: _Memory
@ -852,10 +834,7 @@ class _DbConnection(
self.__queue = _ReceivingQueue(queue)
self.__mmdb = _Memory(guard)
await self.__mmdb._load_from_file()
self.__task = _QueueTask(
queue,
_WriteableBuffer(self.__buffersize, guard, self.__queue, self.__loop)
)
self.__task = _QueueTask(queue, _WriteableBuffer(self.__buffersize, guard, self.__queue, self.__loop))
self.__task.start()
async def _initialize(self, /) -> None:
@ -866,7 +845,8 @@ class _DbConnection(
@classmethod
async def create(cls, parameters: DbParameters, /) -> _DbConnection:
"""connect to the factory.
"""\
connect to the factory.
note: unstable signature."""
dbconnection = _DbConnection(parameters)
await dbconnection._initialize()
@ -882,13 +862,15 @@ note: unstable signature."""
await mmdb._close()
async def aclose(self, /) -> None:
"""close the connection.
"""\
close the connection.
note: unstable signature."""
await self._close_running()
self.__running = False
async def commit_transaction(self, delta: dict, /) -> None:
"""hybrid of set() and dict.update().
"""\
hybrid of set() and dict.update().
note: unstable signature."""
if not delta:
return
@ -898,7 +880,8 @@ note: unstable signature."""
await future
def submit_transaction(self, delta: dict, /) -> None:
"""hybrid of set_nowait() and dict.update().
"""\
hybrid of set_nowait() and dict.update().
_nowait analogue of commit_transaction().
note: this method was added only for async-sync symmetry with commit_transaction().
note: unstable signature."""
@ -908,7 +891,8 @@ note: unstable signature."""
self.__queue.submit(TransactionRequest(buffer, future=None))
def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None:
"""low-level API.
"""\
low-level API.
for high-level synchronisation use transaction() instead.
note: unstable signature."""
if not delta:
@ -933,14 +917,12 @@ DbConnection: TypeAlias = DbInterface
class DbManager:
__slots__ = (
'__parameters',
'__db',
"__parameters",
"__db",
)
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
self.__parameters = DbParameters(
path, kvfactory=kvfactory, buffersize=buffersize
)
self.__parameters = DbParameters(path, kvfactory=kvfactory, buffersize=buffersize)
async def __aenter__(self) -> DbInterface:
self.__db = await _DbConnection.create(self.__parameters)
@ -959,12 +941,7 @@ class Db(_DbConnection):
__slots__ = ()
def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
_DbConnection.__init__(
self,
DbParameters(
pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize
)
)
_DbConnection.__init__(self, DbParameters(pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize))
async def __aenter__(self) -> _DbConnection:
await self._initialize()
@ -989,18 +966,16 @@ class FutureContext:
await self.__future
class TransactionView(
ExtendedVirtualConnection
):
class TransactionView(ExtendedVirtualConnection):
"""note: unstable constructor signature."""
__slots__ = (
'__delta',
'__shadow',
'__connection',
'__loop',
'__kvprotocol',
'__subfuture',
"__delta",
"__shadow",
"__connection",
"__loop",
"__kvprotocol",
"__subfuture",
)
def __init__(self, delta: dict, connection: VirtualConnection, /) -> None:
@ -1098,7 +1073,8 @@ class TransactionView(
await self.__connection.commit_transaction(delta)
def submit(self, /) -> None:
"""submit changes.
"""\
submit changes.
_nowait analogue of commit().
bulk analogue of DbConnection.set_nowait()."""
# for persistence5('s forks) developers:
@ -1185,9 +1161,9 @@ class Transaction:
"""note: unstable signature."""
__slots__ = (
'__connection',
'__view',
'__running',
"__connection",
"__view",
"__running",
)
__view: TransactionView

View File

@ -1,12 +1,12 @@
import ptvp35
from rainbowadn.instrument import Instrumentation
__all__ = ('InstrumentDiskWrites', 'NightlyInstrumentation')
__all__ = ("InstrumentDiskWrites",)
class InstrumentDiskWrites(Instrumentation):
def __init__(self, /):
super().__init__(ptvp35._File, 'write_to_disk_sync')
super().__init__(ptvp35._File, "write_to_disk_sync")
def on_write(self, line: str, /) -> None:
pass
@ -14,12 +14,3 @@ class InstrumentDiskWrites(Instrumentation):
def instrument(self, method, db, line, /):
self.on_write(line)
return method(db, line)
class NightlyInstrumentation(Instrumentation):
def __init__(self, target, methodname: str):
method = getattr(target, methodname)
if hasattr(method, '__non_nightly__'):
target = method
methodname = '__non_nightly__'
super().__init__(target, methodname)

View File

@ -1,12 +1,12 @@
from setuptools import setup
setup(
name='ptvp35',
version='1.1.0',
packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35',
license='MIT',
author='PARRRATE TNV',
author_email='',
description='',
name="ptvp35",
version="1.1.0",
packages=["ptvp35"],
url="https://gitea.ongoteam.net/PTV/ptvp35",
license="MIT",
author="PARRRATE TNV",
author_email="",
description="",
)

View File

@ -1,11 +1,11 @@
import asyncio
import pathlib
from ptvp35 import DbFactory, KVJson, VDELETE
from ptvp35 import VDELETE, DbFactory, KVJson
async def main():
path = pathlib.Path('test_delete.db')
path = pathlib.Path("test_delete.db")
path.unlink(missing_ok=True)
async with DbFactory(path, kvfactory=KVJson()) as connection:
connection.set_nowait(0, 0)
@ -19,4 +19,5 @@ async def main():
print(connection.get(0, 1))
# path.unlink(missing_ok=True)
asyncio.run(main())