Skip to content

Commit

Permalink
.map/.foreach/.amap/.aforeach: introduce ordered: bool para…
Browse files Browse the repository at this point in the history
…m to optionally remove order preservation, no-op for `concurrency==1`

Co-authored-by: erezsh <erezshin@gmail.com>
  • Loading branch information
ebonnal and erezsh committed Sep 28, 2024
1 parent 2f0c1ec commit d905064
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 40 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ assert list(integer_strings) == ['0', '-1', '-2', '-3', '-4', '-5', '-6', '-7',
```

### thread-based concurrency
> Applies the transformation concurrently using a thread pool of size `concurrency` (preserving the order):
> Applies the transformation concurrently using a thread queue of size `concurrency`, and preserving the upstream order by default:
```python
import requests

Expand All @@ -119,7 +119,7 @@ assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
```

### async-based concurrency
> The sibling operation called `.amap` applies an async transformation (preserving the order):
> The sibling operation `.amap` applies an async function:
```python
import httpx
import asyncio
Expand Down Expand Up @@ -152,7 +152,7 @@ assert list(self_printing_integers) == list(integers) # triggers the printing
> Like `.map` it has an optional `concurrency: int` parameter.
### async-based concurrency
> Like `.map` it has a sibling operation `.aforeach` for async.
> Like `.map` it has a sibling `.aforeach` operation for async.
## `.filter`
> Keeps only the elements that satisfy a condition:
Expand Down
8 changes: 7 additions & 1 deletion streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ def group(


def map(
transformation: Callable[[T], U], iterator: Iterator[T], concurrency: int = 1
transformation: Callable[[T], U],
iterator: Iterator[T],
concurrency: int = 1,
ordered: bool = True,
) -> Iterator[U]:
validate_iterator(iterator)
validate_concurrency(concurrency)
Expand All @@ -117,6 +120,7 @@ def map(
transformation,
concurrency=concurrency,
buffer_size=concurrency,
ordered=ordered,
)
)
)
Expand All @@ -126,6 +130,7 @@ def amap(
transformation: Callable[[T], Coroutine[Any, Any, U]],
iterator: Iterator[T],
concurrency: int = 1,
ordered: bool = True,
) -> Iterator[U]:
validate_iterator(iterator)
validate_concurrency(concurrency)
Expand All @@ -135,6 +140,7 @@ def amap(
iterator,
transformation,
buffer_size=concurrency,
ordered=ordered,
)
)
)
Expand Down
54 changes: 39 additions & 15 deletions streamable/iters.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import asyncio
import time
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop, Task, get_event_loop
from asyncio import AbstractEventLoop, get_event_loop
from collections import defaultdict, deque
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from concurrent.futures import (
FIRST_COMPLETED,
Executor,
Future,
ThreadPoolExecutor,
wait,
)
from contextlib import contextmanager
from datetime import datetime
from math import ceil
Expand Down Expand Up @@ -343,9 +350,11 @@ def __init__(
self,
iterator: Iterator[T],
buffer_size: int,
ordered: bool,
) -> None:
self.iterator = iterator
self.buffer_size = buffer_size
self.ordered = ordered

@abstractmethod
def _context_manager(self) -> ContextManager: ...
Expand All @@ -354,9 +363,10 @@ def _context_manager(self) -> ContextManager: ...
def _launch_future(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]": ...

@abstractmethod
def _get_future_result(
self, future: "Future[Union[U, RaisingIterator.ExceptionContainer]]"
def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]: ...

def __iter__(self) -> Iterator[Union[U, RaisingIterator.ExceptionContainer]]:
Expand All @@ -370,7 +380,7 @@ def __iter__(self) -> Iterator[Union[U, RaisingIterator.ExceptionContainer]]:
# wait, queue, yield (FIFO)
while True:
if futures:
to_yield.append(self._get_future_result(futures.popleft()))
to_yield.append(self._next_yield(futures))
# queue tasks up to buffer_size
while len(futures) < self.buffer_size:
try:
Expand All @@ -392,8 +402,9 @@ def __init__(
transformation: Callable[[T], U],
concurrency: int,
buffer_size: int,
ordered: bool,
) -> None:
super().__init__(iterator, buffer_size)
super().__init__(iterator, buffer_size, ordered)
self.transformation = transformation
self.concurrency = concurrency
self.executor: Executor
Expand All @@ -415,10 +426,16 @@ def _launch_future(
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]":
return self.executor.submit(self._safe_transformation, elem)

def _get_future_result(
self, future: "Future[Union[U, RaisingIterator.ExceptionContainer]]"
def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]:
return future.result()
if self.ordered:
return futures.popleft().result()
else:
done_futures, _ = wait(futures, return_when=FIRST_COMPLETED)
done_future = next(iter(done_futures))
futures.remove(done_future)
return done_future.result()


class AsyncConcurrentMappingIterable(ConcurrentMappingIterable[T, U]):
Expand All @@ -427,8 +444,9 @@ def __init__(
iterator: Iterator[T],
transformation: Callable[[T], Coroutine[Any, Any, U]],
buffer_size: int,
ordered: bool,
) -> None:
super().__init__(iterator, buffer_size)
super().__init__(iterator, buffer_size, ordered)
self.transformation = transformation
self.loop: AbstractEventLoop

Expand Down Expand Up @@ -458,12 +476,18 @@ def _launch_future(
self.loop.create_task(self._safe_transformation(elem)),
)

def _get_future_result(
self, future: "Future[Union[U, RaisingIterator.ExceptionContainer]]"
def _next_yield(
self, futures: "Deque[Future[Union[U, RaisingIterator.ExceptionContainer]]]"
) -> Union[U, RaisingIterator.ExceptionContainer]:
return self.loop.run_until_complete(
cast("Task[Union[U, RaisingIterator.ExceptionContainer]]", future)
)
if self.ordered:
return self.loop.run_until_complete(futures.popleft()) # type: ignore
else:
done_futures, _ = self.loop.run_until_complete(
asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED) # type: ignore
)
done_future = next(iter(done_futures))
futures.remove(done_future)
return done_future.result()


class ConcurrentFlatteningIterable(
Expand Down
49 changes: 37 additions & 12 deletions streamable/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,37 +229,41 @@ def foreach(
self,
effect: Callable[[T], Any],
concurrency: int = 1,
ordered: bool = True,
) -> "Stream[T]":
"""
For each upstream element, yields it after having called `effect` on it.
If `effect(elem)` throws an exception then it will be thrown and `elem` will not be yielded.
Args:
effect (Callable[[T], Any]): The function to be applied to each element as a side effect.
concurrency (int): The number of threads used to concurrently apply the `effect` (default is 1, meaning no concurrency). Preserves the upstream order.
concurrency (int): The number of threads used to concurrently apply the `effect` (default is 1, meaning no concurrency).
ordered (bool): Whether to preserve the order of elements or yield them as soon as they are processed when `concurrency` > 1 (default preserves order).
Returns:
Stream[T]: A stream of upstream elements, unchanged.
"""
validate_concurrency(concurrency)
return ForeachStream(self, effect, concurrency)
return ForeachStream(self, effect, concurrency, ordered)

def aforeach(
self,
effect: Callable[[T], Coroutine],
concurrency: int = 1,
ordered: bool = True,
) -> "Stream[T]":
"""
For each upstream element, yields it after having called the asynchronous `effect` on it.
If the `effect(elem)` coroutine throws an exception then it will be thrown and `elem` will not be yielded.
Args:
effect (Callable[[T], Any]): The asynchronous function to be applied to each element as a side effect.
concurrency (int): How many asyncio tasks will run at the same time. Preserves the upstream order.
concurrency (int): How many asyncio tasks will run at the same time.
ordered (bool): Whether to preserve the order of elements or yield them as soon as they are processed when `concurrency` > 1 (default preserves order).
Returns:
Stream[T]: A stream of upstream elements, unchanged.
"""
validate_concurrency(concurrency)
return AForeachStream(self, effect, concurrency)
return AForeachStream(self, effect, concurrency, ordered)

def group(
self,
Expand Down Expand Up @@ -290,35 +294,39 @@ def map(
self,
transformation: Callable[[T], U],
concurrency: int = 1,
ordered: bool = True,
) -> "Stream[U]":
"""
Applies `transformation` on upstream elements and yields the results.
Args:
transformation (Callable[[T], R]): The function to be applied to each element.
concurrency (int): The number of threads used to concurrently apply `transformation` (default is 1, meaning no concurrency). Preserves the upstream order.
concurrency (int): The number of threads used to concurrently apply `transformation` (default is 1, meaning no concurrency).
ordered (bool): Whether to preserve the order of elements or yield them as soon as they are processed when `concurrency` > 1 (default preserves order).
Returns:
Stream[R]: A stream of results of `transformation` applied to upstream elements.
"""
validate_concurrency(concurrency)
return MapStream(self, transformation, concurrency)
return MapStream(self, transformation, concurrency, ordered)

def amap(
self,
transformation: Callable[[T], Coroutine[Any, Any, U]],
concurrency: int = 1,
ordered: bool = True,
) -> "Stream[U]":
"""
Applies the asynchrounous `transformation` on upstream elements and yields the results in order.
Applies the asynchrounous `transformation` on upstream elements and yields the results.
Args:
transformation (Callable[[T], Coroutine[Any, Any, U]]): The asynchronous function to be applied to each element.
concurrency (int): How many asyncio tasks will run at the same time. Preserves the upstream order.
concurrency (int): How many asyncio tasks will run at the same time.
ordered (bool): Whether to preserve the order of elements or yield them as soon as they are processed when `concurrency` > 1 (default preserves order).
Returns:
Stream[R]: A stream of results of `transformation` applied to upstream elements.
"""
validate_concurrency(concurrency)
return AMapStream(self, transformation, concurrency)
return AMapStream(self, transformation, concurrency, ordered)

def observe(self, what: str = "elements") -> "Stream[T]":
"""
Expand Down Expand Up @@ -434,23 +442,33 @@ def accept(self, visitor: "Visitor[V]") -> V:

class ForeachStream(DownStream[T, T]):
def __init__(
self, upstream: Stream[T], effect: Callable[[T], Any], concurrency: int
self,
upstream: Stream[T],
effect: Callable[[T], Any],
concurrency: int,
ordered: bool,
) -> None:
super().__init__(upstream)
self._effect = effect
self._concurrency = concurrency
self._ordered = ordered

def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_foreach_stream(self)


class AForeachStream(DownStream[T, T]):
def __init__(
self, upstream: Stream[T], effect: Callable[[T], Coroutine], concurrency: int
self,
upstream: Stream[T],
effect: Callable[[T], Coroutine],
concurrency: int,
ordered: bool,
) -> None:
super().__init__(upstream)
self._effect = effect
self._concurrency = concurrency
self._ordered = ordered

def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_aforeach_stream(self)
Expand All @@ -475,11 +493,16 @@ def accept(self, visitor: "Visitor[V]") -> V:

class MapStream(DownStream[T, U]):
def __init__(
self, upstream: Stream[T], transformation: Callable[[T], U], concurrency: int
self,
upstream: Stream[T],
transformation: Callable[[T], U],
concurrency: int,
ordered: bool,
) -> None:
super().__init__(upstream)
self._transformation = transformation
self._concurrency = concurrency
self._ordered = ordered

def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_map_stream(self)
Expand All @@ -491,10 +514,12 @@ def __init__(
upstream: Stream[T],
transformation: Callable[[T], Coroutine[Any, Any, U]],
concurrency: int,
ordered: bool,
) -> None:
super().__init__(upstream)
self._transformation = transformation
self._concurrency = concurrency
self._ordered = ordered

def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_amap_stream(self)
Expand Down
4 changes: 4 additions & 0 deletions streamable/visitors/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def visit_foreach_stream(self, stream: ForeachStream[T]) -> Iterator[T]:
stream.upstream,
util.sidify(stream._effect),
stream._concurrency,
stream._ordered,
)
)

Expand All @@ -58,6 +59,7 @@ def visit_aforeach_stream(self, stream: AForeachStream[T]) -> Iterator[T]:
stream.upstream,
util.async_sidify(stream._effect),
stream._concurrency,
stream._ordered,
)
)

Expand All @@ -77,13 +79,15 @@ def visit_map_stream(self, stream: MapStream[U, T]) -> Iterator[T]:
stream._transformation,
stream.upstream.accept(IteratorVisitor[U]()),
concurrency=stream._concurrency,
ordered=stream._ordered,
)

def visit_amap_stream(self, stream: AMapStream[U, T]) -> Iterator[T]:
return functions.amap(
stream._transformation,
stream.upstream.accept(IteratorVisitor[U]()),
concurrency=stream._concurrency,
ordered=stream._ordered,
)

def visit_observe_stream(self, stream: ObserveStream[T]) -> Iterator[T]:
Expand Down
Loading

0 comments on commit d905064

Please sign in to comment.