Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrentMappingIterable: introduce futuretools.FutureResultCollection to optimize FDFO concurrent mapping (closes #20) #21

Merged
merged 5 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions streamable/futuretools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import asyncio
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop
from collections import deque
from concurrent.futures import Future
from multiprocessing import Queue
from typing import Deque, Iterator, Sized, TypeVar

T = TypeVar("T")


class FutureResultCollection(Iterator[T], Sized, ABC):
    """
    Iterator yielding the results of registered futures.

    New futures can keep being registered even after iteration has started.
    """

    @abstractmethod
    def add_future(self, future: "Future[T]") -> None:
        """Registers a future whose result will later be yielded by this iterator."""


class DequeFutureResultCollection(FutureResultCollection[T]):
    """Keeps the registered futures themselves, in registration order."""

    def __init__(self) -> None:
        # pending futures, oldest first
        self._futures: Deque["Future[T]"] = deque()

    def __len__(self) -> int:
        return len(self._futures)

    def add_future(self, future: "Future[T]") -> None:
        self._futures.append(future)


class CounterFutureResultCollection(FutureResultCollection[T]):
    """Tracks only how many futures are pending, not the future objects themselves."""

    def __init__(self) -> None:
        # number of futures registered but not yet yielded
        self._n_futures = 0

    def __len__(self) -> int:
        return self._n_futures

    def add_future(self, future: "Future[T]") -> None:
        self._n_futures += 1


class FIFOThreadFutureResultCollection(DequeFutureResultCollection[T]):
    """
    First In First Out
    """

    def __next__(self) -> T:
        # Block on the oldest future: results come out in registration order.
        oldest = self._futures.popleft()
        return oldest.result()


class FDFOThreadFutureResultCollection(CounterFutureResultCollection[T]):
    """
    First Done First Out
    """

    def __init__(self) -> None:
        super().__init__()
        # completed results, pushed by done callbacks and popped by `__next__`
        # NOTE(review): `multiprocessing.Queue` pickles its items; for
        # thread-based futures a lightweight `queue.Queue` would suffice —
        # confirm whether cross-process use is intended.
        self._results: "Queue[T]" = Queue()

    def _done_callback(self, future: "Future[T]") -> None:
        # Runs as soon as `future` completes, in the completing thread.
        self._results.put(future.result())

    def add_future(self, future: "Future[T]") -> None:
        # Pass the bound method directly: wrapping it in a lambda added nothing.
        future.add_done_callback(self._done_callback)
        super().add_future(future)

    def __next__(self) -> T:
        # Blocks until some registered future has completed.
        self._n_futures -= 1
        return self._results.get()


class FIFOAsyncFutureResultCollection(DequeFutureResultCollection[T]):
    """
    First In First Out
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        super().__init__()
        self._loop = loop

    def __next__(self) -> T:
        # Drive the loop until the oldest future completes; the stored objects
        # are asyncio futures despite the concurrent.futures typing.
        oldest = self._futures.popleft()
        return self._loop.run_until_complete(oldest)  # type: ignore


class FDFOAsyncFutureResultCollection(CounterFutureResultCollection[T]):
    """
    First Done First Out
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        super().__init__()
        self._loop = loop
        # results of already-completed futures, in completion order
        self._done_results: "Deque[T]" = deque()
        # signal future: completed when at least one new result is available
        self._waiter: "asyncio.Future[None]" = self._loop.create_future()

    def _done_callback(self, future: "Future[T]") -> None:
        # Buffer the result, then wake the waiter if nobody has yet: several
        # futures may complete during one `run_until_complete`, and calling
        # `set_result` twice on the same waiter raises InvalidStateError
        # (which the previous single-slot implementation was exposed to).
        self._done_results.append(future.result())
        if not self._waiter.done():
            self._waiter.set_result(None)

    def add_future(self, future: "Future[T]") -> None:
        future.add_done_callback(self._done_callback)
        super().add_future(future)

    def __next__(self) -> T:
        self._n_futures -= 1
        # Drive the loop until at least one result has been buffered.
        while not self._done_results:
            self._loop.run_until_complete(self._waiter)
            self._waiter = self._loop.create_future()
        return self._done_results.popleft()
102 changes: 45 additions & 57 deletions streamable/iters.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import asyncio
import time
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop, get_event_loop
from collections import defaultdict, deque
from concurrent.futures import (
FIRST_COMPLETED,
Executor,
Future,
ThreadPoolExecutor,
wait,
)
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import contextmanager
from datetime import datetime
from math import ceil
Expand All @@ -36,6 +29,13 @@
T = TypeVar("T")
U = TypeVar("U")

from streamable.futuretools import (
FDFOAsyncFutureResultCollection,
FDFOThreadFutureResultCollection,
FIFOAsyncFutureResultCollection,
FIFOThreadFutureResultCollection,
FutureResultCollection,
)
from streamable.util import NO_REPLACEMENT, NoopStopIteration, get_logger, reraise_as


Expand Down Expand Up @@ -342,7 +342,7 @@ class ConcurrentMappingIterable(
"""
Template Method Pattern:
This abstract class's `__iter__` is a skeleton for a queue-based concurrent mapping algorithm
that relies on abstract helper methods (`_context_manager`, `_launch_future`, `_get_future_result`)
that relies on abstract helper methods (`_context_manager`, `_create_future`, `_future_result_collection`)
that must be implemented by concrete subclasses.
"""

Expand All @@ -360,40 +360,36 @@ def __init__(
def _context_manager(self) -> ContextManager: ...

@abstractmethod
def _launch_future(
def _create_future(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]": ...

# factory method
@abstractmethod
def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]: ...
def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]: ...

def __iter__(self) -> Iterator[Union[U, RaisingIterator.ExceptionContainer]]:
    """
    Yields mapped elements, keeping at most `buffer_size` futures in flight.
    """
    with self._context_manager():
        future_results = self._future_result_collection()

        # fill the buffer: one future per upstream element, up to buffer_size
        while len(future_results) < self.buffer_size:
            try:
                future_results.add_future(self._create_future(next(self.iterator)))
            except StopIteration:
                # upstream iterator exhausted before the buffer filled
                break

        # steady state: wait for a result, refill the buffer, then yield
        while future_results:
            result = next(future_results)
            try:
                future_results.add_future(self._create_future(next(self.iterator)))
            except StopIteration:
                pass
            yield result


class ThreadConcurrentMappingIterable(ConcurrentMappingIterable[T, U]):
def __init__(
Expand Down Expand Up @@ -421,21 +417,18 @@ def _safe_transformation(
except Exception as e:
return RaisingIterator.ExceptionContainer(e)

def _create_future(
    self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]":
    # Submit the exception-safe transformation of `elem` to the thread pool.
    return self.executor.submit(self._safe_transformation, elem)

def _future_result_collection(
    self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
    # factory method: FIFO preserves submission order,
    # FDFO yields results as the futures complete
    if self.ordered:
        return FIFOThreadFutureResultCollection()
    else:
        return FDFOThreadFutureResultCollection()


class AsyncConcurrentMappingIterable(ConcurrentMappingIterable[T, U]):
Expand All @@ -448,11 +441,11 @@ def __init__(
) -> None:
super().__init__(iterator, buffer_size, ordered)
self.transformation = transformation
self.loop: AbstractEventLoop
self._loop: AbstractEventLoop

@contextmanager
def _context_manager(self):
    # Grab the event loop once per iteration so that every task created by
    # `_create_future` runs on the same loop.
    self._loop = get_event_loop()
    yield

async def _safe_transformation(
Expand All @@ -468,26 +461,21 @@ async def _safe_transformation(
except Exception as e:
return RaisingIterator.ExceptionContainer(e)

def _create_future(
    self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]":
    # Schedule the coroutine on the loop; the resulting Task is duck-compatible
    # with concurrent.futures.Future for our purposes, hence the cast.
    return cast(
        "Future[Union[U, RaisingIterator.ExceptionContainer]]",
        self._loop.create_task(self._safe_transformation(elem)),
    )

def _future_result_collection(
    self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
    # factory method: FIFO preserves submission order,
    # FDFO yields results as the futures complete
    if self.ordered:
        return FIFOAsyncFutureResultCollection(self._loop)
    else:
        return FDFOAsyncFutureResultCollection(self._loop)


class ConcurrentFlatteningIterable(
Expand Down
27 changes: 23 additions & 4 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
R = TypeVar("R")


def timestream(stream: Stream[T], times: int = 1) -> Tuple[float, List[T]]:
    """
    Iterates `stream` to completion `times` times (default 1, keeping the
    previous single-run behavior) and returns
    (average duration in seconds, elements collected by the last run).
    """
    res: List[T] = []

    def iterate():
        nonlocal res
        res = list(stream)

    return timeit.timeit(iterate, number=times) / times, res


def identity_sleep(seconds: float) -> float:
Expand Down Expand Up @@ -344,6 +344,24 @@ def test_map(self, concurrency) -> None:
msg="At any concurrency the `map` method should act as the builtin map function, transforming elements while preserving input elements order.",
)

@parameterized.expand(
    [
        [16, 0],
        [1, 0],
        [16, 1],
        [16, 15],
        [16, 16],
    ]
)
def test_map_with_more_concurrency_than_elements(
    self, concurrency, n_elems
) -> None:
    # `map` must behave like the builtin even with fewer elements than workers.
    expected = [str(i) for i in range(n_elems)]
    self.assertListEqual(
        list(Stream(range(n_elems)).map(str, concurrency=concurrency)),
        expected,
        msg="`map` method should act correctly when concurrency > number of elements.",
    )

@parameterized.expand(
[
[
Expand Down Expand Up @@ -375,7 +393,8 @@ def test_mapping_ordering(
):
seconds = [0.1, 0.01, 0.2]
duration, res = timestream(
operation(Stream(seconds), func, ordered=ordered, concurrency=2)
operation(Stream(seconds), func, ordered=ordered, concurrency=2),
5,
)
self.assertListEqual(
res,
Expand All @@ -387,7 +406,7 @@ def test_mapping_ordering(
duration,
expected_duration,
msg=f"{'ordered' if ordered else 'unordered'} `{operation}` should reflect that unordering improves runtime by avoiding bottlenecks",
delta=0.03,
delta=expected_duration * 0.2,
)

@parameterized.expand(
Expand Down
Loading