Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Sep 27, 2024
1 parent 103d40b commit 9350694
Showing 1 changed file with 35 additions and 37 deletions.
72 changes: 35 additions & 37 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def iterate():
return timeit.timeit(iterate, number=1), res


def indentity_sleep(seconds: float) -> float:
time.sleep(seconds)
return seconds


async def async_indentity_sleep(seconds: float) -> float:
await asyncio.sleep(seconds)
return seconds


# simulates an I/0 bound function
slow_identity_duration = 0.01

Expand Down Expand Up @@ -336,59 +346,47 @@ def test_map(self, concurrency) -> None:

@parameterized.expand(
[
[True, identity, 2],
[False, sorted, 1.1],
]
)
def test_unordered_foreach(
self,
ordered: bool,
order_mutation: Callable[[Iterable[float]], Iterable[float]],
expected_duration,
):
seconds = [1, 0.1, 1]
duration, res = timestream(
Stream(seconds).foreach(time.sleep, ordered=ordered, concurrency=2)
)
self.assertListEqual(
res,
list(order_mutation(seconds)),
msg="`foreach` must respect `ordered` constraint.",
)

self.assertAlmostEqual(
duration,
expected_duration,
msg="unordered `foreach` improves runtime by avoiding bottlenecks",
delta=0.03,
)

@parameterized.expand(
[
[True, identity, 2],
[False, sorted, 1.1],
[
ordered,
order_mutation,
expected_duration,
operation,
func,
]
for ordered, order_mutation, expected_duration in [
(True, identity, 0.2),
(False, sorted, 0.11),
]
for operation, func in [
(Stream.foreach, time.sleep),
(Stream.map, indentity_sleep),
(Stream.aforeach, asyncio.sleep),
(Stream.amap, async_indentity_sleep),
]
]
)
def test_unordered_aforeach(
def test_mapping_ordering(
self,
ordered: bool,
order_mutation: Callable[[Iterable[float]], Iterable[float]],
expected_duration,
expected_duration: float,
operation,
func,
):
seconds = [1, 0.1, 1]
seconds = [0.1, 0.01, 0.1]
duration, res = timestream(
Stream(seconds).aforeach(asyncio.sleep, ordered=ordered, concurrency=2)
operation(Stream(seconds), func, ordered=ordered, concurrency=2)
)
self.assertListEqual(
res,
list(order_mutation(seconds)),
msg="`aforeach` must respect `ordered` constraint.",
msg=f"`{operation}` must respect `ordered` constraint.",
)

self.assertAlmostEqual(
duration,
expected_duration,
msg="unordered `aforeach` improves runtime by avoiding bottlenecks",
msg=f"{'ordered' if ordered else 'unordered'} `{operation}` should reflect that unordering improves runtime by avoiding bottlenecks",
delta=0.03,
)

Expand Down

0 comments on commit 9350694

Please sign in to comment.