diff --git a/v6d3music/processing/yprocess.py b/v6d3music/processing/yprocess.py
new file mode 100644
index 0000000..e326e11
--- /dev/null
+++ b/v6d3music/processing/yprocess.py
@@ -0,0 +1,159 @@
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# why
+# what in the fuck am I doing
+# I don't want to actor model
+# 3.10 has no task groups ffs
+# whaaeeee
+
+from typing import AsyncIterable, TypeVar, Generic
+import asyncio
+
+T = TypeVar('T')
+
+
+async def future_join(futures: AsyncIterable[asyncio.Future[T]]) -> AsyncIterable[T]:
+    async for future in futures:
+        yield await future
+
+
+TMessage = TypeVar('TMessage', contravariant=True)
+
+
+class YProcess(Generic[TMessage]):
+    __queue: asyncio.Queue[TMessage]
+    __task: asyncio.Future[None]
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    async def _handle(self, message: TMessage) -> bool:
+        raise NotImplementedError
+
+    async def _task(self) -> None:
+        while True:
+            message = await self.__queue.get()
+            try:
+                if await self._handle(message):
+                    return
+            finally:
+                self.__queue.task_done()
+
+    async def _initialize(self) -> None:
+        self.__task = asyncio.create_task(self._task())
+
+    async def __aenter__(self) -> 'YProcess':
+        await self._initialize()
+        return self
+
+    def send(self, message: TMessage):
+        self.__queue.put_nowait(message)
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.__task
+
+
+TElement = TypeVar('TElement')
+TResult = TypeVar('TResult')
+
+
+class Push(Generic[TElement, TResult]):
+    def __init__(self, element: TElement) -> None:
+        self.element = element
+        self.push: asyncio.Future[None] = asyncio.Future()
+        self.pull: asyncio.Future[asyncio.Future[TResult]] = asyncio.Future()
+
+
+class Reader(YProcess[Push[TElement, TResult] | None], Generic[TElement, TResult]):
+    __queue: asyncio.Queue[Push[TElement, TResult] | None]
+
+    async def _handle(self, message: Push[TElement, TResult] | None) -> bool:
+        self.__queue.put_nowait(message)
+        match message:
+            case Push() as push:
+                return False
+            case None:
+                return True
+            case _:
+                raise TypeError
+
+    async def _iterate(self) -> AsyncIterable[TResult]:
+        while True:
+            message = await self.__queue.get()
+            match message:
+                case Push() as push:
+                    yield await (await push.pull)
+                case None:
+                    return
+                case _:
+                    raise TypeError
+
+    async def iterate(self) -> AsyncIterable[TResult]:
+        async with self:
+            async for element in self._iterate():
+                yield element
+
+    async def close(self):
+        self.send(None)
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.close()
+        return await super().__aexit__(exc_type, exc_val, exc_tb)
+
+
+class Pushable(YProcess[Push[TElement, TResult] | None], Generic[TElement, TResult]):
+    def __init__(self, reader: YProcess[Push[TElement, TResult] | None]) -> None:
+        self.reader = reader
+
+    async def _on_push(self, push: Push[TElement, TResult]) -> None:
+        raise NotImplementedError
+
+    async def _result(self, element: TElement) -> TResult:
+        raise NotImplementedError
+
+    async def result(self, element: TElement) -> TResult:
+        try:
+            return await self._result(element)
+        except Exception as e:
+            await self.close()
+            raise
+
+    def _schedule(self, push: Push[TElement, TResult]) -> None:
+        push.pull.set_result(asyncio.create_task(self.result(push.element)))
+
+    async def _handle(self, message: Push[TElement, TResult] | None) -> bool:
+        match message:
+            case Push() as push:
+                await self._on_push(push)
+                return False
+            case None:
+                return True
+            case _:
+                raise TypeError
+
+    async def push(self, element: TElement) -> None:
+        push = Push(element)
+        self.send(push)
+        self.reader.send(push)
+        await push.push
+
+    async def close(self):
+        self.send(None)
+        self.reader.send(None)
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.close()
+        return await super().__aexit__(exc_type, exc_val, exc_tb)