v6d2ctx.integration

This commit is contained in:
AF 2023-02-27 09:07:02 +00:00
parent fe7ea0bf5c
commit 1fc3f3f60c
3 changed files with 153 additions and 0 deletions

View File

@ -0,0 +1,60 @@
__all__ = ('Event', 'SendableEvents', 'ReceivableEvents', 'Events', 'Receiver')
import asyncio
from typing import Callable, Generic, TypeVar
from typing_extensions import Self
from .responsetype import ResponseType
class Event:
def json(self) -> ResponseType:
raise NotImplementedError
T = TypeVar('T', bound=Event)
T_co = TypeVar('T_co', bound=Event, covariant=True)
T_contra = TypeVar('T_contra', bound=Event, contravariant=True)
class Receiver(Generic[T_contra]):
def __init__(self, receive: Callable[[T_contra], None], receivers: set[Self], /) -> None:
self.__receive = receive
self.__receivers = receivers
self.__receiving = False
def __enter__(self) -> None:
self.__receivers.add(self)
self.__receiving = True
def __exit__(self, exc_type, exc_val, exc_tb):
self.__receiving = False
self.__receivers.remove(self)
def receive(self, event: T_contra, /) -> None:
if self.__receiving:
self.__receive(event)
class SendableEvents(Generic[T_contra]):
def send(self, event: T_contra, /) -> None:
raise NotImplementedError
class ReceivableEvents(Generic[T_co]):
def receive(self, receive: Callable[[T_co], None], /) -> Receiver[T_co]:
raise NotImplementedError
class Events(Generic[T], SendableEvents[T], ReceivableEvents[T]):
def __init__(self) -> None:
self.__receivers: set[Receiver[T]] = set()
self.__loop = asyncio.get_running_loop()
def send(self, event: T, /) -> None:
for receiver in self.__receivers:
self.__loop.call_soon(receiver.receive, event)
def receive(self, receive: Callable[[T], None], /) -> Receiver[T]:
return Receiver(receive, self.__receivers)

View File

@ -0,0 +1,17 @@
__all__ = ('ResponseType', 'cast_to_response')
from typing import Any, TypeAlias
ResponseType: TypeAlias = list['ResponseType'] | dict[str, 'ResponseType'] | float | int | bool | str | None
def cast_to_response(target: Any) -> ResponseType:
match target:
case str() | int() | float() | bool() | None:
return target
case list() | tuple():
return list(map(cast_to_response, target))
case dict():
return {str(key): cast_to_response(value) for key, value in target.items()}
case _:
return str(target)

View File

@ -0,0 +1,76 @@
__all__ = ('Targets', 'JsonLike', 'Async')
import abc
from typing import Any, Callable, Generic, TypeVar
from rainbowadn.instrument import Instrumentation
from .responsetype import *
def qualname(t: type) -> str:
return f'{t.__module__}.{t.__qualname__}'
T = TypeVar('T')
class Flagful(Generic[T]):
def __init__(self, value: T, flags: set[object]) -> None:
self.value = value
self.flags = flags
class Targets:
def __init__(self) -> None:
self.targets: dict[str, Flagful[tuple[Any, str]]] = {}
self.instrumentations: dict[str, Flagful[Callable[[Any, str], Instrumentation]]] = {}
self.factories: dict[tuple[str, str], Callable[[], Instrumentation]] = {}
def register_target(self, targetname: str, target: Any, methodname: str, /, *flags: object) -> None:
self.targets[targetname] = Flagful((target, methodname), set(flags))
print(f'registered target: {targetname}')
def register_type(self, target: type, methodname: str, /, *flags: object) -> None:
self.register_target(f'{qualname(target)}.{methodname}', target, methodname, *flags)
def register_instance(self, target: object, methodname: str, /, *flags: object) -> None:
self.register_target(f'{qualname(target.__class__)}().{methodname}', target, methodname, *flags)
def register_instrumentation(
self,
instrumentationname: str,
instrumentation_factory: Callable[[Any, str], Instrumentation],
/,
*flags: object,
) -> None:
self.instrumentations[instrumentationname] = Flagful(instrumentation_factory, set(flags))
print(f'registered instrumentation: {instrumentationname}')
def get_factory(
self,
targetname: str,
target: Any,
methodname: str,
instrumentationname: str,
instrumentation_factory: Callable[[Any, str], Instrumentation],
/
) -> Callable[[], Instrumentation]:
if (targetname, instrumentationname) not in self.factories:
flags_required = self.instrumentations[instrumentationname].flags
flags_present = self.targets[targetname].flags
if not flags_required.issubset(flags_present):
raise KeyError('target lacks flags required by instrumentation')
self.factories[targetname, instrumentationname] = (
lambda: instrumentation_factory(target, methodname)
)
return self.factories[targetname, instrumentationname]
class JsonLike(abc.ABC):
@abc.abstractmethod
def json(self) -> ResponseType:
raise NotImplementedError
Async = object()