From 209b34a8a7f6c090c950d32c672f4877b54c4391 Mon Sep 17 00:00:00 2001 From: timofey Date: Tue, 20 Dec 2022 05:50:55 +0000 Subject: [PATCH] I don't want to actor model --- v6d3music/processing/yprocess.py | 159 +++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 v6d3music/processing/yprocess.py diff --git a/v6d3music/processing/yprocess.py b/v6d3music/processing/yprocess.py new file mode 100644 index 0000000..e326e11 --- /dev/null +++ b/v6d3music/processing/yprocess.py @@ -0,0 +1,159 @@ +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# why +# what in the fuck am I doing +# I don't want to actor model +# 3.10 has no task groups ffs +# whaaeeee + +from typing import AsyncIterable, TypeVar, Generic +import asyncio + +T = TypeVar('T') + + +async def future_join(futures: AsyncIterable[asyncio.Future[T]]) -> AsyncIterable[T]: + async for future in futures: + yield await future + + +TMessage = TypeVar('TMessage', contravariant=True) + + +class YProcess(Generic[TMessage]): + __queue: asyncio.Queue[TMessage] + __task: asyncio.Future[None] + + def __init__(self) -> None: + super().__init__() + + async def _handle(self, message: TMessage) -> bool: + raise NotImplementedError + + async def _task(self) -> None: + while True: + message = await self.__queue.get() + try: + if await self._handle(message): + return + finally: + self.__queue.task_done() + + async def _initialize(self) -> None: + self.__task = asyncio.create_task(self._task()) + + async def __aenter__(self) -> 'YProcess': + await self._initialize() + return self + + def send(self, message: TMessage): + self.__queue.put_nowait(message) + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.__task + + +TElement = TypeVar('TElement') +TResult = TypeVar('TResult') + + +class Push(Generic[TElement, TResult]): + def __init__(self, element: TElement) -> None: + self.element = element + self.push: asyncio.Future[None] = asyncio.Future() + self.pull: asyncio.Future[asyncio.Future[TResult]] = asyncio.Future() + + +class Reader(YProcess[Push[TElement, TResult] | None], Generic[TElement, TResult]): + __queue: asyncio.Queue[Push[TElement, TResult] | None] + + async def _handle(self, message: Push[TElement, TResult] | None) -> bool: + self.__queue.put_nowait(message) + match message: + case Push() as push: + return False + case None: + return True + case _: + raise TypeError + + async def _iterate(self) -> AsyncIterable[TResult]: + while True: + message = await self.__queue.get() + match message: + case Push() as push: + yield await (await push.pull) + case None: + return + case _: + raise TypeError + + async def iterate(self) -> AsyncIterable[TResult]: + async with self: + async for element in self._iterate(): + yield element + + async def close(self): + self.send(None) + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + return await super().__aexit__(exc_type, exc_val, exc_tb) + + +class Pushable(YProcess[Push[TElement, TResult] | None], Generic[TElement, TResult]): + def __init__(self, reader: YProcess[Push[TElement, TResult] | None]) -> None: + self.reader = reader + + async def _on_push(self, push: Push[TElement, TResult]) -> None: + raise NotImplementedError + + async def _result(self, element: TElement) -> TResult: + raise NotImplementedError + + async def result(self, element: TElement) -> TResult: + try: + return await self._result(element) + except Exception as e: + await self.close() + raise + + def _schedule(self, push: Push[TElement, TResult]) -> None: + push.pull.set_result(asyncio.create_task(self.result(push.element))) + + async def _handle(self, message: Push[TElement, TResult] | None) -> bool: + match message: + case Push() as push: + await self._on_push(push) + return False + case None: + return True + case _: + raise TypeError + + async def push(self, element: TElement) -> None: + push = Push(element) + self.send(push) + self.reader.send(push) + await push.push + + async def close(self): + self.send(None) + self.reader.send(None) + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + return await super().__aexit__(exc_type, exc_val, exc_tb)