diff --git a/ptvp35/__init__.py b/ptvp35/__init__.py index 71dcfde..daf26bd 100644 --- a/ptvp35/__init__.py +++ b/ptvp35/__init__.py @@ -1,12 +1,14 @@ +import abc import asyncio import concurrent.futures import json import os import pathlib import threading +import warnings +from functools import wraps from io import StringIO, UnsupportedOperation -from typing import Any, IO, Hashable - +from typing import IO, Any, Callable, Hashable, TypeVar __all__ = ( 'KVFactory', @@ -182,7 +184,74 @@ class RequestToClosedConnection(asyncio.InvalidStateError): pass -class DbConnection: +class NightlyWarning(Warning): + enabled = True + + +TDecoratedNightly = TypeVar('TDecoratedNightly', bound=Callable | type | bool) + + +def nightly(decorated: TDecoratedNightly = None, prefix: str = '', stacklevel=2) -> TDecoratedNightly: + if decorated is None: + NightlyWarning.enabled = False + return None # type: ignore + if isinstance(decorated, type): + decorated.__init__ = nightly( + decorated.__init__, f'{decorated.__name__}.' + ) + decorated.__init_subclass__ = nightly( + decorated.__init_subclass__, f'{decorated.__name__}.', stacklevel=3 + ) + return decorated # type: ignore + + assert callable(decorated) + message = f"{prefix}{decorated.__name__}" + + @wraps(decorated) + def wrap(*args, **kwargs): + if NightlyWarning.enabled: + warnings.warn(message, NightlyWarning, stacklevel=stacklevel) + return decorated(*args, **kwargs) + + if wrap.__doc__ is None or not wrap.__doc__: + wrap.__doc__ = '@nightly' + elif wrap.__doc__.startswith('@nightly'): + pass + else: + wrap.__doc__ = '@nightly\n\n' + wrap.__doc__ + + return wrap # type: ignore + + +warnings.filterwarnings('default', category=NightlyWarning, module='__main__') +warnings.filterwarnings('ignore', category=NightlyWarning, module='ptvp35') +nightly = nightly(nightly) # type: ignore + + +@nightly +class VirtualConnection(abc.ABC): + """intersection of DbConnection and TransactionView functionality""" + + __slots__ = () + + def get(self, key: Any, default: Any, /): + raise NotImplementedError + + async def commit_transaction(self, delta: dict, /) -> None: + raise NotImplementedError + + def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: + raise NotImplementedError + + @nightly + def loop(self, /) -> asyncio.AbstractEventLoop: + raise NotImplementedError + + def transaction(self, /) -> 'Transaction': + return Transaction(self) + + +class DbConnection(VirtualConnection): """note: unstable constructor signature.""" __slots__ = ( @@ -519,7 +588,7 @@ note: unstable signature.""" dbconnection = cls(parametres) await dbconnection._initialize() return dbconnection - + async def _close_buffer(self, /): await self._commit_buffer() if not self.__buffer_future.done(): @@ -601,9 +670,12 @@ note: unstable signature.""" self.__queue.put_nowait(CommitRequest(future)) await future + @nightly + def loop(self, /) -> asyncio.AbstractEventLoop: + return self.__loop + def transaction(self, /) -> 'Transaction': - """open new transaction.""" - return Transaction(self, self.__loop) + return Transaction(self) class DbFactory: @@ -646,7 +718,19 @@ class Db(DbConnection): await self.aclose() -class TransactionView: +class FutureContext: + def __init__(self, future: asyncio.Future | None) -> None: + self.__future = future + + async def __aenter__(self) -> None: + pass + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.__future is not None: + await self.__future + + +class TransactionView(VirtualConnection): """note: unstable constructor signature.""" __slots__ = ( @@ -657,14 +741,71 @@ class TransactionView: '__subfuture', ) - def __init__(self, delta: dict, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None: + def __init__(self, delta: dict, connection: VirtualConnection, /) -> None: self.__delta = delta self.__shadow = {} self.__connection = connection - self.__loop = loop + self.__loop = connection.loop() self.__subfuture: asyncio.Future | None = None + def future_context(self, /) -> FutureContext: + """do something (inside of async with), then wait for submitted changes to be committed.""" + return FutureContext(self.__subfuture) + + @nightly + def rollback(self, /) -> None: + """clear unsubmitted changes.""" + self.__delta.clear() + + @nightly + def illuminate(self, /) -> None: + """clear submitted changes, thus syncing the view (underlying the delta) with the connection.""" + self.__shadow.clear() + + @nightly + async def ailluminate(self, /) -> None: + """illuminate, then wait for submitted changes to be committed.""" + async with self.future_context(): + self.illuminate() + + @nightly + def fork(self, /) -> None: + """keep delta, but forget about the shadow entirely (including making sure it's committed).""" + self.illuminate() + self.__subfuture = None + + @nightly + async def afork(self, /) -> None: + """fork, then wait for submitted changes to be committed.""" + async with self.future_context(): + self.fork() + + @nightly + def clear(self, /) -> None: + """clear all changes (including the shadow).""" + self.rollback() + self.illuminate() + + @nightly + async def aclear(self, /) -> None: + """clear, then wait for submitted changes to be committed.""" + async with self.future_context(): + self.clear() + + @nightly + def reset(self, /) -> None: + """reset transaction.""" + self.clear() + self.__subfuture = None + + @nightly + async def areset(self, /) -> None: + """reset, then wait for submitted changes to be committed.""" + async with self.future_context(): + self.reset() + def get(self, key: Any, default: Any, /): + """get from the delta (unsubmitted), else from the shadow (submitted), else from the connection.""" if key in self.__delta: return self.__delta[key] if key in self.__shadow: @@ -683,18 +824,29 @@ class TransactionView: async def commit(self, /) -> None: """bulk analogue of DbConnection.set().""" - subfuture: asyncio.Future | None = self.__subfuture - self.__subfuture = None - delta = self._delta() - if delta: - await self.__connection.commit_transaction(delta) - if subfuture is not None: - await subfuture + # for persistence5('s forks) developers: + # q: why not self.__subfuture = None here? + # a: run two commit calls concurrently. one will quit early and fail semantically. + # we also never implicitly reset self.__subfuture because newly created future may depend on it. + # q: why not self.submit() inside FC block? + # a: that would require using FC block later once more + that future may include extra submitted changes; + # so one would need to do submit, then do an empty FC block. that maybe introduced in the future + # q: why use if delta? + # a: to have code symmetric to that of submit + to not create an extra coroutine. + # note: q&a comments above may become obsolete + async with self.future_context(): + delta = self._delta() + if delta: + await self.__connection.commit_transaction(delta) def submit(self, /) -> None: """submit changes. _nowait analogue of commit(). bulk analogue of DbConnection.set_nowait().""" + # for persistence5('s forks) developers: + # q: why use if delta? + # a: to have code symmetric to that of commit + to not create an extra future. + # note: q&a comments above may become obsolete delta = self._delta() if delta: future = self.__loop.create_future() @@ -737,22 +889,61 @@ bulk analogue of DbConnection.set_nowait().""" case _: raise TypeError + @nightly + async def commit_transaction(self, delta: dict, /) -> None: + if not delta: + return + self.__delta.update(delta) + await self.commit() + + @nightly + def submit_transaction(self, delta: dict, /) -> None: + if not delta: + return + self.__delta.update(delta) + self.submit() + + @nightly + def submit_transaction_request(self, delta: dict, future: asyncio.Future | None, /) -> None: + def set_result(sf: asyncio.Future | None): + if future is None: + pass + elif sf is None or (e := sf.exception()) is None: + future.set_result(None) + else: + future.set_exception(e) + + if not delta: + set_result(None) + return + self.submit_transaction(delta) + if self.__subfuture is None: + set_result(None) + return + self.__subfuture.add_done_callback(set_result) + + @nightly + def loop(self, /) -> asyncio.AbstractEventLoop: + return self.__loop + + @nightly + def transaction(self, /) -> 'Transaction': + return super().transaction() + class Transaction: """note: unstable signature.""" __slots__ = ( '__connection', - '__loop', '__view', '__running', ) __view: TransactionView - def __init__(self, connection: DbConnection, loop: asyncio.AbstractEventLoop, /) -> None: + def __init__(self, connection: VirtualConnection, /) -> None: self.__connection = connection - self.__loop = loop self.__running = False async def __aenter__(self) -> TransactionView: @@ -770,7 +961,7 @@ class Transaction: def __enter__(self) -> TransactionView: assert not self.__running self.__running = True - self.__view = TransactionView({}, self.__connection, self.__loop) + self.__view = TransactionView({}, self.__connection) return self.__view def __exit__(self, exc_type, exc_val, exc_tb): diff --git a/setup.py b/setup.py index 3f01e87..dd8f724 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name='ptvp35', - version='1.0rc6', + version='1.0rc7', packages=['ptvp35'], url='https://gitea.ongoteam.net/PTV/ptvp35', license='',