From 96338b559e6fb9abc748e1616c44eb11c00c3152 Mon Sep 17 00:00:00 2001 From: timotheyca Date: Fri, 15 Jul 2022 21:40:29 +0300 Subject: [PATCH] reducec --- rainbowadn/flow/bridge/_listbridge.py | 5 ++++- rainbowadn/flow/core/_mapreduce.py | 19 +++++++++++++---- rainbowadn/flow/core/_purereduce.py | 4 ++-- rainbowadn/flow/core/_reduce.py | 14 +++++++++++-- rainbowadn/flow/stacked/_stackedreduce.py | 4 ++-- .../flow/verification/core/_verifyreduce.py | 2 +- rainbowadn/flow13/__init__.py | 2 ++ rainbowadn/flow13/_binaryflow.py | 21 ++++++++++++------- rainbowadn/flow13/_flowcheque.py | 2 +- rainbowadn/flow13/_flowiterate.py | 2 +- rainbowadn/instrument/__init__.py | 4 ++++ rainbowadn/testing/test_bridge.py | 4 ++-- 12 files changed, 60 insertions(+), 23 deletions(-) diff --git a/rainbowadn/flow/bridge/_listbridge.py b/rainbowadn/flow/bridge/_listbridge.py index badd0d4..4034814 100644 --- a/rainbowadn/flow/bridge/_listbridge.py +++ b/rainbowadn/flow/bridge/_listbridge.py @@ -35,4 +35,7 @@ class ListBridge( left_bridge.reduce(reduce), right_bridge.reduce(reduce), ) - return await reduce.merge(left, right) + return reduce.merge(left, right) + + def loose(self) -> Reducer[Element, Out]: + return self diff --git a/rainbowadn/flow/core/_mapreduce.py b/rainbowadn/flow/core/_mapreduce.py index 0edc9ee..a7adbe5 100644 --- a/rainbowadn/flow/core/_mapreduce.py +++ b/rainbowadn/flow/core/_mapreduce.py @@ -1,5 +1,6 @@ -from typing import Generic, TypeVar +from typing import Any, Callable, Coroutine, Generic, TypeVar +from rainbowadn.core import * from ._mapper import * from ._reduce import * @@ -19,10 +20,20 @@ class MapReduce(Reduce[Element, Out], Generic[Element, Out, Mapped]): self.reduce_mapped = reduce async def reduce(self, out: Out, element: Element) -> Out: - return await self.reduce_mapped.reduce(out, await self.mapper.map(element)) + return await self.reduce_mapped.reducec(lambda: aidentity(out), lambda: self.mapper.map(element)) - async def merge(self, left: Out, right: Out) -> Out: - return await self.reduce_mapped.merge(left, right) + async def reducec( + self, + outc: Callable[[], Coroutine[Any, Any, Out]], + elementc: Callable[[], Coroutine[Any, Any, Element]], + ) -> Out: + async def element(): + return await self.mapper.map(await elementc()) + + return await self.reduce_mapped.reducec(outc, lambda: element()) + + def merge(self, left: Out, right: Out) -> Out: + return self.reduce_mapped.merge(left, right) def loose(self) -> Reduce[Element, Out]: return self diff --git a/rainbowadn/flow/core/_purereduce.py b/rainbowadn/flow/core/_purereduce.py index 4d2f200..7d42160 100644 --- a/rainbowadn/flow/core/_purereduce.py +++ b/rainbowadn/flow/core/_purereduce.py @@ -12,7 +12,7 @@ class PureReduce(Reduce[Pure, Pure], Generic[Pure]): super().__init__(initial) async def reduce(self, out: Pure, element: Pure) -> Pure: - return await self.merge(out, element) + return self.merge(out, element) - async def merge(self, left: Pure, right: Pure) -> Pure: + def merge(self, left: Pure, right: Pure) -> Pure: raise NotImplementedError diff --git a/rainbowadn/flow/core/_reduce.py b/rainbowadn/flow/core/_reduce.py index 5e8afde..a6765f3 100644 --- a/rainbowadn/flow/core/_reduce.py +++ b/rainbowadn/flow/core/_reduce.py @@ -1,7 +1,9 @@ -from typing import Generic, TypeVar +from typing import Any, Callable, Coroutine, Generic, TypeVar +from rainbowadn.core import * __all__ = ('Reduce',) + Element = TypeVar('Element') Out = TypeVar('Out') @@ -13,5 +15,13 @@ class Reduce(Generic[Element, Out]): async def reduce(self, out: Out, element: Element) -> Out: raise NotImplementedError - async def merge(self, left: Out, right: Out) -> Out: + async def reducec( + self, + out2: Callable[[], Coroutine[Any, Any, Out]], + element2: Callable[[], Coroutine[Any, Any, Element]], + ) -> Out: + out, element = await gather(out2(), element2()) + return await self.reduce(out, element) + + def merge(self, left: Out, right: Out) -> Out: raise NotImplementedError diff --git a/rainbowadn/flow/stacked/_stackedreduce.py b/rainbowadn/flow/stacked/_stackedreduce.py index ced435f..d48efa1 100644 --- a/rainbowadn/flow/stacked/_stackedreduce.py +++ b/rainbowadn/flow/stacked/_stackedreduce.py @@ -24,8 +24,8 @@ class StackedReduce( assert isinstance(element, SequenceDispatcher) return await element.dispatch(StackedDispatch(self.stacked, out)) - async def merge(self, left: Out, right: Out) -> Out: - return await self.stacked.merge(left, right) + def merge(self, left: Out, right: Out) -> Out: + return self.stacked.merge(left, right) def loose(self) -> Reduce[SequenceDispatcher[Stacked, Out], Out]: return self diff --git a/rainbowadn/flow/verification/core/_verifyreduce.py b/rainbowadn/flow/verification/core/_verifyreduce.py index 321e15d..ead33a1 100644 --- a/rainbowadn/flow/verification/core/_verifyreduce.py +++ b/rainbowadn/flow/verification/core/_verifyreduce.py @@ -8,7 +8,7 @@ class VerifyReduce(PureReduce[bool]): def __init__(self): super().__init__(True) - async def merge(self, left: bool, right: bool) -> bool: + def merge(self, left: bool, right: bool) -> bool: assert_true(left) assert_true(right) return True diff --git a/rainbowadn/flow13/__init__.py b/rainbowadn/flow13/__init__.py index df5575f..ff9e82c 100644 --- a/rainbowadn/flow13/__init__.py +++ b/rainbowadn/flow13/__init__.py @@ -1,5 +1,6 @@ __all__ = ( 'BankBlock', + 'FlowBlock', 'FlowBlockFactory', 'FlowBlockVerification', 'FlowCheque', 'FlowIterate', 'FlowStandard', @@ -7,6 +8,7 @@ __all__ = ( ) from ._bankblock import BankBlock +from ._flowblock import FlowBlock, FlowBlockFactory, FlowBlockVerification from ._flowcheque import FlowCheque from ._flowiterate import FlowIterate from ._flowstandard import FlowStandard diff --git a/rainbowadn/flow13/_binaryflow.py b/rainbowadn/flow13/_binaryflow.py index 1abd543..87a7ba8 100644 --- a/rainbowadn/flow13/_binaryflow.py +++ b/rainbowadn/flow13/_binaryflow.py @@ -51,16 +51,20 @@ class BinaryReducerAction( left: Out key: KeyT right: Out + + async def reduce_left() -> Out: + return await self.reduce.reducec( + lambda: self.on(case.protocolizedl()), + lambda: aidentity(case.split.key), + ) + left, right = await gather( - self.on(case.protocolizedl()), + reduce_left(), self.on(case.protocolizedr()), ) - return await self.reduce.merge( - await self.reduce.reduce( - left, - case.split.key - ), + return self.reduce.merge( + left, right, ) @@ -189,5 +193,8 @@ class VerifySubsetAction( class VerifySubsetReduce( PureReduce[CheckResult] ): - async def merge(self, left: CheckResult, right: CheckResult) -> CheckResult: + def merge(self, left: CheckResult, right: CheckResult) -> CheckResult: return max(left, right) + + def loose(self) -> Reduce[CheckResult, CheckResult]: + return self diff --git a/rainbowadn/flow13/_flowcheque.py b/rainbowadn/flow13/_flowcheque.py index 6ebda9f..659f4c7 100644 --- a/rainbowadn/flow13/_flowcheque.py +++ b/rainbowadn/flow13/_flowcheque.py @@ -15,7 +15,7 @@ __all__ = ('FlowCheque',) class SumReduce(PureReduce[int]): - async def merge(self, left: int, right: int) -> int: + def merge(self, left: int, right: int) -> int: return left + right diff --git a/rainbowadn/flow13/_flowiterate.py b/rainbowadn/flow13/_flowiterate.py index 74e7268..25d7428 100644 --- a/rainbowadn/flow13/_flowiterate.py +++ b/rainbowadn/flow13/_flowiterate.py @@ -18,7 +18,7 @@ class FlowIterate( return wrap() - async def merge(self, left: Iterable[Element], right: Iterable[Element]) -> Iterable[Element]: + def merge(self, left: Iterable[Element], right: Iterable[Element]) -> Iterable[Element]: def wrap() -> Iterable[Element]: yield from left yield from right diff --git a/rainbowadn/instrument/__init__.py b/rainbowadn/instrument/__init__.py index 740123e..3ff3b66 100644 --- a/rainbowadn/instrument/__init__.py +++ b/rainbowadn/instrument/__init__.py @@ -1,10 +1,14 @@ __all__ = ( + 'Measure', + 'Terminated', 'Terminate', 'Concurrency', 'Counter', 'EntryExit', 'Instrumentation', ) +from ._measure import Measure +from ._terminate import Terminate, Terminated from .concurrency import Concurrency from .counter import Counter from .entryexit import EntryExit diff --git a/rainbowadn/testing/test_bridge.py b/rainbowadn/testing/test_bridge.py index df2e2c3..74b622c 100644 --- a/rainbowadn/testing/test_bridge.py +++ b/rainbowadn/testing/test_bridge.py @@ -42,7 +42,7 @@ class NoneReduce(Reduce[None, None]): async def reduce(self, out: None, element: None) -> None: return None - async def merge(self, left: None, right: None) -> None: + def merge(self, left: None, right: None) -> None: return None @@ -59,7 +59,7 @@ class PrintReduce(Reduce[tuple[Nullable[HashPoint], HashPoint], None]): ) return None - async def merge(self, left: None, right: None) -> None: + def merge(self, left: None, right: None) -> None: return None def loose(self) -> Reduce[tuple[Nullable[HashPoint], HashPoint], None]: