"""Minimal in-process event fan-out built on top of an asyncio event loop."""

__all__ = (
    'Event',
    'SendableEvents',
    'ReceivableEvents',
    'Events',
    'Receiver',
)

import asyncio
from typing import Callable, Generic, TypeVar

from typing_extensions import Self

from .responsetype import ResponseType


class Event:
    """Base class for events; subclasses are expected to override ``json``."""

    def json(self) -> ResponseType:
        """Return this event as a ``ResponseType``.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError


T = TypeVar('T', bound=Event)
T_co = TypeVar('T_co', bound=Event, covariant=True)
T_contra = TypeVar('T_contra', bound=Event, contravariant=True)


class Receiver(Generic[T_contra]):
    """Context manager that forwards events to a callback while it is entered.

    Entering adds the receiver to the shared ``receivers`` set it was built
    with; exiting removes it again.  Events handed to ``receive`` outside of
    the ``with`` block are dropped.
    """

    def __init__(
        self,
        receive: Callable[[T_contra], None],
        receivers: set[Self],
        /,
    ) -> None:
        """Store the callback and the shared set; the receiver starts inactive.

        Args:
            receive: Callable invoked with each event delivered while active.
            receivers: Set this receiver adds itself to on ``__enter__`` and
                removes itself from on ``__exit__``.
        """
        self.__receiving = False
        self.__receivers = receivers
        self.__receive = receive

    def __enter__(self) -> None:
        """Register in the shared set and start forwarding events.

        Returns ``None``, so ``with ... as name`` binds ``None``.
        """
        self.__receivers.add(self)
        self.__receiving = True

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        """Stop forwarding events and unregister from the shared set.

        Exceptions raised inside the ``with`` block are not suppressed.

        Raises:
            KeyError: if this receiver is not currently in the shared set.
        """
        # The flag is cleared before the removal so that it is already False
        # even when ``remove`` raises.
        self.__receiving = False
        self.__receivers.remove(self)

    def receive(self, event: T_contra, /) -> None:
        """Pass ``event`` to the callback, unless the receiver is inactive."""
        if not self.__receiving:
            return
        self.__receive(event)


class SendableEvents(Generic[T_contra]):
    """Interface for the sending side of an event channel."""

    def send(self, event: T_contra, /) -> None:
        """Send ``event``; must be overridden.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError


class ReceivableEvents(Generic[T_co]):
    """Interface for the receiving side of an event channel."""

    def receive(self, receive: Callable[[T_co], None], /) -> Receiver[T_co]:
        """Create a ``Receiver`` for the ``receive`` callback; must be overridden.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError


class Events(Generic[T], SendableEvents[T], ReceivableEvents[T]):
    """Event channel that fans each sent event out to every entered receiver."""

    def __init__(self) -> None:
        """Create an empty channel bound to the currently running event loop.

        Raises:
            RuntimeError: if no event loop is running
                (from ``asyncio.get_running_loop``).
        """
        self.__receivers: set[Receiver[T]] = set()
        self.__loop = asyncio.get_running_loop()

    def send(self, event: T, /) -> None:
        """Schedule delivery of ``event`` to every receiver currently registered.

        Delivery is deferred through ``loop.call_soon``; each receiver checks
        at delivery time whether it is still active, so a receiver that has
        exited in the meantime drops the event.
        """
        schedule = self.__loop.call_soon
        for subscriber in self.__receivers:
            schedule(subscriber.receive, event)

    def receive(self, receive: Callable[[T], None], /) -> Receiver[T]:
        """Return a ``Receiver`` for ``receive`` tied to this channel.

        The receiver is not registered until it is entered as a context
        manager.
        """
        subscription = Receiver(receive, self.__receivers)
        return subscription