1.1rc4: delete

This commit is contained in:
AF 2023-01-15 08:54:07 +00:00
parent dcc9d642aa
commit 1cd39ad061
3 changed files with 88 additions and 20 deletions

View File

@ -60,13 +60,21 @@ class LineRequest(Request):
self.line = line
class KVFactory(abc.ABC):
class KVProtocol(abc.ABC):
@abc.abstractmethod
def dbget(self, db: dict, key: Any, default: Any, /):
raise NotImplementedError
class KVFactory(KVProtocol):
"""this class is for working with already normalised data values, not for data transformation (e.g. reducing keys to a common form).
that functionality may be added in the future, though, probably, only for custom DbConnection implementations.
note: unstable signature."""
__slots__ = ()
DELETE = object()
@abc.abstractmethod
def line(self, key: Any, value: Any, /) -> str:
"""line must contain exactly one '\\n' at exactly the end if the line is not empty.
@ -79,12 +87,31 @@ note: other forms of requests will later be represented by different methods or
note: unstable signature."""
raise NotImplementedError
def run(self, line: str, db: dict, /) -> None:
def run(self, line: str, db: dict, reduce: bool, /) -> None:
"""run request against the db.
extensible to allow forms of requests other than set.
note: unstable signature."""
key, value = self.fromline(line)
db[key] = value
self._dbset(db, key, value, reduce)
def _dbset(self, db: dict, key: Any, value: Any, reduce: bool, /):
if reduce and value is self.DELETE:
db.pop(key, None)
else:
db[key] = value
def dbset(self, db: dict, key: Any, value: Any, /):
self._dbset(db, key, value, True)
def dbget(self, db: dict, key: Any, default: Any, /):
value = db.get(key, default)
return self.filter_value(value, default)
def filter_value(self, value: Any, default: Any, /):
if value is self.DELETE:
return default
else:
return value
def request(self, key: Any, value: Any, /, *, future: asyncio.Future | None) -> 'KVRequest':
"""form request with Future.
@ -97,11 +124,11 @@ note: unstable signature."""
note: unstable signature."""
return self.request(key, value, future=None)
def io2db(self, io: IO[str], db: dict, /) -> int:
def io2db(self, io: IO[str], db: dict, reduce: bool, /) -> int:
"""note: unstable signature."""
size = 0
for line in io:
self.run(line, db)
self.run(line, db, reduce)
size += len(line)
return size
@ -141,7 +168,11 @@ class KVJson(KVFactory):
__slots__ = ()
def line(self, key: Any, value: Any, /) -> str:
return json.dumps({'key': key, 'value': value}) + '\n'
if value is self.DELETE:
obj = {'key': key}
else:
obj = {'key': key, 'value': value}
return json.dumps(obj) + '\n'
def _load_key(self, key: Any, /) -> Hashable:
"""note: unstable signature."""
@ -157,7 +188,7 @@ class KVJson(KVFactory):
def fromline(self, line: str, /) -> tuple[Any, Any]:
d = json.loads(line)
return self._load_key(d['key']), d['value']
return self._load_key(d['key']), d.get('value', self.DELETE)
class TransactionRequest(LineRequest):
@ -249,6 +280,10 @@ class VirtualConnection(abc.ABC):
def get(self, key: Any, default: Any, /):
raise NotImplementedError
@abc.abstractmethod
def kvprotocol(self, /) -> KVProtocol:
raise NotImplementedError
@abc.abstractmethod
async def commit_transaction(self, delta: dict, /) -> None:
raise NotImplementedError
@ -337,7 +372,7 @@ class DbConnection(VirtualConnection):
def _path2db_sync(self, path: pathlib.Path, db: dict, /) -> int:
path.touch()
with path.open('r') as file:
return self.__kvfactory.io2db(file, db)
return self.__kvfactory.io2db(file, db, True)
def _db2path_sync(self, db: dict, path: pathlib.Path, /) -> int:
with path.open('w') as file:
@ -345,22 +380,26 @@ class DbConnection(VirtualConnection):
os.fsync(file.fileno())
return initial_size
def kvprotocol(self, /) -> KVProtocol:
return self.__kvfactory
def get(self, key: Any, default: Any, /):
"""dict-like get with mandatory default parametre."""
return self.__mmdb.get(key, default)
return self.__kvfactory.dbget(self.__mmdb, key, default)
async def set(self, key: Any, value: Any, /) -> None:
"""set the value and wait until it's written to disk."""
future = self._create_future()
request = self.__kvfactory.request(key, value, future=future)
self.__mmdb[key] = value
self.__kvfactory.dbset(self.__mmdb, key, value)
self.__queue.put_nowait(request)
await future
def set_nowait(self, key: Any, value: Any, /) -> None:
"""set value and add write-to-disk request to queue."""
self.__mmdb[key] = value
self.__queue.put_nowait(self.__kvfactory.free(key, value))
request = self.__kvfactory.free(key, value)
self.__kvfactory.dbset(self.__mmdb, key, value)
self.__queue.put_nowait(request)
def _clear_buffer(self, /) -> None:
self.__buffer = StringIO()
@ -370,7 +409,7 @@ class DbConnection(VirtualConnection):
def _compress_buffer(self, /) -> StringIO:
self.__buffer.seek(0)
bufferdb = {}
self.__kvfactory.io2db(self.__buffer, bufferdb)
self.__kvfactory.io2db(self.__buffer, bufferdb, False)
buffer = StringIO()
self.__kvfactory.db2io(bufferdb, buffer)
return buffer
@ -609,7 +648,7 @@ intended for heavy tasks."""
async def create(cls, parametres: DbParametres, /) -> 'DbConnection':
"""connect to the factory.
note: unstable signature."""
dbconnection = cls(parametres)
dbconnection = DbConnection(parametres)
await dbconnection._initialize()
return dbconnection
@ -656,7 +695,8 @@ note: unstable signature."""
def _transaction_buffer(self, delta: dict, /) -> StringIO:
buffer = StringIO()
self.__kvfactory.db2io(delta, buffer)
self.__mmdb.update(delta)
for key, value in delta.items():
self.__kvfactory.dbset(self.__mmdb, key, value)
return buffer
async def commit_transaction(self, delta: dict, /) -> None:
@ -764,6 +804,7 @@ class TransactionView(VirtualConnection):
'__shadow',
'__connection',
'__loop',
'__kvprotocol',
'__subfuture',
)
@ -772,6 +813,7 @@ class TransactionView(VirtualConnection):
self.__shadow = {}
self.__connection = connection
self.__loop = connection.loop()
self.__kvprotocol = connection.kvprotocol()
self.__subfuture: asyncio.Future | None = None
def future_context(self, /) -> FutureContext:
@ -829,13 +871,17 @@ class TransactionView(VirtualConnection):
async with self.future_context():
self.reset()
def kvprotocol(self, /) -> KVProtocol:
return self.__kvprotocol
def get(self, key: Any, default: Any, /):
"""get from the delta (unsubmitted), else from the shadow (submitted), else from the connection."""
if key in self.__delta:
return self.__delta[key]
if key in self.__shadow:
return self.__shadow[key]
return self.__connection.get(key, default)
return self.__kvprotocol.dbget(self.__delta, key, default)
elif key in self.__shadow:
return self.__kvprotocol.dbget(self.__shadow, key, default)
else:
return self.__connection.get(key, default)
def set_nowait(self, key: Any, value: Any, /) -> None:
"""note: unlike the corresponding db method, this one does not catch serialisation errors early."""

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='ptvp35',
version='1.1rc3',
version='1.1rc4',
packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35',
license='MIT',

22
test_delete.py Normal file
View File

@ -0,0 +1,22 @@
import asyncio
import pathlib
from ptvp35 import *
async def main():
path = pathlib.Path('test_delete.db')
path.unlink(missing_ok=True)
async with DbFactory(path, kvfactory=KVJson()) as connection:
connection.set_nowait(0, 0)
print(connection.get(0, 1))
await connection.commit()
async with connection.transaction() as transaction:
print(transaction.get(0, 1))
transaction.set_nowait(0, KVFactory.DELETE)
print(transaction.get(0, 1))
input()
print(connection.get(0, 1))
# path.unlink(missing_ok=True)
asyncio.run(main())