I don't want to actor model
This commit is contained in:
parent
3b58d59194
commit
209b34a8a7
159
v6d3music/processing/yprocess.py
Normal file
159
v6d3music/processing/yprocess.py
Normal file
@ -0,0 +1,159 @@
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# why
|
||||
# what in the fuck am I doing
|
||||
# I don't want to actor model
|
||||
# 3.10 has no task groups ffs
|
||||
# whaaeeee
|
||||
|
||||
from typing import AsyncIterable, TypeVar, Generic
|
||||
import asyncio
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
async def future_join(futures: AsyncIterable[asyncio.Future[T]]) -> AsyncIterable[T]:
|
||||
async for future in futures:
|
||||
yield await future
|
||||
|
||||
|
||||
TMessage = TypeVar('TMessage', contravariant=True)
|
||||
|
||||
|
||||
class YProcess(Generic[TMessage]):
|
||||
__queue: asyncio.Queue[TMessage]
|
||||
__task: asyncio.Future[None]
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
async def _handle(self, message: TMessage) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
async def _task(self) -> None:
|
||||
while True:
|
||||
message = await self.__queue.get()
|
||||
try:
|
||||
if await self._handle(message):
|
||||
return
|
||||
finally:
|
||||
self.__queue.task_done()
|
||||
|
||||
async def _initialize(self) -> None:
|
||||
self.__task = asyncio.create_task(self._task())
|
||||
|
||||
async def __aenter__(self) -> 'YProcess':
|
||||
await self._initialize()
|
||||
return self
|
||||
|
||||
def send(self, message: TMessage):
|
||||
self.__queue.put_nowait(message)
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.__task
|
||||
|
||||
|
||||
TElement = TypeVar('TElement')
|
||||
TResult = TypeVar('TResult')
|
||||
|
||||
|
||||
class Push(Generic[TElement, TResult]):
|
||||
def __init__(self, element: TElement) -> None:
|
||||
self.element = element
|
||||
self.push: asyncio.Future[None] = asyncio.Future()
|
||||
self.pull: asyncio.Future[asyncio.Future[TResult]] = asyncio.Future()
|
||||
|
||||
|
||||
class Reader(YProcess[Push[TElement, TResult] | None], Generic[TElement, TResult]):
|
||||
__queue: asyncio.Queue[Push[TElement, TResult] | None]
|
||||
|
||||
async def _handle(self, message: Push[TElement, TResult] | None) -> bool:
|
||||
self.__queue.put_nowait(message)
|
||||
match message:
|
||||
case Push() as push:
|
||||
return False
|
||||
case None:
|
||||
return True
|
||||
case _:
|
||||
raise TypeError
|
||||
|
||||
async def _iterate(self) -> AsyncIterable[TResult]:
|
||||
while True:
|
||||
message = await self.__queue.get()
|
||||
match message:
|
||||
case Push() as push:
|
||||
yield await (await push.pull)
|
||||
case None:
|
||||
return
|
||||
case _:
|
||||
raise TypeError
|
||||
|
||||
async def iterate(self) -> AsyncIterable[TResult]:
|
||||
async with self:
|
||||
async for element in self._iterate():
|
||||
yield element
|
||||
|
||||
async def close(self):
|
||||
self.send(None)
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.close()
|
||||
return await super().__aexit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
|
||||
class Pushable(YProcess[Push[TElement, TResult] | None], Generic[TElement, TResult]):
|
||||
def __init__(self, reader: YProcess[Push[TElement, TResult] | None]) -> None:
|
||||
self.reader = reader
|
||||
|
||||
async def _on_push(self, push: Push[TElement, TResult]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
async def _result(self, element: TElement) -> TResult:
|
||||
raise NotImplementedError
|
||||
|
||||
async def result(self, element: TElement) -> TResult:
|
||||
try:
|
||||
return await self._result(element)
|
||||
except Exception as e:
|
||||
await self.close()
|
||||
raise
|
||||
|
||||
def _schedule(self, push: Push[TElement, TResult]) -> None:
|
||||
push.pull.set_result(asyncio.create_task(self.result(push.element)))
|
||||
|
||||
async def _handle(self, message: Push[TElement, TResult] | None) -> bool:
|
||||
match message:
|
||||
case Push() as push:
|
||||
await self._on_push(push)
|
||||
return False
|
||||
case None:
|
||||
return True
|
||||
case _:
|
||||
raise TypeError
|
||||
|
||||
async def push(self, element: TElement) -> None:
|
||||
push = Push(element)
|
||||
self.send(push)
|
||||
self.reader.send(push)
|
||||
await push.push
|
||||
|
||||
async def close(self):
|
||||
self.send(None)
|
||||
self.reader.send(None)
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.close()
|
||||
return await super().__aexit__(exc_type, exc_val, exc_tb)
|
Loading…
Reference in New Issue
Block a user