style fix

This commit is contained in:
AF 2023-06-16 12:23:43 +00:00
parent 56e6160e6a
commit a103364f1a
3 changed files with 167 additions and 190 deletions

View File

@ -3,21 +3,21 @@
from __future__ import annotations
__all__ = (
'VDELETE',
'KVProtocol',
'KVFactory',
'KVJson',
'VirtualConnection',
'ExtendedVirtualConnection',
'DbInterface',
'AbstractDbConnection',
'DbConnection',
'DbManager',
'DbFactory',
'Db',
'Transaction',
'TransactionView',
'FutureContext',
"VDELETE",
"KVProtocol",
"KVFactory",
"KVJson",
"VirtualConnection",
"ExtendedVirtualConnection",
"DbInterface",
"AbstractDbConnection",
"DbConnection",
"DbManager",
"DbFactory",
"Db",
"Transaction",
"TransactionView",
"FutureContext",
)
import abc
@ -33,9 +33,7 @@ from typing import IO, Any, Protocol, TypeAlias
class Request:
__slots__ = (
'__future',
)
__slots__ = ("__future",)
def __init__(self, future: asyncio.Future | None, /) -> None:
self.__future = future
@ -57,9 +55,7 @@ class Request:
class LineRequest(Request):
__slots__ = (
'line',
)
__slots__ = ("line",)
def __init__(self, line: str, /, *, future: asyncio.Future | None) -> None:
super().__init__(future)
@ -76,7 +72,8 @@ VDELETE = object()
class KVFactory(KVProtocol):
"""this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form).
"""\
this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form).
that functionality may be added in the future, though, probably, only for custom DbConnection implementations.
note: unstable signature."""
@ -84,18 +81,21 @@ note: unstable signature."""
@abc.abstractmethod
def line(self, key: Any, value: Any, /) -> str:
"""line must contain exactly one '\\n' at exactly the end if the line is not empty.
"""\
line must contain exactly one '\\n' at exactly the end if the line is not empty.
note: other forms of requests will later be represented by different methods or by instances of Action class."""
raise NotImplementedError
@abc.abstractmethod
def fromline(self, line: str, /) -> tuple[Any, Any]:
"""inverse of line().
"""\
inverse of line().
note: unstable signature."""
raise NotImplementedError
def run(self, line: str, db: dict, reduce: bool, /) -> None:
"""run request against the db.
"""\
run request against the db.
extensible to allow forms of requests other than set.
note: unstable signature."""
key, value = self.fromline(line)
@ -121,18 +121,21 @@ note: unstable signature."""
return value
def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> KVRequest:
"""form request with Future.
"""\
form request with Future.
low-level API.
note: unstable signature."""
return KVRequest(key, value, future=future, factory=self)
def free(self, key: Any, value: Any, /) -> KVRequest:
"""result free from Future.
"""\
result free from Future.
note: unstable signature."""
return self.request(key, value, future=None)
def io2db(self, io: IO[str], db: dict, reduce: bool, /) -> int:
"""note: unstable signature."""
"""\
note: unstable signature."""
size = 0
for line in io:
self.run(line, db, reduce)
@ -140,7 +143,8 @@ note: unstable signature."""
return size
def db2io(self, db: dict, io: IO[str], /) -> int:
"""note: unstable signature."""
"""\
note: unstable signature."""
size = 0
for key, value in db.items():
size += io.write(self.line(key, value))
@ -148,11 +152,11 @@ note: unstable signature."""
def path2db_sync(self, path: pathlib.Path, db: dict, /) -> int:
path.touch()
with path.open('r') as file:
with path.open("r") as file:
return self.io2db(file, db, True)
def db2path_sync(self, db: dict, path: pathlib.Path, /) -> int:
with path.open('w') as file:
with path.open("w") as file:
initial_size = self.db2io(db, file)
os.fsync(file.fileno())
return initial_size
@ -160,9 +164,9 @@ note: unstable signature."""
class KVRequest(LineRequest):
__slots__ = (
'__factory',
'key',
'value',
"__factory",
"key",
"value",
)
def __init__(self, key: Any, value: Any, /, *, future: asyncio.Future | None, factory: KVFactory) -> None:
@ -187,10 +191,10 @@ class KVJson(KVFactory):
def line(self, key: Any, value: Any, /) -> str:
if value is VDELETE:
obj = {'key': key}
obj = {"key": key}
else:
obj = {'key': key, 'value': value}
return json.dumps(obj) + '\n'
obj = {"key": key, "value": value}
return json.dumps(obj) + "\n"
def _load_key(self, key: Any, /) -> Hashable:
"""note: unstable signature."""
@ -202,17 +206,15 @@ class KVJson(KVFactory):
case dict():
return tuple((self._load_key(k), self._load_key(v)) for k, v in key.items())
case _:
raise TypeError('unknown KVJson key type, cannot convert to hashable')
raise TypeError("unknown KVJson key type, cannot convert to hashable")
def fromline(self, line: str, /) -> tuple[Any, Any]:
d = json.loads(line)
return self._load_key(d['key']), d.get('value', VDELETE)
return self._load_key(d["key"]), d.get("value", VDELETE)
class TransactionRequest(LineRequest):
__slots__ = (
'buffer',
)
__slots__ = ("buffer",)
def __init__(self, buffer: StringIO, /, *, future: asyncio.Future | None) -> None:
super().__init__(buffer.getvalue(), future=future)
@ -221,9 +223,9 @@ class TransactionRequest(LineRequest):
class DbParameters:
__slots__ = (
'path',
'kvfactory',
'buffersize',
"path",
"kvfactory",
"buffersize",
)
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize: int) -> None:
@ -239,9 +241,7 @@ class RequestToClosedConnection(asyncio.InvalidStateError):
pass
class VirtualConnection(
abc.ABC
):
class VirtualConnection(abc.ABC):
"""minimal intersection of DbConnection and TransactionView functionality"""
__slots__ = ()
@ -270,9 +270,7 @@ class VirtualConnection(
return Transaction(self)
class ExtendedVirtualConnection(
VirtualConnection, abc.ABC
):
class ExtendedVirtualConnection(VirtualConnection, abc.ABC):
"""maximal intersection of DbConnection and TransactionView functionality"""
@abc.abstractmethod
@ -288,9 +286,7 @@ class ExtendedVirtualConnection(
raise NotImplementedError
class DbInterface(
ExtendedVirtualConnection, abc.ABC
):
class DbInterface(ExtendedVirtualConnection, abc.ABC):
@abc.abstractmethod
async def set(self, key: Any, value: Any, /) -> None:
raise NotImplementedError
@ -302,17 +298,20 @@ class AbstractDbConnection(Protocol):
raise NotImplementedError
async def set(self, key: Any, value: Any, /) -> None:
"""this method may take time to run.
"""\
this method may take time to run.
ordering may not be guaranteed (depends on event loop implementation)."""
raise NotImplementedError
def set_nowait(self, key: Any, value: Any, /) -> None:
"""this method is instant.
"""\
this method is instant.
ordering is guaranteed."""
raise NotImplementedError
async def commit(self, /) -> None:
"""this method may take time to run.
"""\
this method may take time to run.
respects the ordering of previously called :meth:`~ptvp35.AbstractDbConnection.set_nowait` methods.
will, depending on event loop implementation, also execute later changes."""
raise NotImplementedError
@ -322,9 +321,7 @@ will, depending on event loop implementation, also execute later changes."""
class _Loop:
__slots__ = (
'__loop',
)
__slots__ = ("__loop",)
def __init__(self, loop: asyncio.AbstractEventLoop, /) -> None:
self.__loop = loop
@ -336,7 +333,8 @@ class _Loop:
return self.__loop
def run_in_thread(self, name: str, fn, /, *args, **kwargs) -> asyncio.Future:
"""we are using our own thread to guarantee as much of autonomy and control as possible.
"""\
we are using our own thread to guarantee as much of autonomy and control as possible.
intended for heavy tasks."""
future = self.create_future()
@ -344,45 +342,37 @@ intended for heavy tasks."""
try:
result = fn(*args, **kwargs)
except Exception as exception:
self.__loop.call_soon_threadsafe(
future.set_exception, exception
)
self.__loop.call_soon_threadsafe(future.set_exception, exception)
else:
self.__loop.call_soon_threadsafe(
future.set_result, result
)
fname = getattr(fn, '__name__', '?')
threading.Thread(
target=wrap,
name=f'persistence5-{name}-{fname}'
).start()
self.__loop.call_soon_threadsafe(future.set_result, result)
fname = getattr(fn, "__name__", "?")
threading.Thread(target=wrap, name=f"persistence5-{name}-{fname}").start()
return future
class _Errors:
__slots__ = (
'__path',
'__loop',
'__event_loop',
"__path",
"__loop",
"__event_loop",
)
def __init__(self, path: pathlib.Path, loop: _Loop, /) -> None:
self.__path = path.with_name(path.name + '.error')
self.__path = path.with_name(path.name + ".error")
self.__loop = loop
self.__event_loop = loop.loop()
def _save_sync(self, line: str, /) -> None:
with self.__path.open('a') as file:
file.write(line.strip() + '\n')
with self.__path.open("a") as file:
file.write(line.strip() + "\n")
async def _save(self, line: str, /) -> None:
await self.__event_loop.run_in_executor(None, self._save_sync, line)
def _schedule(self, line: str, /) -> concurrent.futures.Future:
return asyncio.run_coroutine_threadsafe(
self._save(line), self.__event_loop
)
return asyncio.run_coroutine_threadsafe(self._save(line), self.__event_loop)
def save_from_thread(self, line: str, /) -> None:
self._schedule(line).result()
@ -390,8 +380,8 @@ class _Errors:
class _File:
__slots__ = (
'__path',
'__file',
"__path",
"__file",
)
__file: IO[str]
@ -414,7 +404,7 @@ class _File:
pass
def open_sync(self, /) -> None:
self.__file = self.__path.open('a')
self.__file = self.__path.open("a")
def close_sync(self, /) -> None:
self.__file.close()
@ -423,13 +413,13 @@ class _File:
class _Backup:
__slots__ = (
'__file',
'__kvfactory',
'__loop',
'__path',
'__backup',
'__recover',
'__initial_size',
"__file",
"__kvfactory",
"__loop",
"__path",
"__backup",
"__recover",
"__initial_size",
)
__initial_size: int
@ -439,8 +429,8 @@ class _Backup:
self.__kvfactory = kvfactory
self.__loop = loop
self.__path = path
self.__backup = path.with_name(path.name + '.backup')
self.__recover = path.with_name(path.name + '.recover')
self.__backup = path.with_name(path.name + ".backup")
self.__recover = path.with_name(path.name + ".recover")
def file(self, /) -> _File:
return self.__file
@ -502,12 +492,12 @@ class _Backup:
class _Guard:
__slots__ = (
'__backup',
'__error',
'__file',
'__path',
'__truncate',
'__flag',
"__backup",
"__error",
"__file",
"__path",
"__truncate",
"__flag",
)
def __init__(self, backup: _Backup, error: _Errors, /) -> None:
@ -515,8 +505,8 @@ class _Guard:
self.__error = error
self.__file = backup.file()
self.__path = path = self.__file.path()
self.__truncate = path.with_name(path.name + '.truncate')
self.__flag = path.with_name(path.name + '.truncate_flag')
self.__truncate = path.with_name(path.name + ".truncate")
self.__flag = path.with_name(path.name + ".truncate_flag")
def backup(self, /) -> _Backup:
return self.__backup
@ -526,13 +516,13 @@ class _Guard:
self.__truncate.write_bytes(s)
def _write_value_sync(self, value: int, /) -> None:
self._write_bytes_sync(value.to_bytes(16, 'little'))
self._write_bytes_sync(value.to_bytes(16, "little"))
def _read_bytes_sync(self, /) -> bytes:
return self.__truncate.read_bytes()
def _read_value_sync(self, /) -> int:
return int.from_bytes(self._read_bytes_sync(), 'little')
return int.from_bytes(self._read_bytes_sync(), "little")
def _set_sync(self, /) -> None:
self._write_value_sync(self.__file.tell())
@ -543,7 +533,7 @@ class _Guard:
self.__truncate.unlink(missing_ok=True)
def _truncate_sync(self, /) -> None:
with self.__path.open('r+') as file:
with self.__path.open("r+") as file:
self._file_truncate_sync(file, self._read_value_sync())
def assure_sync(self, /) -> None:
@ -563,9 +553,7 @@ class _Guard:
class _ReceivingQueue:
__all__ = (
'__queue',
)
__all__ = ("__queue",)
def __init__(self, queue: asyncio.Queue[Request], /) -> None:
self.__queue: asyncio.Queue[Request] = queue
@ -576,25 +564,23 @@ class _ReceivingQueue:
class _WriteableBuffer:
__slots__ = (
'__buffersize',
'__guard',
'__queue',
'__backup',
'__kvfactory',
'__loop',
'__event_loop',
'__buffer',
'__buffer_future',
'__buffer_requested',
"__buffersize",
"__guard",
"__queue",
"__backup",
"__kvfactory",
"__loop",
"__event_loop",
"__buffer",
"__buffer_future",
"__buffer_requested",
)
__buffer: StringIO
__buffer_future: asyncio.Future
__buffer_requested: bool
def __init__(
self, buffersize: int, guard: _Guard, queue: _ReceivingQueue, loop: _Loop, /
) -> None:
def __init__(self, buffersize: int, guard: _Guard, queue: _ReceivingQueue, loop: _Loop, /) -> None:
self.__buffersize = buffersize
self.__guard = guard
self.__queue = queue
@ -647,16 +633,14 @@ class _WriteableBuffer:
def _request_buffer(self, request: Request, /) -> None:
if request.waiting():
def callback(bf: asyncio.Future) -> None:
if (e := bf.exception()) is not None:
request.set_exception(e)
else:
request.set_result(None)
self.__buffer_future.exception
self.__buffer_future.add_done_callback(
callback
)
self.__buffer_future.add_done_callback(callback)
if not self.__buffer_requested:
self.__buffer_requested = True
self.__queue.submit(CommitRequest(None))
@ -702,12 +686,12 @@ class _WriteableBuffer:
class _Memory:
__slots__ = (
'__backup',
'__guard',
'__file',
'__kvfactory',
'__loop',
'__mmdb',
"__backup",
"__guard",
"__file",
"__kvfactory",
"__loop",
"__mmdb",
)
__mmdb: dict
@ -754,10 +738,10 @@ class _Memory:
class _QueueTask:
__slots__ = (
'__queue',
'__buffer',
'__event_loop',
'__task',
"__queue",
"__buffer",
"__event_loop",
"__task",
)
def __init__(self, queue: asyncio.Queue[Request], buffer: _WriteableBuffer, /) -> None:
@ -792,22 +776,20 @@ class _QueueTask:
self.__task = self.__event_loop.create_task(self._background_task())
class _DbConnection(
DbInterface
):
class _DbConnection(DbInterface):
"""note: unstable constructor signature."""
__slots__ = (
'__kvfactory',
'__buffersize',
'__path',
'__error',
'__not_running',
'__mmdb',
'__loop',
'__queue',
'__file',
'__task',
"__kvfactory",
"__buffersize",
"__path",
"__error",
"__not_running",
"__mmdb",
"__loop",
"__queue",
"__file",
"__task",
)
__mmdb: _Memory
@ -852,10 +834,7 @@ class _DbConnection(
self.__queue = _ReceivingQueue(queue)
self.__mmdb = _Memory(guard)
await self.__mmdb._load_from_file()
self.__task = _QueueTask(
queue,
_WriteableBuffer(self.__buffersize, guard, self.__queue, self.__loop)
)
self.__task = _QueueTask(queue, _WriteableBuffer(self.__buffersize, guard, self.__queue, self.__loop))
self.__task.start()
async def _initialize(self, /) -> None:
@ -866,7 +845,8 @@ class _DbConnection(
@classmethod
async def create(cls, parameters: DbParameters, /) -> _DbConnection:
"""connect to the factory.
"""\
connect to the factory.
note: unstable signature."""
dbconnection = _DbConnection(parameters)
await dbconnection._initialize()
@ -882,13 +862,15 @@ note: unstable signature."""
await mmdb._close()
async def aclose(self, /) -> None:
"""close the connection.
"""\
close the connection.
note: unstable signature."""
await self._close_running()
self.__running = False
async def commit_transaction(self, delta: dict, /) -> None:
"""hybrid of set() and dict.update().
"""\
hybrid of set() and dict.update().
note: unstable signature."""
if not delta:
return
@ -898,7 +880,8 @@ note: unstable signature."""
await future
def submit_transaction(self, delta: dict, /) -> None:
"""hybrid of set_nowait() and dict.update().
"""\
hybrid of set_nowait() and dict.update().
_nowait analogue of commit_transaction().
note: this method was added only for async-sync symmetry with commit_transaction().
note: unstable signature."""
@ -908,7 +891,8 @@ note: unstable signature."""
self.__queue.submit(TransactionRequest(buffer, future=None))
def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None:
"""low-level API.
"""\
low-level API.
for high-level synchronisation use transaction() instead.
note: unstable signature."""
if not delta:
@ -933,14 +917,12 @@ DbConnection: TypeAlias = DbInterface
class DbManager:
__slots__ = (
'__parameters',
'__db',
"__parameters",
"__db",
)
def __init__(self, path: pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
self.__parameters = DbParameters(
path, kvfactory=kvfactory, buffersize=buffersize
)
self.__parameters = DbParameters(path, kvfactory=kvfactory, buffersize=buffersize)
async def __aenter__(self) -> DbInterface:
self.__db = await _DbConnection.create(self.__parameters)
@ -959,12 +941,7 @@ class Db(_DbConnection):
__slots__ = ()
def __init__(self, path: str | pathlib.Path, /, *, kvfactory: KVFactory, buffersize=1048576) -> None:
_DbConnection.__init__(
self,
DbParameters(
pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize
)
)
_DbConnection.__init__(self, DbParameters(pathlib.Path(path), kvfactory=kvfactory, buffersize=buffersize))
async def __aenter__(self) -> _DbConnection:
await self._initialize()
@ -989,18 +966,16 @@ class FutureContext:
await self.__future
class TransactionView(
ExtendedVirtualConnection
):
class TransactionView(ExtendedVirtualConnection):
"""note: unstable constructor signature."""
__slots__ = (
'__delta',
'__shadow',
'__connection',
'__loop',
'__kvprotocol',
'__subfuture',
"__delta",
"__shadow",
"__connection",
"__loop",
"__kvprotocol",
"__subfuture",
)
def __init__(self, delta: dict, connection: VirtualConnection, /) -> None:
@ -1098,7 +1073,8 @@ class TransactionView(
await self.__connection.commit_transaction(delta)
def submit(self, /) -> None:
"""submit changes.
"""\
submit changes.
_nowait analogue of commit().
bulk analogue of DbConnection.set_nowait()."""
# for persistence5('s forks) developers:
@ -1185,9 +1161,9 @@ class Transaction:
"""note: unstable signature."""
__slots__ = (
'__connection',
'__view',
'__running',
"__connection",
"__view",
"__running",
)
__view: TransactionView

View File

@ -1,12 +1,12 @@
from setuptools import setup
setup(
name='ptvp35',
version='1.1.0',
packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35',
license='MIT',
author='PARRRATE TNV',
author_email='',
description='',
name="ptvp35",
version="1.1.0",
packages=["ptvp35"],
url="https://gitea.ongoteam.net/PTV/ptvp35",
license="MIT",
author="PARRRATE TNV",
author_email="",
description="",
)

View File

@ -1,11 +1,11 @@
import asyncio
import pathlib
from ptvp35 import DbFactory, KVJson, VDELETE
from ptvp35 import VDELETE, DbFactory, KVJson
async def main():
path = pathlib.Path('test_delete.db')
path = pathlib.Path("test_delete.db")
path.unlink(missing_ok=True)
async with DbFactory(path, kvfactory=KVJson()) as connection:
connection.set_nowait(0, 0)
@ -19,4 +19,5 @@ async def main():
print(connection.get(0, 1))
# path.unlink(missing_ok=True)
asyncio.run(main())