diff --git a/tests/test_stream.py b/tests/test_stream.py index 0017dc5..3b8c545 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -39,6 +39,16 @@ def iterate(): return timeit.timeit(iterate, number=1), res +def indentity_sleep(seconds: float) -> float: + time.sleep(seconds) + return seconds + + +async def async_indentity_sleep(seconds: float) -> float: + await asyncio.sleep(seconds) + return seconds + + # simulates an I/0 bound function slow_identity_duration = 0.01 @@ -336,59 +346,47 @@ def test_map(self, concurrency) -> None: @parameterized.expand( [ - [True, identity, 2], - [False, sorted, 1.1], - ] - ) - def test_unordered_foreach( - self, - ordered: bool, - order_mutation: Callable[[Iterable[float]], Iterable[float]], - expected_duration, - ): - seconds = [1, 0.1, 1] - duration, res = timestream( - Stream(seconds).foreach(time.sleep, ordered=ordered, concurrency=2) - ) - self.assertListEqual( - res, - list(order_mutation(seconds)), - msg="`foreach` must respect `ordered` constraint.", - ) - - self.assertAlmostEqual( - duration, - expected_duration, - msg="unordered `foreach` improves runtime by avoiding bottlenecks", - delta=0.03, - ) - - @parameterized.expand( - [ - [True, identity, 2], - [False, sorted, 1.1], + [ + ordered, + order_mutation, + expected_duration, + operation, + func, + ] + for ordered, order_mutation, expected_duration in [ + (True, identity, 0.2), + (False, sorted, 0.11), + ] + for operation, func in [ + (Stream.foreach, time.sleep), + (Stream.map, indentity_sleep), + (Stream.aforeach, asyncio.sleep), + (Stream.amap, async_indentity_sleep), + ] ] ) - def test_unordered_aforeach( + def test_mapping_ordering( self, ordered: bool, order_mutation: Callable[[Iterable[float]], Iterable[float]], - expected_duration, + expected_duration: float, + operation, + func, ): - seconds = [1, 0.1, 1] + seconds = [0.1, 0.01, 0.1] duration, res = timestream( - Stream(seconds).aforeach(asyncio.sleep, ordered=ordered, concurrency=2) + operation(Stream(seconds), func, ordered=ordered, concurrency=2) ) self.assertListEqual( res, list(order_mutation(seconds)), - msg="`aforeach` must respect `ordered` constraint.", + msg=f"`{operation}` must respect `ordered` constraint.", ) self.assertAlmostEqual( duration, expected_duration, - msg="unordered `aforeach` improves runtime by avoiding bottlenecks", + msg=f"{'ordered' if ordered else 'unordered'} `{operation}` should reflect that unordering improves runtime by avoiding bottlenecks", delta=0.03, )