Skip to content

Commit

Permalink
map/do/flatten: rename n_threads -> concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 25, 2023
1 parent 7806626 commit 51bdb73
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 97 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ There are 2 kinds of operations:
```python
odd_squares: Stream[int] = (
integers
.map(lambda x: x ** 2, n_threads=2) # transformation
.map(lambda x: x ** 2, concurrency=2) # transformation
.filter(lambda x: x % 2 == 1) # transformation
.slow(frequency=10) # control
)
Expand Down Expand Up @@ -84,7 +84,7 @@ Defines the application of a function on parent elements.
integer_strings: Stream[str] = integers.map(str)
```

It has an optional `n_threads` parameter if you need to apply the function concurrently using multiple threads.
It has an optional `concurrency` parameter if you need to apply the function concurrently using multiple threads.

## `.do`
Defines the application of a function on parent elements like `.map`, but the parent elements will be forwarded instead of the result of the function.
Expand All @@ -93,7 +93,7 @@ Defines the application of a function on parent elements like `.map`, but the pa
printed_integers: Stream[int] = integers.do(print)
```

It also has an optional `n_threads` parameter.
It also has an optional `concurrency` parameter.

## `.filter`
Defines the filtering of parent elements based on a predicate function.
Expand Down Expand Up @@ -125,7 +125,7 @@ Defines the ungrouping of parent elements assuming that the parent elements are
integers: Stream[int] = integer_batches.flatten()
```

It also has an optional `n_threads` parameter to flatten concurrently several parent iterables.
It also has an optional `concurrency` parameter to concurrently flatten several parent iterables.

## `.chain`

Expand Down Expand Up @@ -319,7 +319,7 @@ def integrate_pokemon_cards_into_bigquery(
table="ingestion.pokemon_card",
json_rows=cards_batch,
),
n_threads=2,
concurrency=2,
)
# at this point we have a Stream[Sequence[Dict[str, Any]]]

Expand Down
4 changes: 2 additions & 2 deletions examples/xmas_comments_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# by batch of 20 at a maximum rate of 50 batches per second
.batch(size=20)
.slow(frequency=50)
.map(translate.Client("en").translate, n_threads=4)
.map(translate.Client("en").translate, concurrency=4)
.flatten()
.map(itemgetter("translatedText"))
.observe(what="comments translated in english")
Expand All @@ -35,7 +35,7 @@
json={"text": comment},
auth=("foo", "bar"),
),
n_threads=2,
concurrency=2,
)
.do(requests.Response.raise_for_status)
.map(requests.Response.text)
Expand Down
60 changes: 30 additions & 30 deletions streamable/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,112 +60,112 @@ def _accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_source_stream(self)

@staticmethod
def sanitize_n_threads(n_threads: int):
if not isinstance(n_threads, int):
def sanitize_concurrency(concurrency: int):
if not isinstance(concurrency, int):
raise TypeError(
f"n_threads should be an int but got '{n_threads}' of type {type(n_threads)}."
f"concurrency should be an int but got '{concurrency}' of type {type(concurrency)}."
)
if n_threads < 1:
if concurrency < 1:
raise ValueError(
f"n_threads should be greater or equal to 1, but got {n_threads}."
f"concurrency should be greater or equal to 1, but got {concurrency}."
)

def map(
self,
func: Callable[[T], R],
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
"""
Apply a function to each element of the Stream.
Args:
func (Callable[[T], R]): The function to be applied to each element.
n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
concurrency (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
Returns:
Stream[R]: A new Stream instance with elements resulting from applying the function to each element.
"""
Stream.sanitize_n_threads(n_threads)
return MapStream(self, func, n_threads)
Stream.sanitize_concurrency(concurrency)
return MapStream(self, func, concurrency)

def do(
self,
func: Callable[[T], Any],
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[T]":
"""
Run `func` as a side effect: the resulting Stream forwards the upstream elements after `func`'s execution ends.
Args:
func (Callable[[T], R]): The function to be applied to each element.
n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
concurrency (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
Returns:
Stream[T]: A new Stream instance with elements resulting from applying the function to each element.
"""
Stream.sanitize_n_threads(n_threads)
return DoStream(self, func, n_threads)
Stream.sanitize_concurrency(concurrency)
return DoStream(self, func, concurrency)

@overload
def flatten(
self: "Stream[Iterable[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

@overload
def flatten(
self: "Stream[Collection[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

@overload
def flatten(
self: "Stream[Stream[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

@overload
def flatten(
self: "Stream[Iterator[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

@overload
def flatten(
self: "Stream[List[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

@overload
def flatten(
self: "Stream[Sequence[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

@overload
def flatten(
self: "Stream[Set[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
...

def flatten(
self: "Stream[Iterable[R]]",
n_threads: int = 1,
concurrency: int = 1,
) -> "Stream[R]":
"""
Flatten the elements of the Stream, which are assumed to be iterables, creating a new Stream with individual elements.
Args:
n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
concurrency (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
Returns:
Stream[R]: A new Stream instance with individual elements obtained by flattening the original elements.
"""
Stream.sanitize_n_threads(n_threads)
return FlattenStream(self, n_threads)
Stream.sanitize_concurrency(concurrency)
return FlattenStream(self, concurrency)

def chain(self, *others: "Stream[T]") -> "Stream[T]":
"""
Expand Down Expand Up @@ -345,20 +345,20 @@ def _accept(self, visitor: "Visitor[V]") -> V:


class MapStream(Stream[Z], Generic[Y, Z]):
def __init__(self, upstream: Stream[Y], func: Callable[[Y], Z], n_threads: int):
def __init__(self, upstream: Stream[Y], func: Callable[[Y], Z], concurrency: int):
self.upstream: Stream[Y] = upstream
self.func = func
self.n_threads = n_threads
self.concurrency = concurrency

def _accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_map_stream(self)


class DoStream(Stream[Y]):
def __init__(self, upstream: Stream[Y], func: Callable[[Y], Any], n_threads: int):
def __init__(self, upstream: Stream[Y], func: Callable[[Y], Any], concurrency: int):
self.upstream: Stream[Y] = upstream
self.func = func
self.n_threads = n_threads
self.concurrency = concurrency

def _accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_do_stream(self)
Expand All @@ -375,9 +375,9 @@ def _accept(self, visitor: "Visitor[V]") -> V:


class FlattenStream(Stream[Y]):
def __init__(self, upstream: Stream[Iterable[Y]], n_threads: int) -> None:
def __init__(self, upstream: Stream[Iterable[Y]], concurrency: int) -> None:
self.upstream: Stream[Iterable[Y]] = upstream
self.n_threads = n_threads
self.concurrency = concurrency

def _accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_flatten_stream(self)
Expand Down
6 changes: 3 additions & 3 deletions streamable/_visit/_explanation.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ def visit_source_stream(self, stream: _stream.Stream) -> Any:

def visit_map_stream(self, stream: _stream.MapStream) -> Any:
name = "Map"
descr = f"function {stream.func}, using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}"
descr = f"function {stream.func}, using {stream.concurrency} thread{'s' if stream.concurrency > 1 else ''}"
return self.visit_any_stream(stream, name, descr)

def visit_do_stream(self, stream: _stream.DoStream) -> Any:
name = "Do"
descr = f"side effects by applying a function {stream.func}, using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}"
descr = f"side effects by applying a function {stream.func}, using {stream.concurrency} thread{'s' if stream.concurrency > 1 else ''}"
return self.visit_any_stream(stream, name, descr)

def visit_flatten_stream(self, stream: _stream.FlattenStream) -> Any:
name = "Flatten"
descr = f"using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}"
descr = f"using {stream.concurrency} thread{'s' if stream.concurrency > 1 else ''}"
return self.visit_any_stream(stream, name, descr)

def visit_filter_stream(self, stream: _stream.FilterStream) -> Any:
Expand Down
10 changes: 5 additions & 5 deletions streamable/_visit/_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@ def visit_map_stream(self, stream: _stream.MapStream[U, T]) -> Iterator[T]:
stream.func, source=StopIteration, target=RuntimeError
)
it: Iterator[U] = stream.upstream._accept(IteratorProducingVisitor[U]())
if stream.n_threads == 1:
if stream.concurrency == 1:
return map(func, it)
else:
return _concurrency.ThreadedMappingIteratorWrapper(
it, func, n_workers=stream.n_threads
it, func, n_workers=stream.concurrency
)

def visit_do_stream(self, stream: _stream.DoStream[T]) -> Iterator[T]:
func = _util.sidify(
_util.map_exception(stream.func, source=StopIteration, target=RuntimeError)
)
return self.visit_map_stream(
_stream.MapStream(stream.upstream, func, stream.n_threads)
_stream.MapStream(stream.upstream, func, stream.concurrency)
)

def visit_flatten_stream(self, stream: _stream.FlattenStream[T]) -> Iterator[T]:
it = stream.upstream._accept(IteratorProducingVisitor[Iterable]())
if stream.n_threads == 1:
if stream.concurrency == 1:
return _core.FlatteningIteratorWrapper(it)
else:
return _concurrency.ThreadedFlatteningIteratorWrapper(
it, n_workers=stream.n_threads
it, n_workers=stream.concurrency
)

def visit_chain_stream(self, stream: _stream.ChainStream[T]) -> Iterator[T]:
Expand Down
Loading

0 comments on commit 51bdb73

Please sign in to comment.