From 1cd39ad061e221f39467262ca1e928bf1c89b35d Mon Sep 17 00:00:00 2001 From: timofey Date: Sun, 15 Jan 2023 08:54:07 +0000 Subject: [PATCH] 1.1rc4: delete --- ptvp35/__init__.py | 84 +++++++++++++++++++++++++++++++++++----------- setup.py | 2 +- test_delete.py | 22 ++++++++++++ 3 files changed, 88 insertions(+), 20 deletions(-) create mode 100644 test_delete.py diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 959ed01..d8092f0 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -60,13 +60,21 @@ class LineRequest(Request): self.line = line -class KVFactory(abc.ABC): +class KVProtocol(abc.ABC): + @abc.abstractmethod + def dbget(self, db: dict, key: Any, default: Any, /): + raise NotImplementedError + + +class KVFactory(KVProtocol): """this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form). that functionality may be added in the future, though, probably, only for custom DbConnection implementations. note: unstable signature.""" __slots__ = () + DELETE = object() + @abc.abstractmethod def line(self, key: Any, value: Any, /) -> str: """line must contain exactly one '\\n' at exactly the end if the line is not empty. @@ -79,12 +87,31 @@ note: other forms of requests will later be represented by different methods or note: unstable signature.""" raise NotImplementedError - def run(self, line: str, db: dict, /) -> None: + def run(self, line: str, db: dict, reduce: bool, /) -> None: """run request against the db. extensible to allow forms of requests other than set. note: unstable signature.""" key, value = self.fromline(line) - db[key] = value + self._dbset(db, key, value, reduce) + + def _dbset(self, db: dict, key: Any, value: Any, reduce: bool, /): + if reduce and value is self.DELETE: + db.pop(key, None) + else: + db[key] = value + + def dbset(self, db: dict, key: Any, value: Any, /): + self._dbset(db, key, value, True) + + def dbget(self, db: dict, key: Any, default: Any, /): + value = db.get(key, default) + return self.filter_value(value, default) + + def filter_value(self, value: Any, default: Any, /): + if value is self.DELETE: + return default + else: + return value def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> 'KVRequest': """form request with Future. @@ -97,11 +124,11 @@ note: unstable signature.""" note: unstable signature.""" return self.request(key, value, future=None) - def io2db(self, io: IO[str], db: dict, /) -> int: + def io2db(self, io: IO[str], db: dict, reduce: bool, /) -> int: """note: unstable signature.""" size = 0 for line in io: - self.run(line, db) + self.run(line, db, reduce) size += len(line) return size @@ -141,7 +168,11 @@ class KVJson(KVFactory): __slots__ = () def line(self, key: Any, value: Any, /) -> str: - return json.dumps({'key': key, 'value': value}) + '\n' + if value is self.DELETE: + obj = {'key': key} + else: + obj = {'key': key, 'value': value} + return json.dumps(obj) + '\n' def _load_key(self, key: Any, /) -> Hashable: """note: unstable signature.""" @@ -157,7 +188,7 @@ class KVJson(KVFactory): def fromline(self, line: str, /) -> tuple[Any, Any]: d = json.loads(line) - return self._load_key(d['key']), d['value'] + return self._load_key(d['key']), d.get('value', self.DELETE) class TransactionRequest(LineRequest): @@ -249,6 +280,10 @@ class VirtualConnection(abc.ABC): def get(self, key: Any, default: Any, /): raise NotImplementedError + @abc.abstractmethod + def kvprotocol(self, /) -> KVProtocol: + raise NotImplementedError + @abc.abstractmethod async def commit_transaction(self, delta: dict, /) -> None: raise NotImplementedError @@ -337,7 +372,7 @@ class DbConnection(VirtualConnection): def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int: path.touch() with path.open('r') as file: - return self.__kvfactory.io2db(file, db) + return self.__kvfactory.io2db(file, db, True) def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int: with path.open('w') as file: @@ -345,22 +380,26 @@ class DbConnection(VirtualConnection): os.fsync(file.fileno()) return initial_size + def kvprotocol(self, /) -> KVProtocol: + return self.__kvfactory + def get(self, key: Any, default: Any, /): """dict-like get with mandatory default parametre.""" - return self.__mmdb.get(key, default) + return self.__kvfactory.dbget(self.__mmdb, key, default) async def set(self, key: Any, value: Any, /) -> None: """set the value and wait until it's written to disk.""" future = self._create_future() request = self.__kvfactory.request(key, value, future=future) - self.__mmdb[key] = value + self.__kvfactory.dbset(self.__mmdb, key, value) self.__queue.put_nowait(request) await future def set_nowait(self, key: Any, value: Any, /) -> None: """set value and add write-to-disk request to queue.""" - self.__mmdb[key] = value - self.__queue.put_nowait(self.__kvfactory.free(key, value)) + request = self.__kvfactory.free(key, value) + self.__kvfactory.dbset(self.__mmdb, key, value) + self.__queue.put_nowait(request) def _clear_buffer(self, /) -> None: self.__buffer = StringIO() @@ -370,7 +409,7 @@ class DbConnection(VirtualConnection): def _compress_buffer(self, /) -> StringIO: self.__buffer.seek(0) bufferdb = {} - self.__kvfactory.io2db(self.__buffer, bufferdb) + self.__kvfactory.io2db(self.__buffer, bufferdb, False) buffer = StringIO() self.__kvfactory.db2io(bufferdb, buffer) return buffer @@ -609,7 +648,7 @@ intended for heavy tasks.""" async def create(cls, parametres: DbParametres, /) -> 'DbConnection': """connect to the factory. note: unstable signature.""" - dbconnection = cls(parametres) + dbconnection = DbConnection(parametres) await dbconnection._initialize() return dbconnection @@ -656,7 +695,8 @@ note: unstable signature.""" def _transaction_buffer(self, delta: dict, /) -> StringIO: buffer = StringIO() self.__kvfactory.db2io(delta, buffer) - self.__mmdb.update(delta) + for key, value in delta.items(): + self.__kvfactory.dbset(self.__mmdb, key, value) return buffer async def commit_transaction(self, delta: dict, /) -> None: @@ -764,6 +804,7 @@ class TransactionView(VirtualConnection): '__shadow', '__connection', '__loop', + '__kvprotocol', '__subfuture', ) @@ -772,6 +813,7 @@ class TransactionView(VirtualConnection): self.__shadow = {} self.__connection = connection self.__loop = connection.loop() + self.__kvprotocol = connection.kvprotocol() self.__subfuture: asyncio.Future | None = None def future_context(self, /) -> FutureContext: @@ -829,13 +871,17 @@ class TransactionView(VirtualConnection): async with self.future_context(): self.reset() + def kvprotocol(self, /) -> KVProtocol: + return self.__kvprotocol + def get(self, key: Any, default: Any, /): """get from the delta (unsubmitted), else from the shadow (submitted), else from the connection.""" if key in self.__delta: - return self.__delta[key] - if key in self.__shadow: - return self.__shadow[key] - return self.__connection.get(key, default) + return self.__kvprotocol.dbget(self.__delta, key, default) + elif key in self.__shadow: + return self.__kvprotocol.dbget(self.__shadow, key, default) + else: + return self.__connection.get(key, default) def set_nowait(self, key: Any, value: Any, /) -> None: """note: unlike the corresponding db method, this one does not catch serialisation errors early.""" diff --git a/setup.py b/setup.py index 1cffba4..bcff2c8 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='ptvp35', - version='1.1rc3', + version='1.1rc4', packages=['ptvp35'], url='https://gitea.ongoteam.net/PTV/ptvp35', license='MIT', diff --git a/test_delete.py b/test_delete.py new file mode 100644 index 0000000..2dc2d4c --- /dev/null +++ b/test_delete.py @@ -0,0 +1,22 @@ +import asyncio +import pathlib + +from ptvp35 import * + + +async def main(): + path = pathlib.Path('test_delete.db') + path.unlink(missing_ok=True) + async with DbFactory(path, kvfactory=KVJson()) as connection: + connection.set_nowait(0, 0) + print(connection.get(0, 1)) + await connection.commit() + async with connection.transaction() as transaction: + print(transaction.get(0, 1)) + transaction.set_nowait(0, KVFactory.DELETE) + print(transaction.get(0, 1)) + input() + print(connection.get(0, 1)) + # path.unlink(missing_ok=True) + +asyncio.run(main())