VirtualConnection + nightly

This commit is contained in:
AF 2022-11-23 15:53:48 +00:00
parent 07ea5a7a1a
commit 7a8b7a82af
2 changed files with 212 additions and 21 deletions

View File

@ -1,12 +1,14 @@
import abc
import asyncio
import concurrent.futures
import json
import os
import pathlib
import threading
import warnings
from functools import wraps
from io import StringIO, UnsupportedOperation
from typing import Any, IO, Hashable
from typing import IO, Any, Callable, Hashable, TypeVar
__all__ = (
'KVFactory',
@ -182,7 +184,74 @@ class RequestToClosedConnection(asyncio.InvalidStateError):
pass
class DbConnection:
class NightlyWarning(Warning):
enabled = True
TDecoratedNightly = TypeVar('TDecoratedNightly', bound=Callable | type | bool)
def nightly(decorated: TDecoratedNightly = None, prefix: str = '', stacklevel=2) -> TDecoratedNightly:
if decorated is None:
NightlyWarning.enabled = False
return None # type: ignore
if isinstance(decorated, type):
decorated.__init__ = nightly(
decorated.__init__, f'{decorated.__name__}.'
)
decorated.__init_subclass__ = nightly(
decorated.__init_subclass__, f'{decorated.__name__}.', stacklevel=3
)
return decorated # type: ignore
assert callable(decorated)
message = f"{prefix}{decorated.__name__}"
@wraps(decorated)
def wrap(*args, **kwargs):
if NightlyWarning.enabled:
warnings.warn(message, NightlyWarning, stacklevel=stacklevel)
return decorated(*args, **kwargs)
if wrap.__doc__ is None or not wrap.__doc__:
wrap.__doc__ = '@nightly'
elif wrap.__doc__.startswith('@nightly'):
pass
else:
wrap.__doc__ = '@nightly\n\n' + wrap.__doc__
return wrap # type: ignore
warnings.filterwarnings('default', category=NightlyWarning, module='__main__')
warnings.filterwarnings('ignore', category=NightlyWarning, module='ptvp35')
nightly = nightly(nightly) # type: ignore
@nightly
class VirtualConnection(abc.ABC):
"""intersection of DbConnection and TransactionView functionality"""
__slots__ = ()
def get(self, key: Any, default: Any, /):
raise NotImplementedError
async def commit_transaction(self, delta: dict, /) -> None:
raise NotImplementedError
def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None:
raise NotImplementedError
@nightly
def loop(self, /) -> asyncio.AbstractEventLoop:
raise NotImplementedError
def transaction(self, /) -> 'Transaction':
return Transaction(self)
class DbConnection(VirtualConnection):
"""note: unstable constructor signature."""
__slots__ = (
@ -519,7 +588,7 @@ note: unstable signature."""
dbconnection = cls(parametres)
await dbconnection._initialize()
return dbconnection
async def _close_buffer(self, /):
await self._commit_buffer()
if not self.__buffer_future.done():
@ -601,9 +670,12 @@ note: unstable signature."""
self.__queue.put_nowait(CommitRequest(future))
await future
@nightly
def loop(self, /) -> asyncio.AbstractEventLoop:
return self.__loop
def transaction(self, /) -> 'Transaction':
"""open new transaction."""
return Transaction(self, self.__loop)
return Transaction(self)
class DbFactory:
@ -646,7 +718,19 @@ class Db(DbConnection):
await self.aclose()
class TransactionView:
class FutureContext:
def __init__(self, future: asyncio.Future | None) -> None:
self.__future = future
async def __aenter__(self) -> None:
pass
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.__future is not None:
await self.__future
class TransactionView(VirtualConnection):
"""note: unstable constructor signature."""
__slots__ = (
@ -657,14 +741,71 @@ class TransactionView:
'__subfuture',
)
def __init__(self, delta: dict, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None:
def __init__(self, delta: dict, connection: VirtualConnection, /) -> None:
self.__delta = delta
self.__shadow = {}
self.__connection = connection
self.__loop = loop
self.__loop = connection.loop()
self.__subfuture: asyncio.Future | None = None
def future_context(self, /) -> FutureContext:
"""do something (inside of async with), then wait for submitted changes to be committed."""
return FutureContext(self.__subfuture)
@nightly
def rollback(self, /) -> None:
"""clear unsubmitted changes."""
self.__delta.clear()
@nightly
def illuminate(self, /) -> None:
"""clear submitted changes, thus syncing the view (underlying the delta) with the connection."""
self.__shadow.clear()
@nightly
async def ailluminate(self, /) -> None:
"""illuminate, then wait for submitted changes to be committed."""
async with self.future_context():
self.illuminate()
@nightly
def fork(self, /) -> None:
"""keep delta, but forget about the shadow entirely (including making sure it's committed)."""
self.illuminate()
self.__subfuture = None
@nightly
async def afork(self, /) -> None:
"""fork, then wait for submitted changes to be committed."""
async with self.future_context():
self.fork()
@nightly
def clear(self, /) -> None:
"""clear all changes (including the shadow)."""
self.rollback()
self.illuminate()
@nightly
async def aclear(self, /) -> None:
"""clear, then wait for submitted changes to be committed."""
async with self.future_context():
self.clear()
@nightly
def reset(self, /) -> None:
"""reset transaction."""
self.clear()
self.__subfuture = None
@nightly
async def areset(self, /) -> None:
"""reset, then wait for submitted changes to be committed."""
async with self.future_context():
self.reset()
def get(self, key: Any, default: Any, /):
"""get from the delta (unsubmitted), else from the shadow (submitted), else from the connection."""
if key in self.__delta:
return self.__delta[key]
if key in self.__shadow:
@ -683,18 +824,29 @@ class TransactionView:
async def commit(self, /) -> None:
"""bulk analogue of DbConnection.set()."""
subfuture: asyncio.Future | None = self.__subfuture
self.__subfuture = None
delta = self._delta()
if delta:
await self.__connection.commit_transaction(delta)
if subfuture is not None:
await subfuture
# for persistence5('s forks) developers:
# q: why not self.__subfuture = None here?
# a: run two commit calls concurrently. one will quit early and fail semantically.
# we also never implicitly reset self.__subfuture because newly created future may depend on it.
# q: why not self.submit() inside FC block?
# a: that would require using FC block later once more + that future may include extra submitted changes;
# so one would need to do submit, then do an empty FC block. that maybe introduced in the future
# q: why use if delta?
# a: to have code symmetric to that of submit + to not create an extra coroutine.
# note: q&a comments above may become obsolete
async with self.future_context():
delta = self._delta()
if delta:
await self.__connection.commit_transaction(delta)
def submit(self, /) -> None:
"""submit changes.
_nowait analogue of commit().
bulk analogue of DbConnection.set_nowait()."""
# for persistence5('s forks) developers:
# q: why use if delta?
# a: to have code symmetric to that of commit + to not create an extra future.
# note: q&a comments above may become obsolete
delta = self._delta()
if delta:
future = self.__loop.create_future()
@ -737,22 +889,61 @@ bulk analogue of DbConnection.set_nowait()."""
case _:
raise TypeError
@nightly
async def commit_transaction(self, delta: dict, /) -> None:
if not delta:
return
self.__delta.update(delta)
await self.commit()
@nightly
def submit_transaction(self, delta: dict, /) -> None:
if not delta:
return
self.__delta.update(delta)
self.submit()
@nightly
def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None:
def set_result(sf: asyncio.Future | None):
if future is None:
pass
elif sf is None or (e := sf.exception()) is None:
future.set_result(None)
else:
future.set_exception(e)
if not delta:
set_result(None)
return
self.submit_transaction(delta)
if self.__subfuture is None:
set_result(None)
return
self.__subfuture.add_done_callback(set_result)
@nightly
def loop(self, /) -> asyncio.AbstractEventLoop:
return self.__loop
@nightly
def transaction(self, /) -> 'Transaction':
return super().transaction()
class Transaction:
"""note: unstable signature."""
__slots__ = (
'__connection',
'__loop',
'__view',
'__running',
)
__view: TransactionView
def __init__(self, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None:
def __init__(self, connection: VirtualConnection, /) -> None:
self.__connection = connection
self.__loop = loop
self.__running = False
async def __aenter__(self) -> TransactionView:
@ -770,7 +961,7 @@ class Transaction:
def __enter__(self) -> TransactionView:
assert not self.__running
self.__running = True
self.__view = TransactionView({}, self.__connection, self.__loop)
self.__view = TransactionView({}, self.__connection)
return self.__view
def __exit__(self, exc_type, exc_val, exc_tb):

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='ptvp35',
version='1.0rc6',
version='1.0rc7',
packages=['ptvp35'],
url='https://gitea.ongoteam.net/PTV/ptvp35',
license='',