reducec
This commit is contained in:
parent
3235e2ca36
commit
96338b559e
@ -35,4 +35,7 @@ class ListBridge(
|
|||||||
left_bridge.reduce(reduce),
|
left_bridge.reduce(reduce),
|
||||||
right_bridge.reduce(reduce),
|
right_bridge.reduce(reduce),
|
||||||
)
|
)
|
||||||
return await reduce.merge(left, right)
|
return reduce.merge(left, right)
|
||||||
|
|
||||||
|
def loose(self) -> Reducer[Element, Out]:
|
||||||
|
return self
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from typing import Generic, TypeVar
|
from typing import Any, Callable, Coroutine, Generic, TypeVar
|
||||||
|
|
||||||
|
from rainbowadn.core import *
|
||||||
from ._mapper import *
|
from ._mapper import *
|
||||||
from ._reduce import *
|
from ._reduce import *
|
||||||
|
|
||||||
@ -19,10 +20,20 @@ class MapReduce(Reduce[Element, Out], Generic[Element, Out, Mapped]):
|
|||||||
self.reduce_mapped = reduce
|
self.reduce_mapped = reduce
|
||||||
|
|
||||||
async def reduce(self, out: Out, element: Element) -> Out:
|
async def reduce(self, out: Out, element: Element) -> Out:
|
||||||
return await self.reduce_mapped.reduce(out, await self.mapper.map(element))
|
return await self.reduce_mapped.reducec(lambda: aidentity(out), lambda: self.mapper.map(element))
|
||||||
|
|
||||||
async def merge(self, left: Out, right: Out) -> Out:
|
async def reducec(
|
||||||
return await self.reduce_mapped.merge(left, right)
|
self,
|
||||||
|
outc: Callable[[], Coroutine[Any, Any, Out]],
|
||||||
|
elementc: Callable[[], Coroutine[Any, Any, Element]],
|
||||||
|
) -> Out:
|
||||||
|
async def element():
|
||||||
|
return await self.mapper.map(await elementc())
|
||||||
|
|
||||||
|
return await self.reduce_mapped.reducec(outc, lambda: element())
|
||||||
|
|
||||||
|
def merge(self, left: Out, right: Out) -> Out:
|
||||||
|
return self.reduce_mapped.merge(left, right)
|
||||||
|
|
||||||
def loose(self) -> Reduce[Element, Out]:
|
def loose(self) -> Reduce[Element, Out]:
|
||||||
return self
|
return self
|
||||||
|
@ -12,7 +12,7 @@ class PureReduce(Reduce[Pure, Pure], Generic[Pure]):
|
|||||||
super().__init__(initial)
|
super().__init__(initial)
|
||||||
|
|
||||||
async def reduce(self, out: Pure, element: Pure) -> Pure:
|
async def reduce(self, out: Pure, element: Pure) -> Pure:
|
||||||
return await self.merge(out, element)
|
return self.merge(out, element)
|
||||||
|
|
||||||
async def merge(self, left: Pure, right: Pure) -> Pure:
|
def merge(self, left: Pure, right: Pure) -> Pure:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
from typing import Generic, TypeVar
|
from typing import Any, Callable, Coroutine, Generic, TypeVar
|
||||||
|
from rainbowadn.core import *
|
||||||
|
|
||||||
__all__ = ('Reduce',)
|
__all__ = ('Reduce',)
|
||||||
|
|
||||||
|
|
||||||
Element = TypeVar('Element')
|
Element = TypeVar('Element')
|
||||||
Out = TypeVar('Out')
|
Out = TypeVar('Out')
|
||||||
|
|
||||||
@ -13,5 +15,13 @@ class Reduce(Generic[Element, Out]):
|
|||||||
async def reduce(self, out: Out, element: Element) -> Out:
|
async def reduce(self, out: Out, element: Element) -> Out:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
async def merge(self, left: Out, right: Out) -> Out:
|
async def reducec(
|
||||||
|
self,
|
||||||
|
out2: Callable[[], Coroutine[Any, Any, Out]],
|
||||||
|
element2: Callable[[], Coroutine[Any, Any, Element]],
|
||||||
|
) -> Out:
|
||||||
|
out, element = await gather(out2(), element2())
|
||||||
|
return await self.reduce(out, element)
|
||||||
|
|
||||||
|
def merge(self, left: Out, right: Out) -> Out:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
@ -24,8 +24,8 @@ class StackedReduce(
|
|||||||
assert isinstance(element, SequenceDispatcher)
|
assert isinstance(element, SequenceDispatcher)
|
||||||
return await element.dispatch(StackedDispatch(self.stacked, out))
|
return await element.dispatch(StackedDispatch(self.stacked, out))
|
||||||
|
|
||||||
async def merge(self, left: Out, right: Out) -> Out:
|
def merge(self, left: Out, right: Out) -> Out:
|
||||||
return await self.stacked.merge(left, right)
|
return self.stacked.merge(left, right)
|
||||||
|
|
||||||
def loose(self) -> Reduce[SequenceDispatcher[Stacked, Out], Out]:
|
def loose(self) -> Reduce[SequenceDispatcher[Stacked, Out], Out]:
|
||||||
return self
|
return self
|
||||||
|
@ -8,7 +8,7 @@ class VerifyReduce(PureReduce[bool]):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(True)
|
super().__init__(True)
|
||||||
|
|
||||||
async def merge(self, left: bool, right: bool) -> bool:
|
def merge(self, left: bool, right: bool) -> bool:
|
||||||
assert_true(left)
|
assert_true(left)
|
||||||
assert_true(right)
|
assert_true(right)
|
||||||
return True
|
return True
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
__all__ = (
|
__all__ = (
|
||||||
'BankBlock',
|
'BankBlock',
|
||||||
|
'FlowBlock', 'FlowBlockFactory', 'FlowBlockVerification',
|
||||||
'FlowCheque',
|
'FlowCheque',
|
||||||
'FlowIterate',
|
'FlowIterate',
|
||||||
'FlowStandard',
|
'FlowStandard',
|
||||||
@ -7,6 +8,7 @@ __all__ = (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from ._bankblock import BankBlock
|
from ._bankblock import BankBlock
|
||||||
|
from ._flowblock import FlowBlock, FlowBlockFactory, FlowBlockVerification
|
||||||
from ._flowcheque import FlowCheque
|
from ._flowcheque import FlowCheque
|
||||||
from ._flowiterate import FlowIterate
|
from ._flowiterate import FlowIterate
|
||||||
from ._flowstandard import FlowStandard
|
from ._flowstandard import FlowStandard
|
||||||
|
@ -51,16 +51,20 @@ class BinaryReducerAction(
|
|||||||
left: Out
|
left: Out
|
||||||
key: KeyT
|
key: KeyT
|
||||||
right: Out
|
right: Out
|
||||||
|
|
||||||
|
async def reduce_left() -> Out:
|
||||||
|
return await self.reduce.reducec(
|
||||||
|
lambda: self.on(case.protocolizedl()),
|
||||||
|
lambda: aidentity(case.split.key),
|
||||||
|
)
|
||||||
|
|
||||||
left, right = await gather(
|
left, right = await gather(
|
||||||
self.on(case.protocolizedl()),
|
reduce_left(),
|
||||||
self.on(case.protocolizedr()),
|
self.on(case.protocolizedr()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return await self.reduce.merge(
|
return self.reduce.merge(
|
||||||
await self.reduce.reduce(
|
|
||||||
left,
|
left,
|
||||||
case.split.key
|
|
||||||
),
|
|
||||||
right,
|
right,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -189,5 +193,8 @@ class VerifySubsetAction(
|
|||||||
class VerifySubsetReduce(
|
class VerifySubsetReduce(
|
||||||
PureReduce[CheckResult]
|
PureReduce[CheckResult]
|
||||||
):
|
):
|
||||||
async def merge(self, left: CheckResult, right: CheckResult) -> CheckResult:
|
def merge(self, left: CheckResult, right: CheckResult) -> CheckResult:
|
||||||
return max(left, right)
|
return max(left, right)
|
||||||
|
|
||||||
|
def loose(self) -> Reduce[CheckResult, CheckResult]:
|
||||||
|
return self
|
||||||
|
@ -15,7 +15,7 @@ __all__ = ('FlowCheque',)
|
|||||||
|
|
||||||
|
|
||||||
class SumReduce(PureReduce[int]):
|
class SumReduce(PureReduce[int]):
|
||||||
async def merge(self, left: int, right: int) -> int:
|
def merge(self, left: int, right: int) -> int:
|
||||||
return left + right
|
return left + right
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ class FlowIterate(
|
|||||||
|
|
||||||
return wrap()
|
return wrap()
|
||||||
|
|
||||||
async def merge(self, left: Iterable[Element], right: Iterable[Element]) -> Iterable[Element]:
|
def merge(self, left: Iterable[Element], right: Iterable[Element]) -> Iterable[Element]:
|
||||||
def wrap() -> Iterable[Element]:
|
def wrap() -> Iterable[Element]:
|
||||||
yield from left
|
yield from left
|
||||||
yield from right
|
yield from right
|
||||||
|
@ -1,10 +1,14 @@
|
|||||||
__all__ = (
|
__all__ = (
|
||||||
|
'Measure',
|
||||||
|
'Terminated', 'Terminate',
|
||||||
'Concurrency',
|
'Concurrency',
|
||||||
'Counter',
|
'Counter',
|
||||||
'EntryExit',
|
'EntryExit',
|
||||||
'Instrumentation',
|
'Instrumentation',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from ._measure import Measure
|
||||||
|
from ._terminate import Terminate, Terminated
|
||||||
from .concurrency import Concurrency
|
from .concurrency import Concurrency
|
||||||
from .counter import Counter
|
from .counter import Counter
|
||||||
from .entryexit import EntryExit
|
from .entryexit import EntryExit
|
||||||
|
@ -42,7 +42,7 @@ class NoneReduce(Reduce[None, None]):
|
|||||||
async def reduce(self, out: None, element: None) -> None:
|
async def reduce(self, out: None, element: None) -> None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def merge(self, left: None, right: None) -> None:
|
def merge(self, left: None, right: None) -> None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@ -59,7 +59,7 @@ class PrintReduce(Reduce[tuple[Nullable[HashPoint], HashPoint], None]):
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def merge(self, left: None, right: None) -> None:
|
def merge(self, left: None, right: None) -> None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def loose(self) -> Reduce[tuple[Nullable[HashPoint], HashPoint], None]:
|
def loose(self) -> Reduce[tuple[Nullable[HashPoint], HashPoint], None]:
|
||||||
|
Loading…
Reference in New Issue
Block a user