rainbowadn/rainbowadn/flow/core/_mapreduce.py

44 lines
1.4 KiB
Python

from typing import Any, Callable, Coroutine, Generic, TypeVar
from rainbowadn.core import *
from ._mapper import *
from ._reduce import *
__all__ = ('MapReduce',)
Element = TypeVar('Element')
Mapped = TypeVar('Mapped')
Out = TypeVar('Out')
class MapReduce(Reduce[Element, Out], Generic[Element, Out, Mapped]):
def __init__(self, mapper: Mapper[Element, Mapped], reduce: Reduce[Mapped, Out]):
assert isinstance(mapper, Mapper)
assert isinstance(reduce, Reduce)
super().__init__(reduce.initial)
self.mapper = mapper
self.reduce_mapped = reduce
async def reduce(self, out: Out, element: Element) -> Out:
return await self.reduce_mapped.reducec(lambda: aidentity(out), lambda: self.mapper.map(element))
async def reducec(
self,
outc: Callable[[], Coroutine[Any, Any, Out]],
elementc: Callable[[], Coroutine[Any, Any, Element]],
) -> Out:
return await self.reduce_mapped.reducec(outc, self.mapper.bind(elementc))
async def merge(self, left: Out, right: Out) -> Out:
return await self.reduce_mapped.merge(left, right)
async def mergec(
self,
leftc: Callable[[], Coroutine[Any, Any, Out]],
rightc: Callable[[], Coroutine[Any, Any, Out]],
) -> Out:
return await self.reduce_mapped.mergec(leftc, rightc)
def loose(self) -> Reduce[Element, Out]:
return self