Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrentMappingIterable: introduce futuretools.FutureResultCollection to optimize FDFO concurrent mapping (closes #20) #21

Merged
merged 5 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions streamable/futuretools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import asyncio
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop
from collections import deque
from concurrent.futures import Future
from multiprocessing import Queue
from typing import Deque, Iterator, Sized, TypeVar

T = TypeVar("T")


class FutureResultCollection(Iterator[T], Sized, ABC):
    """
    An iterator yielding the results of futures registered via ``add_future``.

    New futures may keep being added while the iteration is in progress;
    ``len()`` reports how many added futures have not been consumed yet.
    """

    @abstractmethod
    def add_future(self, future: "Future[T]") -> None: ...


class DequeFutureResultCollection(FutureResultCollection[T]):
    """Keeps the pending futures in a deque, in insertion order."""

    def __init__(self) -> None:
        # futures added but whose result has not been yielded yet
        self._futures: Deque["Future[T]"] = deque()

    def __len__(self) -> int:
        return len(self._futures)

    def add_future(self, future: "Future[T]") -> None:
        self._futures.append(future)


class CallbackFutureResultCollection(FutureResultCollection[T]):
    """
    Does not store the futures themselves: each added future gets a
    done-callback attached and only a counter of pending futures is kept.
    """

    def __init__(self) -> None:
        # number of futures added but whose result has not been yielded yet
        self._n_futures = 0

    def __len__(self) -> int:
        return self._n_futures

    @abstractmethod
    def _done_callback(self, future: "Future[T]") -> None:
        """Invoked once per future as soon as it completes."""
        ...

    def add_future(self, future: "Future[T]") -> None:
        future.add_done_callback(self._done_callback)
        self._n_futures += 1


class FIFOThreadFutureResultCollection(DequeFutureResultCollection[T]):
    """
    First In First Out: yields results in the order the futures were added,
    blocking on each future in turn.
    """

    def __next__(self) -> T:
        oldest_future = self._futures.popleft()
        return oldest_future.result()


class FDFOThreadFutureResultCollection(CallbackFutureResultCollection[T]):
    """
    First Done First Out: yields results in completion order.

    Each future's done-callback (running in the executor's worker thread)
    pushes the result into an internal thread-safe queue; `__next__` blocks
    until a result is available.
    """

    def __init__(self) -> None:
        super().__init__()
        # queue.Queue, NOT multiprocessing.Queue: the callbacks run in threads
        # of the same process, and a multiprocessing queue would pickle every
        # result through a pipe — crashing on non-picklable elements and
        # paying IPC overhead for nothing.
        self._results: "queue.Queue[T]" = queue.Queue()

    def _done_callback(self, future: "Future[T]") -> None:
        # the caller wraps exceptions in containers upstream, so .result()
        # is not expected to raise here
        self._results.put(future.result())

    def __next__(self) -> T:
        self._n_futures -= 1
        return self._results.get()


class FIFOAsyncFutureResultCollection(DequeFutureResultCollection[T]):
    """
    First In First Out over asyncio futures/tasks: awaits each stored future
    in insertion order by driving the provided event loop.
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        super().__init__()
        # event loop used to drive the awaitables to completion
        self._loop = loop

    def __next__(self) -> T:
        next_future = self._futures.popleft()
        return self._loop.run_until_complete(next_future)  # type: ignore


class FDFOAsyncFutureResultCollection(CallbackFutureResultCollection[T]):
    """
    First Done First Out over asyncio futures/tasks: yields results in
    completion order.

    Done-callbacks buffer completed results into a deque and wake up
    `__next__` via a single-use "waiter" future.
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        super().__init__()
        self._loop = loop
        # results buffered in completion order, awaiting consumption
        self._results: Deque[T] = deque()
        # wake-up signal: set (at most once) when a new result is buffered
        self._waiter: "asyncio.Future[None]" = self._loop.create_future()

    def _done_callback(self, future: "Future[T]") -> None:
        # Buffer the result instead of setting it on the waiter directly:
        # several futures can complete within a single event-loop pass, and
        # a second set_result() on an already-done waiter would raise
        # InvalidStateError and silently drop every result but the first.
        self._results.append(future.result())
        if not self._waiter.done():
            self._waiter.set_result(None)

    def __next__(self) -> T:
        self._n_futures -= 1
        # loop: the waiter may already be done from a result consumed on a
        # previous call, in which case run_until_complete returns immediately
        # and we must wait again on a fresh waiter
        while not self._results:
            self._loop.run_until_complete(self._waiter)
            self._waiter = self._loop.create_future()
        return self._results.popleft()
102 changes: 43 additions & 59 deletions streamable/iters.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
import asyncio
import time
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop, get_event_loop
from collections import defaultdict, deque
from concurrent.futures import (
FIRST_COMPLETED,
Executor,
Future,
ThreadPoolExecutor,
wait,
)
from contextlib import contextmanager
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import contextmanager, suppress
from datetime import datetime
from math import ceil
from typing import (
Expand All @@ -36,6 +29,13 @@
T = TypeVar("T")
U = TypeVar("U")

from streamable.futuretools import (
FDFOAsyncFutureResultCollection,
FDFOThreadFutureResultCollection,
FIFOAsyncFutureResultCollection,
FIFOThreadFutureResultCollection,
FutureResultCollection,
)
from streamable.util import NO_REPLACEMENT, NoopStopIteration, get_logger, reraise_as


Expand Down Expand Up @@ -342,7 +342,7 @@ class ConcurrentMappingIterable(
"""
Template Method Pattern:
This abstract class's `__iter__` is a skeleton for a queue-based concurrent mapping algorithm
that relies on abstract helper methods (`_context_manager`, `_launch_future`, `_get_future_result`)
that relies on abstract helper methods (`_context_manager`, `_create_future`, `_future_result_collection`)
that must be implemented by concrete subclasses.
"""

Expand All @@ -360,39 +360,31 @@ def __init__(
def _context_manager(self) -> ContextManager: ...

@abstractmethod
def _launch_future(
def _create_future(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]": ...

# factory method
@abstractmethod
def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]: ...
def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]: ...

def __iter__(self) -> Iterator[Union[U, RaisingIterator.ExceptionContainer]]:
with self._context_manager():
futures: Deque["Future[Union[U, RaisingIterator.ExceptionContainer]]"] = (
deque()
)
to_yield: Deque[Union[U, RaisingIterator.ExceptionContainer]] = deque(
maxlen=1
)
# wait, queue, yield (FIFO)
while True:
if futures:
to_yield.append(self._next_yield(futures))
# queue tasks up to buffer_size
while len(futures) < self.buffer_size:
try:
elem = next(self.iterator)
except StopIteration:
# the upstream iterator is exhausted
break
futures.append(self._launch_future(elem))
if to_yield:
yield to_yield.pop()
if not futures:
break
future_results = self._future_result_collection()

# queue tasks up to buffer_size
with suppress(StopIteration):
while len(future_results) < self.buffer_size:
future_results.add_future(self._create_future(next(self.iterator)))

# wait, queue, yield
while future_results:
result = next(future_results)
with suppress(StopIteration):
future_results.add_future(self._create_future(next(self.iterator)))
yield result


class ThreadConcurrentMappingIterable(ConcurrentMappingIterable[T, U]):
Expand Down Expand Up @@ -421,21 +413,18 @@ def _safe_transformation(
except Exception as e:
return RaisingIterator.ExceptionContainer(e)

def _launch_future(
def _create_future(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]":
return self.executor.submit(self._safe_transformation, elem)

def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]:
def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
if self.ordered:
return futures.popleft().result()
return FIFOThreadFutureResultCollection()
else:
done_futures, _ = wait(futures, return_when=FIRST_COMPLETED)
done_future = next(iter(done_futures))
futures.remove(done_future)
return done_future.result()
return FDFOThreadFutureResultCollection()


class AsyncConcurrentMappingIterable(ConcurrentMappingIterable[T, U]):
Expand All @@ -448,11 +437,11 @@ def __init__(
) -> None:
super().__init__(iterator, buffer_size, ordered)
self.transformation = transformation
self.loop: AbstractEventLoop
self._loop: AbstractEventLoop

@contextmanager
def _context_manager(self):
self.loop = get_event_loop()
self._loop = get_event_loop()
yield

async def _safe_transformation(
Expand All @@ -468,26 +457,21 @@ async def _safe_transformation(
except Exception as e:
return RaisingIterator.ExceptionContainer(e)

def _launch_future(
def _create_future(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]":
return cast(
"Future[Union[U, RaisingIterator.ExceptionContainer]]",
self.loop.create_task(self._safe_transformation(elem)),
self._loop.create_task(self._safe_transformation(elem)),
)

def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]:
def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
if self.ordered:
return self.loop.run_until_complete(futures.popleft()) # type: ignore
return FIFOAsyncFutureResultCollection(self._loop)
else:
done_futures, _ = self.loop.run_until_complete(
asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED) # type: ignore
)
done_future = next(iter(done_futures))
futures.remove(done_future)
return done_future.result()
return FDFOAsyncFutureResultCollection(self._loop)


class ConcurrentFlatteningIterable(
Expand Down
27 changes: 23 additions & 4 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
R = TypeVar("R")


def timestream(stream: Stream[T]) -> Tuple[float, List[T]]:
def timestream(stream: Stream[T], times: int = 1) -> Tuple[float, List[T]]:
res: List[T] = []

def iterate():
nonlocal res
res = list(stream)

return timeit.timeit(iterate, number=1), res
return timeit.timeit(iterate, number=times) / times, res


def identity_sleep(seconds: float) -> float:
Expand Down Expand Up @@ -344,6 +344,24 @@ def test_map(self, concurrency) -> None:
msg="At any concurrency the `map` method should act as the builtin map function, transforming elements while preserving input elements order.",
)

@parameterized.expand(
[
[16, 0],
[1, 0],
[16, 1],
[16, 15],
[16, 16],
]
)
def test_map_with_more_concurrency_than_elements(
self, concurrency, n_elems
) -> None:
self.assertListEqual(
list(Stream(range(n_elems)).map(str, concurrency=concurrency)),
list(map(str, range(n_elems))),
msg="`map` method should act correctly when concurrency > number of elements.",
)

@parameterized.expand(
[
[
Expand Down Expand Up @@ -375,7 +393,8 @@ def test_mapping_ordering(
):
seconds = [0.1, 0.01, 0.2]
duration, res = timestream(
operation(Stream(seconds), func, ordered=ordered, concurrency=2)
operation(Stream(seconds), func, ordered=ordered, concurrency=2),
5,
)
self.assertListEqual(
res,
Expand All @@ -387,7 +406,7 @@ def test_mapping_ordering(
duration,
expected_duration,
msg=f"{'ordered' if ordered else 'unordered'} `{operation}` should reflect that unordering improves runtime by avoiding bottlenecks",
delta=0.03,
delta=expected_duration * 0.2,
)

@parameterized.expand(
Expand Down
Loading