diff --git a/v6d2ctx/integration/event.py b/v6d2ctx/integration/event.py new file mode 100644 index 0000000..7f8b4fb --- /dev/null +++ b/v6d2ctx/integration/event.py @@ -0,0 +1,60 @@ +__all__ = ('Event', 'SendableEvents', 'ReceivableEvents', 'Events', 'Receiver') + +import asyncio +from typing import Callable, Generic, TypeVar + +from typing_extensions import Self + +from .responsetype import ResponseType + + +class Event: + def json(self) -> ResponseType: + raise NotImplementedError + + +T = TypeVar('T', bound=Event) +T_co = TypeVar('T_co', bound=Event, covariant=True) +T_contra = TypeVar('T_contra', bound=Event, contravariant=True) + + +class Receiver(Generic[T_contra]): + def __init__(self, receive: Callable[[T_contra], None], receivers: set[Self], /) -> None: + self.__receive = receive + self.__receivers = receivers + self.__receiving = False + + def __enter__(self) -> None: + self.__receivers.add(self) + self.__receiving = True + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__receiving = False + self.__receivers.remove(self) + + def receive(self, event: T_contra, /) -> None: + if self.__receiving: + self.__receive(event) + + +class SendableEvents(Generic[T_contra]): + def send(self, event: T_contra, /) -> None: + raise NotImplementedError + + +class ReceivableEvents(Generic[T_co]): + def receive(self, receive: Callable[[T_co], None], /) -> Receiver[T_co]: + raise NotImplementedError + + +class Events(Generic[T], SendableEvents[T], ReceivableEvents[T]): + def __init__(self) -> None: + self.__receivers: set[Receiver[T]] = set() + self.__loop = asyncio.get_running_loop() + + def send(self, event: T, /) -> None: + for receiver in self.__receivers: + self.__loop.call_soon(receiver.receive, event) + + def receive(self, receive: Callable[[T], None], /) -> Receiver[T]: + return Receiver(receive, self.__receivers) diff --git a/v6d2ctx/integration/responsetype.py b/v6d2ctx/integration/responsetype.py new file mode 100644 index 0000000..2181c0d --- /dev/null +++ b/v6d2ctx/integration/responsetype.py @@ -0,0 +1,17 @@ +__all__ = ('ResponseType', 'cast_to_response') + +from typing import Any, TypeAlias + +ResponseType: TypeAlias = list['ResponseType'] | dict[str, 'ResponseType'] | float | int | bool | str | None + + +def cast_to_response(target: Any) -> ResponseType: + match target: + case str() | int() | float() | bool() | None: + return target + case list() | tuple(): + return list(map(cast_to_response, target)) + case dict(): + return {str(key): cast_to_response(value) for key, value in target.items()} + case _: + return str(target) diff --git a/v6d2ctx/integration/targets.py b/v6d2ctx/integration/targets.py new file mode 100644 index 0000000..eb58130 --- /dev/null +++ b/v6d2ctx/integration/targets.py @@ -0,0 +1,76 @@ +__all__ = ('Targets', 'JsonLike', 'Async') + +import abc +from typing import Any, Callable, Generic, TypeVar + +from rainbowadn.instrument import Instrumentation + +from .responsetype import * + + +def qualname(t: type) -> str: + return f'{t.__module__}.{t.__qualname__}' + + +T = TypeVar('T') + + +class Flagful(Generic[T]): + def __init__(self, value: T, flags: set[object]) -> None: + self.value = value + self.flags = flags + + +class Targets: + def __init__(self) -> None: + self.targets: dict[str, Flagful[tuple[Any, str]]] = {} + self.instrumentations: dict[str, Flagful[Callable[[Any, str], Instrumentation]]] = {} + self.factories: dict[tuple[str, str], Callable[[], Instrumentation]] = {} + + def register_target(self, targetname: str, target: Any, methodname: str, /, *flags: object) -> None: + self.targets[targetname] = Flagful((target, methodname), set(flags)) + print(f'registered target: {targetname}') + + def register_type(self, target: type, methodname: str, /, *flags: object) -> None: + self.register_target(f'{qualname(target)}.{methodname}', target, methodname, *flags) + + def register_instance(self, target: object, methodname: str, /, *flags: object) -> None: + self.register_target(f'{qualname(target.__class__)}().{methodname}', target, methodname, *flags) + + def register_instrumentation( + self, + instrumentationname: str, + instrumentation_factory: Callable[[Any, str], Instrumentation], + /, + *flags: object, + ) -> None: + self.instrumentations[instrumentationname] = Flagful(instrumentation_factory, set(flags)) + print(f'registered instrumentation: {instrumentationname}') + + def get_factory( + self, + targetname: str, + target: Any, + methodname: str, + instrumentationname: str, + instrumentation_factory: Callable[[Any, str], Instrumentation], + / + ) -> Callable[[], Instrumentation]: + if (targetname, instrumentationname) not in self.factories: + flags_required = self.instrumentations[instrumentationname].flags + flags_present = self.targets[targetname].flags + if not flags_required.issubset(flags_present): + raise KeyError('target lacks flags required by instrumentation') + self.factories[targetname, instrumentationname] = ( + lambda: instrumentation_factory(target, methodname) + ) + return self.factories[targetname, instrumentationname] + + +class JsonLike(abc.ABC): + @abc.abstractmethod + def json(self) -> ResponseType: + raise NotImplementedError + + +Async = object()