diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 46a328f..ba27367 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -3,21 +3,21 @@ from __future__ import annotations __all__ = ( - 'VDELETE', - 'KVProtocol', - 'KVFactory', - 'KVJson', - 'VirtualConnection', - 'ExtendedVirtualConnection', - 'DbInterface', - 'AbstractDbConnection', - 'DbConnection', - 'DbManager', - 'DbFactory', - 'Db', - 'Transaction', - 'TransactionView', - 'FutureContext', + "VDELETE", + "KVProtocol", + "KVFactory", + "KVJson", + "VirtualConnection", + "ExtendedVirtualConnection", + "DbInterface", + "AbstractDbConnection", + "DbConnection", + "DbManager", + "DbFactory", + "Db", + "Transaction", + "TransactionView", + "FutureContext", ) import abc @@ -33,9 +33,7 @@ from typing import IO, Any, Protocol, TypeAlias class Request: - __slots__ = ( - '__future', - ) + __slots__ = ("__future",) def __init__(self, future: asyncio.Future | None, /) -> None: self.__future = future @@ -57,9 +55,7 @@ class Request: class LineRequest(Request): - __slots__ = ( - 'line', - ) + __slots__ = ("line",) def __init__(self, line: str, /, *, future: asyncio.Future | None) -> None: super().__init__(future) @@ -76,7 +72,8 @@ VDELETE = object() class KVFactory(KVProtocol): - """this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form). + """\ +this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form). that functionality may be added in the future, though, probably, only for custom DbConnection implementations. note: unstable signature.""" @@ -84,18 +81,21 @@ note: unstable signature.""" @abc.abstractmethod def line(self, key: Any, value: Any, /) -> str: - """line must contain exactly one '\\n' at exactly the end if the line is not empty. + """\ +line must contain exactly one '\\n' at exactly the end if the line is not empty. note: other forms of requests will later be represented by different methods or by instances of Action class.""" raise NotImplementedError @abc.abstractmethod def fromline(self, line: str, /) -> tuple[Any, Any]: - """inverse of line(). + """\ +inverse of line(). note: unstable signature.""" raise NotImplementedError def run(self, line: str, db: dict, reduce: bool, /) -> None: - """run request against the db. + """\ +run request against the db. extensible to allow forms of requests other than set. note: unstable signature.""" key, value = self.fromline(line) @@ -121,18 +121,21 @@ note: unstable signature.""" return value def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> KVRequest: - """form request with Future. + """\ +form request with Future. low-level API. note: unstable signature.""" return KVRequest(key, value, future=future, factory=self) def free(self, key: Any, value: Any, /) -> KVRequest: - """result free from Future. + """\ +result free from Future. note: unstable signature.""" return self.request(key, value, future=None) def io2db(self, io: IO[str], db: dict, reduce: bool, /) -> int: - """note: unstable signature.""" + """\ +note: unstable signature.""" size = 0 for line in io: self.run(line, db, reduce) @@ -140,7 +143,8 @@ note: unstable signature.""" return size def db2io(self, db: dict, io: IO[str], /) -> int: - """note: unstable signature.""" + """\ +note: unstable signature.""" size = 0 for key, value in db.items(): size += io.write(self.line(key, value)) @@ -148,11 +152,11 @@ note: unstable signature.""" def path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: path.touch() - with path.open('r') as file: + with path.open("r") as file: return self.io2db(file, db, True) def db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: - with path.open('w') as file: + with path.open("w") as file: initial_size = self.db2io(db, file) os.fsync(file.fileno()) return initial_size @@ -160,9 +164,9 @@ note: unstable signature.""" class KVRequest(LineRequest): __slots__ = ( - '__factory', - 'key', - 'value', + "__factory", + "key", + "value", ) def __init__(self, key: Any, value: Any, /, *, future: asyncio.Future | None, factory: KVFactory) -> None: @@ -187,10 +191,10 @@ class KVJson(KVFactory): def line(self, key: Any, value: Any, /) -> str: if value is VDELETE: - obj = {'key': key} + obj = {"key": key} else: - obj = {'key': key, 'value': value} - return json.dumps(obj) + '\n' + obj = {"key": key, "value": value} + return json.dumps(obj) + "\n" def _load_key(self, key: Any, /) -> Hashable: """note: unstable signature.""" @@ -202,17 +206,15 @@ class KVJson(KVFactory): case dict(): return tuple((self._load_key(k), self._load_key(v)) for k, v in key.items()) case _: - raise TypeError('unknown KVJson key type, cannot convert to hashable') + raise TypeError("unknown KVJson key type, cannot convert to hashable") def fromline(self, line: str, /) -> tuple[Any, Any]: d = json.loads(line) - return self._load_key(d['key']), d.get('value', VDELETE) + return self._load_key(d["key"]), d.get("value", VDELETE) class TransactionRequest(LineRequest): - __slots__ = ( - 'buffer', - ) + __slots__ = ("buffer",) def __init__(self, buffer: StringIO, /, *, future: asyncio.Future | None) -> None: super().__init__(buffer.getvalue(), future=future) @@ -221,9 +223,9 @@ class TransactionRequest(LineRequest): class DbParameters: __slots__ = ( - 'path', - 'kvfactory', - 'buffersize', + "path", + "kvfactory", + "buffersize", ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize: int) -> None: @@ -239,9 +241,7 @@ class RequestToClosedConnection(asyncio.InvalidStateError): pass -class VirtualConnection( - abc.ABC -): +class VirtualConnection(abc.ABC): """minimal intersection of DbConnection and TransactionView functionality""" __slots__ = () @@ -270,9 +270,7 @@ class VirtualConnection( return Transaction(self) -class ExtendedVirtualConnection( - VirtualConnection, abc.ABC -): +class ExtendedVirtualConnection(VirtualConnection, abc.ABC): """maximal intersection of DbConnection and TransactionView functionality""" @abc.abstractmethod @@ -288,9 +286,7 @@ class ExtendedVirtualConnection( raise NotImplementedError -class DbInterface( - ExtendedVirtualConnection, abc.ABC -): +class DbInterface(ExtendedVirtualConnection, abc.ABC): @abc.abstractmethod async def set(self, key: Any, value: Any, /) -> None: raise NotImplementedError @@ -302,17 +298,20 @@ class AbstractDbConnection(Protocol): raise NotImplementedError async def set(self, key: Any, value: Any, /) -> None: - """this method may take time to run. + """\ +this method may take time to run. ordering may not be guaranteed (depends on event loop implementation).""" raise NotImplementedError def set_nowait(self, key: Any, value: Any, /) -> None: - """this method is instant. + """\ +this method is instant. ordering is guaranteed.""" raise NotImplementedError async def commit(self, /) -> None: - """this method may take time to run. + """\ +this method may take time to run. respects the ordering of previously called :meth:`~ptvp35.AbstractDbConnection.set_nowait` methods. will, depending on event loop implementation, also execute later changes.""" raise NotImplementedError @@ -322,9 +321,7 @@ will, depending on event loop implementation, also execute later changes.""" class _Loop: - __slots__ = ( - '__loop', - ) + __slots__ = ("__loop",) def __init__(self, loop: asyncio.AbstractEventLoop, /) -> None: self.__loop = loop @@ -336,7 +333,8 @@ class _Loop: return self.__loop def run_in_thread(self, name: str, fn, /, *args, **kwargs) -> asyncio.Future: - """we are using our own thread to guarantee as much of autonomy and control as possible. + """\ +we are using our own thread to guarantee as much of autonomy and control as possible. intended for heavy tasks.""" future = self.create_future() @@ -344,45 +342,37 @@ intended for heavy tasks.""" try: result = fn(*args, **kwargs) except Exception as exception: - self.__loop.call_soon_threadsafe( - future.set_exception, exception - ) + self.__loop.call_soon_threadsafe(future.set_exception, exception) else: - self.__loop.call_soon_threadsafe( - future.set_result, result - ) - fname = getattr(fn, '__name__', '?') - threading.Thread( - target=wrap, - name=f'persistence5-{name}-{fname}' - ).start() + self.__loop.call_soon_threadsafe(future.set_result, result) + + fname = getattr(fn, "__name__", "?") + threading.Thread(target=wrap, name=f"persistence5-{name}-{fname}").start() return future class _Errors: __slots__ = ( - '__path', - '__loop', - '__event_loop', + "__path", + "__loop", + "__event_loop", ) def __init__(self, path: pathlib.Path, loop: _Loop, /) -> None: - self.__path = path.with_name(path.name + '.error') + self.__path = path.with_name(path.name + ".error") self.__loop = loop self.__event_loop = loop.loop() def _save_sync(self, line: str, /) -> None: - with self.__path.open('a') as file: - file.write(line.strip() + '\n') + with self.__path.open("a") as file: + file.write(line.strip() + "\n") async def _save(self, line: str, /) -> None: await self.__event_loop.run_in_executor(None, self._save_sync, line) def _schedule(self, line: str, /) -> concurrent.futures.Future: - return asyncio.run_coroutine_threadsafe( - self._save(line), self.__event_loop - ) + return asyncio.run_coroutine_threadsafe(self._save(line), self.__event_loop) def save_from_thread(self, line: str, /) -> None: self._schedule(line).result() @@ -390,8 +380,8 @@ class _Errors: class _File: __slots__ = ( - '__path', - '__file', + "__path", + "__file", ) __file: IO[str] @@ -414,7 +404,7 @@ class _File: pass def open_sync(self, /) -> None: - self.__file = self.__path.open('a') + self.__file = self.__path.open("a") def close_sync(self, /) -> None: self.__file.close() @@ -423,13 +413,13 @@ class _File: class _Backup: __slots__ = ( - '__file', - '__kvfactory', - '__loop', - '__path', - '__backup', - '__recover', - '__initial_size', + "__file", + "__kvfactory", + "__loop", + "__path", + "__backup", + "__recover", + "__initial_size", ) __initial_size: int @@ -439,8 +429,8 @@ class _Backup: self.__kvfactory = kvfactory self.__loop = loop self.__path = path - self.__backup = path.with_name(path.name + '.backup') - self.__recover = path.with_name(path.name + '.recover') + self.__backup = path.with_name(path.name + ".backup") + self.__recover = path.with_name(path.name + ".recover") def file(self, /) -> _File: return self.__file @@ -502,12 +492,12 @@ class _Backup: class _Guard: __slots__ = ( - '__backup', - '__error', - '__file', - '__path', - '__truncate', - '__flag', + "__backup", + "__error", + "__file", + "__path", + "__truncate", + "__flag", ) def __init__(self, backup: _Backup, error: _Errors, /) -> None: @@ -515,8 +505,8 @@ class _Guard: self.__error = error self.__file = backup.file() self.__path = path = self.__file.path() - self.__truncate = path.with_name(path.name + '.truncate') - self.__flag = path.with_name(path.name + '.truncate_flag') + self.__truncate = path.with_name(path.name + ".truncate") + self.__flag = path.with_name(path.name + ".truncate_flag") def backup(self, /) -> _Backup: return self.__backup @@ -526,13 +516,13 @@ class _Guard: self.__truncate.write_bytes(s) def _write_value_sync(self, value: int, /) -> None: - self._write_bytes_sync(value.to_bytes(16, 'little')) + self._write_bytes_sync(value.to_bytes(16, "little")) def _read_bytes_sync(self, /) -> bytes: return self.__truncate.read_bytes() def _read_value_sync(self, /) -> int: - return int.from_bytes(self._read_bytes_sync(), 'little') + return int.from_bytes(self._read_bytes_sync(), "little") def _set_sync(self, /) -> None: self._write_value_sync(self.__file.tell()) @@ -543,7 +533,7 @@ class _Guard: self.__truncate.unlink(missing_ok=True) def _truncate_sync(self, /) -> None: - with self.__path.open('r+') as file: + with self.__path.open("r+") as file: self._file_truncate_sync(file, self._read_value_sync()) def assure_sync(self, /) -> None: @@ -563,9 +553,7 @@ class _Guard: class _ReceivingQueue: - __all__ = ( - '__queue', - ) + __all__ = ("__queue",) def __init__(self, queue: asyncio.Queue[Request], /) -> None: self.__queue: asyncio.Queue[Request] = queue @@ -576,25 +564,23 @@ class _ReceivingQueue: class _WriteableBuffer: __slots__ = ( - '__buffersize', - '__guard', - '__queue', - '__backup', - '__kvfactory', - '__loop', - '__event_loop', - '__buffer', - '__buffer_future', - '__buffer_requested', + "__buffersize", + "__guard", + "__queue", + "__backup", + "__kvfactory", + "__loop", + "__event_loop", + "__buffer", + "__buffer_future", + "__buffer_requested", ) __buffer: StringIO __buffer_future: asyncio.Future __buffer_requested: bool - def __init__( - self, buffersize: int, guard: _Guard, queue: _ReceivingQueue, loop: _Loop, / - ) -> None: + def __init__(self, buffersize: int, guard: _Guard, queue: _ReceivingQueue, loop: _Loop, /) -> None: self.__buffersize = buffersize self.__guard = guard self.__queue = queue @@ -647,16 +633,14 @@ class _WriteableBuffer: def _request_buffer(self, request: Request, /) -> None: if request.waiting(): + def callback(bf: asyncio.Future) -> None: if (e := bf.exception()) is not None: request.set_exception(e) else: request.set_result(None) - self.__buffer_future.exception - self.__buffer_future.add_done_callback( - callback - ) + self.__buffer_future.add_done_callback(callback) if not self.__buffer_requested: self.__buffer_requested = True self.__queue.submit(CommitRequest(None)) @@ -702,12 +686,12 @@ class _WriteableBuffer: class _Memory: __slots__ = ( - '__backup', - '__guard', - '__file', - '__kvfactory', - '__loop', - '__mmdb', + "__backup", + "__guard", + "__file", + "__kvfactory", + "__loop", + "__mmdb", ) __mmdb: dict @@ -754,10 +738,10 @@ class _Memory: class _QueueTask: __slots__ = ( - '__queue', - '__buffer', - '__event_loop', - '__task', + "__queue", + "__buffer", + "__event_loop", + "__task", ) def __init__(self, queue: asyncio.Queue[Request], buffer: _WriteableBuffer, /) -> None: @@ -792,22 +776,20 @@ class _QueueTask: self.__task = self.__event_loop.create_task(self._background_task()) -class _DbConnection( - DbInterface -): +class _DbConnection(DbInterface): """note: unstable constructor signature.""" __slots__ = ( - '__kvfactory', - '__buffersize', - '__path', - '__error', - '__not_running', - '__mmdb', - '__loop', - '__queue', - '__file', - '__task', + "__kvfactory", + "__buffersize", + "__path", + "__error", + "__not_running", + "__mmdb", + "__loop", + "__queue", + "__file", + "__task", ) __mmdb: _Memory @@ -852,10 +834,7 @@ class _DbConnection( self.__queue = _ReceivingQueue(queue) self.__mmdb = _Memory(guard) await self.__mmdb._load_from_file() - self.__task = _QueueTask( - queue, - _WriteableBuffer(self.__buffersize, guard, self.__queue, self.__loop) - ) + self.__task = _QueueTask(queue, _WriteableBuffer(self.__buffersize, guard, self.__queue, self.__loop)) self.__task.start() async def _initialize(self, /) -> None: @@ -866,7 +845,8 @@ class _DbConnection( @classmethod async def create(cls, parameters: DbParameters, /) -> _DbConnection: - """connect to the factory. + """\ +connect to the factory. note: unstable signature.""" dbconnection = _DbConnection(parameters) await dbconnection._initialize() @@ -882,13 +862,15 @@ note: unstable signature.""" await mmdb._close() async def aclose(self, /) -> None: - """close the connection. + """\ +close the connection. note: unstable signature.""" await self._close_running() self.__running = False async def commit_transaction(self, delta: dict, /) -> None: - """hybrid of set() and dict.update(). + """\ +hybrid of set() and dict.update(). note: unstable signature.""" if not delta: return @@ -898,7 +880,8 @@ note: unstable signature.""" await future def submit_transaction(self, delta: dict, /) -> None: - """hybrid of set_nowait() and dict.update(). + """\ +hybrid of set_nowait() and dict.update(). _nowait analogue of commit_transaction(). note: this method was added only for async-sync symmetry with commit_transaction(). note: unstable signature.""" @@ -908,7 +891,8 @@ note: unstable signature.""" self.__queue.submit(TransactionRequest(buffer, future=None)) def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: - """low-level API. + """\ +low-level API. for high-level synchronisation use transaction() instead. note: unstable signature.""" if not delta: @@ -933,14 +917,12 @@ DbConnection: TypeAlias = DbInterface class DbManager: __slots__ = ( - '__parameters', - '__db', + "__parameters", + "__db", ) def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: - self.__parameters = DbParameters( - path, kvfactory=kvfactory, buffersize=buffersize - ) + self.__parameters = DbParameters(path, kvfactory=kvfactory, buffersize=buffersize) async def __aenter__(self) -> DbInterface: self.__db = await _DbConnection.create(self.__parameters) @@ -959,12 +941,7 @@ class Db(_DbConnection): __slots__ = () def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None: - _DbConnection.__init__( - self, - DbParameters( - pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize - ) - ) + _DbConnection.__init__(self, DbParameters(pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize)) async def __aenter__(self) -> _DbConnection: await self._initialize() @@ -989,18 +966,16 @@ class FutureContext: await self.__future -class TransactionView( - ExtendedVirtualConnection -): +class TransactionView(ExtendedVirtualConnection): """note: unstable constructor signature.""" __slots__ = ( - '__delta', - '__shadow', - '__connection', - '__loop', - '__kvprotocol', - '__subfuture', + "__delta", + "__shadow", + "__connection", + "__loop", + "__kvprotocol", + "__subfuture", ) def __init__(self, delta: dict, connection: VirtualConnection, /) -> None: @@ -1098,7 +1073,8 @@ class TransactionView( await self.__connection.commit_transaction(delta) def submit(self, /) -> None: - """submit changes. + """\ +submit changes. _nowait analogue of commit(). bulk analogue of DbConnection.set_nowait().""" # for persistence5('s forks) developers: @@ -1185,9 +1161,9 @@ class Transaction: """note: unstable signature.""" __slots__ = ( - '__connection', - '__view', - '__running', + "__connection", + "__view", + "__running", ) __view: TransactionView diff --git a/setup.py b/setup.py index da026fb..a55715e 100644 --- a/setup.py +++ b/setup.py @@ -1,12 +1,12 @@ from setuptools import setup setup( - name='ptvp35', - version='1.1.0', - packages=['ptvp35'], - url='https://gitea.ongoteam.net/PTV/ptvp35', - license='MIT', - author='PARRRATE TNV', - author_email='', - description='', + name="ptvp35", + version="1.1.0", + packages=["ptvp35"], + url="https://gitea.ongoteam.net/PTV/ptvp35", + license="MIT", + author="PARRRATE TNV", + author_email="", + description="", ) diff --git a/test_delete.py b/test_delete.py index c751a21..7b5cda6 100644 --- a/test_delete.py +++ b/test_delete.py @@ -1,11 +1,11 @@ import asyncio import pathlib -from ptvp35 import DbFactory, KVJson, VDELETE +from ptvp35 import VDELETE, DbFactory, KVJson async def main(): - path = pathlib.Path('test_delete.db') + path = pathlib.Path("test_delete.db") path.unlink(missing_ok=True) async with DbFactory(path, kvfactory=KVJson()) as connection: connection.set_nowait(0, 0) @@ -19,4 +19,5 @@ async def main(): print(connection.get(0, 1)) # path.unlink(missing_ok=True) + asyncio.run(main())