From 51bdb737f634e22eb3ee6ccc1b7bbf53e5cc64e9 Mon Sep 17 00:00:00 2001 From: bonnal-enzo Date: Mon, 25 Dec 2023 22:24:08 +0100 Subject: [PATCH] map/do/flatten: rename n_threads -> concurrency --- README.md | 10 ++-- examples/xmas_comments_translation.py | 4 +- streamable/_stream.py | 60 +++++++++---------- streamable/_visit/_explanation.py | 6 +- streamable/_visit/_iter.py | 10 ++-- tests/test_legacy.py | 86 +++++++++++++-------------- tests/test_pipe.py | 18 +++--- 7 files changed, 97 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 059fc2f..408c413 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ There are 2 kinds of operations: ```python odd_squares: Stream[int] = ( integers - .map(lambda x: x ** 2, n_threads=2) # transformation + .map(lambda x: x ** 2, concurrency=2) # transformation .filter(lambda x: x % 2 == 1) # transformation .slow(frequency=10) # control ) @@ -84,7 +84,7 @@ Defines the application of a function on parent elements. integer_strings: Stream[str] = integers.map(str) ``` -It has an optional `n_threads` parameter if you need to apply the function concurrently using multiple threads. +It has an optional `concurrency` parameter if you need to apply the function concurrently using multiple threads. ## `.do` Defines the application of a function on parent elements like `.map`, but the parent elements will be forwarded instead of the result of the function. @@ -93,7 +93,7 @@ Defines the application of a function on parent elements like `.map`, but the pa printed_integers: Stream[int] = integers.do(print) ``` -It also has an optional `n_threads` parameter. +It also has an optional `concurrency` parameter. ## `.filter` Defines the filtering of parent elements based on a predicate function. @@ -125,7 +125,7 @@ Defines the ungrouping of parent elements assuming that the parent elements are integers: Stream[int] = integer_batches.flatten() ``` -It also has an optional `n_threads` parameter to flatten concurrently several parent iterables. +It also has an optional `concurrency` parameter to flatten concurrently several parent iterables. ## `.chain` @@ -319,7 +319,7 @@ def integrate_pokemon_cards_into_bigquery( table="ingestion.pokemon_card", json_rows=cards_batch, ), - n_threads=2, + concurrency=2, ) # at this point we have a Stream[Sequence[Dict[str, Any]]] diff --git a/examples/xmas_comments_translation.py b/examples/xmas_comments_translation.py index 1fbdff6..7e0fff0 100644 --- a/examples/xmas_comments_translation.py +++ b/examples/xmas_comments_translation.py @@ -14,7 +14,7 @@ # by batch of 20 at a maximum rate of 50 batches per second .batch(size=20) .slow(frequency=50) - .map(translate.Client("en").translate, n_threads=4) + .map(translate.Client("en").translate, concurrency=4) .flatten() .map(itemgetter("translatedText")) .observe(what="comments translated in english") @@ -35,7 +35,7 @@ json={"text": comment}, auth=("foo", "bar"), ), - n_threads=2, + concurrency=2, ) .do(requests.Response.raise_for_status) .map(requests.Response.text) diff --git a/streamable/_stream.py b/streamable/_stream.py index 5209238..714b000 100644 --- a/streamable/_stream.py +++ b/streamable/_stream.py @@ -60,112 +60,112 @@ def _accept(self, visitor: "Visitor[V]") -> V: return visitor.visit_source_stream(self) @staticmethod - def sanitize_n_threads(n_threads: int): - if not isinstance(n_threads, int): + def sanitize_concurrency(concurrency: int): + if not isinstance(concurrency, int): raise TypeError( - f"n_threads should be an int but got '{n_threads}' of type {type(n_threads)}." + f"concurrency should be an int but got '{concurrency}' of type {type(concurrency)}." ) - if n_threads < 1: + if concurrency < 1: raise ValueError( - f"n_threads should be greater or equal to 1, but got {n_threads}." + f"concurrency should be greater or equal to 1, but got {concurrency}." ) def map( self, func: Callable[[T], R], - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": """ Apply a function to each element of the Stream. Args: func (Callable[[T], R]): The function to be applied to each element. - n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). + concurrency (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). Returns: Stream[R]: A new Stream instance with elements resulting from applying the function to each element. """ - Stream.sanitize_n_threads(n_threads) - return MapStream(self, func, n_threads) + Stream.sanitize_concurrency(concurrency) + return MapStream(self, func, concurrency) def do( self, func: Callable[[T], Any], - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[T]": """ Run the func as side effect: the resulting Stream forwards the upstream elements after func execution's end. Args: func (Callable[[T], R]): The function to be applied to each element. - n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). + concurrency (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). Returns: Stream[T]: A new Stream instance with elements resulting from applying the function to each element. """ - Stream.sanitize_n_threads(n_threads) - return DoStream(self, func, n_threads) + Stream.sanitize_concurrency(concurrency) + return DoStream(self, func, concurrency) @overload def flatten( self: "Stream[Iterable[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... @overload def flatten( self: "Stream[Collection[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... @overload def flatten( self: "Stream[Stream[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... @overload def flatten( self: "Stream[Iterator[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... @overload def flatten( self: "Stream[List[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... @overload def flatten( self: "Stream[Sequence[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... @overload def flatten( self: "Stream[Set[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": ... def flatten( self: "Stream[Iterable[R]]", - n_threads: int = 1, + concurrency: int = 1, ) -> "Stream[R]": """ Flatten the elements of the Stream, which are assumed to be iterables, creating a new Stream with individual elements. Returns: Stream[R]: A new Stream instance with individual elements obtained by flattening the original elements. - n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). + concurrency (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). """ - Stream.sanitize_n_threads(n_threads) - return FlattenStream(self, n_threads) + Stream.sanitize_concurrency(concurrency) + return FlattenStream(self, concurrency) def chain(self, *others: "Stream[T]") -> "Stream[T]": """ @@ -345,20 +345,20 @@ def _accept(self, visitor: "Visitor[V]") -> V: class MapStream(Stream[Z], Generic[Y, Z]): - def __init__(self, upstream: Stream[Y], func: Callable[[Y], Z], n_threads: int): + def __init__(self, upstream: Stream[Y], func: Callable[[Y], Z], concurrency: int): self.upstream: Stream[Y] = upstream self.func = func - self.n_threads = n_threads + self.concurrency = concurrency def _accept(self, visitor: "Visitor[V]") -> V: return visitor.visit_map_stream(self) class DoStream(Stream[Y]): - def __init__(self, upstream: Stream[Y], func: Callable[[Y], Any], n_threads: int): + def __init__(self, upstream: Stream[Y], func: Callable[[Y], Any], concurrency: int): self.upstream: Stream[Y] = upstream self.func = func - self.n_threads = n_threads + self.concurrency = concurrency def _accept(self, visitor: "Visitor[V]") -> V: return visitor.visit_do_stream(self) @@ -375,9 +375,9 @@ def _accept(self, visitor: "Visitor[V]") -> V: class FlattenStream(Stream[Y]): - def __init__(self, upstream: Stream[Iterable[Y]], n_threads: int) -> None: + def __init__(self, upstream: Stream[Iterable[Y]], concurrency: int) -> None: self.upstream: Stream[Iterable[Y]] = upstream - self.n_threads = n_threads + self.concurrency = concurrency def _accept(self, visitor: "Visitor[V]") -> V: return visitor.visit_flatten_stream(self) diff --git a/streamable/_visit/_explanation.py b/streamable/_visit/_explanation.py index 299bb44..87c7a8f 100644 --- a/streamable/_visit/_explanation.py +++ b/streamable/_visit/_explanation.py @@ -69,17 +69,17 @@ def visit_source_stream(self, stream: _stream.Stream) -> Any: def visit_map_stream(self, stream: _stream.MapStream) -> Any: name = "Map" - descr = f"function {stream.func}, using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}" + descr = f"function {stream.func}, using {stream.concurrency} thread{'s' if stream.concurrency > 1 else ''}" return self.visit_any_stream(stream, name, descr) def visit_do_stream(self, stream: _stream.DoStream) -> Any: name = "Do" - descr = f"side effects by applying a function {stream.func}, using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}" + descr = f"side effects by applying a function {stream.func}, using {stream.concurrency} thread{'s' if stream.concurrency > 1 else ''}" return self.visit_any_stream(stream, name, descr) def visit_flatten_stream(self, stream: _stream.FlattenStream) -> Any: name = "Flatten" - descr = f"using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}" + descr = f"using {stream.concurrency} thread{'s' if stream.concurrency > 1 else ''}" return self.visit_any_stream(stream, name, descr) def visit_filter_stream(self, stream: _stream.FilterStream) -> Any: diff --git a/streamable/_visit/_iter.py b/streamable/_visit/_iter.py index 592a5d5..dc736ee 100644 --- a/streamable/_visit/_iter.py +++ b/streamable/_visit/_iter.py @@ -20,11 +20,11 @@ def visit_map_stream(self, stream: _stream.MapStream[U, T]) -> Iterator[T]: stream.func, source=StopIteration, target=RuntimeError ) it: Iterator[U] = stream.upstream._accept(IteratorProducingVisitor[U]()) - if stream.n_threads == 1: + if stream.concurrency == 1: return map(func, it) else: return _concurrency.ThreadedMappingIteratorWrapper( - it, func, n_workers=stream.n_threads + it, func, n_workers=stream.concurrency ) def visit_do_stream(self, stream: _stream.DoStream[T]) -> Iterator[T]: @@ -32,16 +32,16 @@ def visit_do_stream(self, stream: _stream.DoStream[T]) -> Iterator[T]: _util.map_exception(stream.func, source=StopIteration, target=RuntimeError) ) return self.visit_map_stream( - _stream.MapStream(stream.upstream, func, stream.n_threads) + _stream.MapStream(stream.upstream, func, stream.concurrency) ) def visit_flatten_stream(self, stream: _stream.FlattenStream[T]) -> Iterator[T]: it = stream.upstream._accept(IteratorProducingVisitor[Iterable]()) - if stream.n_threads == 1: + if stream.concurrency == 1: return _core.FlatteningIteratorWrapper(it) else: return _concurrency.ThreadedFlatteningIteratorWrapper( - it, n_workers=stream.n_threads + it, n_workers=stream.concurrency ) def visit_chain_stream(self, stream: _stream.ChainStream[T]) -> Iterator[T]: diff --git a/tests/test_legacy.py b/tests/test_legacy.py index d709509..f62d5b6 100644 --- a/tests/test_legacy.py +++ b/tests/test_legacy.py @@ -63,14 +63,14 @@ def test_flatten_typing(self) -> None: c: Stream[str] = Stream("abc".__iter__).map(set).flatten() @parameterized.expand([[1], [2], [3]]) - def test_flatten(self, n_threads: int): - if n_threads == 1: + def test_flatten(self, concurrency: int): + if concurrency == 1: # test ordering self.assertListEqual( list( Stream(["Hello World", "Happy to be here :)"].__iter__) .map(str.split) - .flatten(n_threads=n_threads) + .flatten(concurrency=concurrency) ), ["Hello", "World", "Happy", "to", "be", "here", ":)"], ) @@ -78,16 +78,16 @@ def test_flatten(self, n_threads: int): set( Stream(["Hello World", "Happy to be here :)"].__iter__) .map(str.split) - .flatten(n_threads=n_threads) + .flatten(concurrency=concurrency) ), {"Hello", "World", "Happy", "to", "be", "here", ":)"}, ) self.assertEqual( sum( Stream([["1 2 3", "4 5 6"], ["7", "8 9 10"]].__iter__) - .flatten(n_threads=n_threads) + .flatten(concurrency=concurrency) .map(str.split) - .flatten(n_threads=n_threads) + .flatten(concurrency=concurrency) .map(int) ), 55, @@ -96,7 +96,7 @@ def test_flatten(self, n_threads: int): # test potential recursion issue with chained empty iters list( Stream([iter([]) for _ in range(2000)].__iter__).flatten( - n_threads=n_threads + concurrency=concurrency ) ) @@ -117,18 +117,18 @@ def test_flatten(self, n_threads: int): self.assertAlmostEqual( timeit.timeit( lambda: self.assertSetEqual( - set(Stream(streams.__iter__).flatten(n_threads=n_threads)), + set(Stream(streams.__iter__).flatten(concurrency=concurrency)), set(range(N)), ), number=1, ), len(streams) * single_stream_iteration_duration - / (1 if n_threads is None else n_threads), + / (1 if concurrency is None else concurrency), delta=DELTA * len(streams) * single_stream_iteration_duration - / (1 if n_threads is None else n_threads) + / (1 if concurrency is None else concurrency) + queue_get_timeout, ) @@ -139,7 +139,7 @@ def test_flatten(self, n_threads: int): next( iter( Stream([zeros(), zeros(), zeros()].__iter__).flatten( - n_threads=n_threads + concurrency=concurrency ) ) ), @@ -170,7 +170,7 @@ def raise_for_4(x): ) ) .map(iter) - .flatten(n_threads=n_threads) + .flatten(concurrency=concurrency) ) error_types = set() @@ -191,11 +191,11 @@ def store_error_types(error): # test rasing: self.assertRaises( ValueError, - lambda: list(Stream([map(int, "12-3")].__iter__).flatten(n_threads=n_threads)), # type: ignore + lambda: list(Stream([map(int, "12-3")].__iter__).flatten(concurrency=concurrency)), # type: ignore ) self.assertRaises( ValueError, - lambda: list(Stream(lambda: map(int, "-")).flatten(n_threads=n_threads)), # type: ignore + lambda: list(Stream(lambda: map(int, "-")).flatten(concurrency=concurrency)), # type: ignore ) def test_add(self) -> None: @@ -215,17 +215,17 @@ def test_add(self) -> None: ) @parameterized.expand([[1], [2], [3]]) - def test_map(self, n_threads: int): + def test_map(self, concurrency: int): func = lambda x: x**2 self.assertSetEqual( set( Stream(range(N).__iter__) - .map(ten_ms_identity, n_threads=n_threads) + .map(ten_ms_identity, concurrency=concurrency) .map(lambda x: x if 1 / x else x) - .map(func, n_threads=n_threads) + .map(func, concurrency=concurrency) .catch(ZeroDivisionError) .map( - ten_ms_identity, n_threads=n_threads + ten_ms_identity, concurrency=concurrency ) # check that the ZeroDivisionError is bypass the call to func ), set(map(func, range(1, N))), @@ -235,7 +235,7 @@ def test_map(self, n_threads: int): set( Stream(l.__iter__) .map(lambda l: iter(l)) - .map(next, n_threads=n_threads) + .map(next, concurrency=concurrency) .catch(RuntimeError) ), {1, 3}, @@ -247,12 +247,12 @@ def test_map_threading_bench(self) -> None: self.assertAlmostEqual( timestream(stream), TEN_MS * N, delta=DELTA * (TEN_MS * N) ) - n_threads = 2 - stream = Stream(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads) + concurrency = 2 + stream = Stream(range(N).__iter__).map(ten_ms_identity, concurrency=concurrency) self.assertAlmostEqual( timestream(stream), - TEN_MS * N / n_threads, - delta=DELTA * (TEN_MS * N) / n_threads, + TEN_MS * N / concurrency, + delta=DELTA * (TEN_MS * N) / concurrency, ) def test_do(self) -> None: @@ -275,7 +275,7 @@ def func_with_side_effect(x): # with threads l.clear() self.assertSetEqual( - set(Stream(args.__iter__).do(func_with_side_effect, n_threads=2)), + set(Stream(args.__iter__).do(func_with_side_effect, concurrency=2)), set(args), ) self.assertSetEqual(set(l), set(map(func, args))) @@ -359,11 +359,11 @@ def store_errors(error): self.assertIsInstance(next(iter(errors)), ValueError) @parameterized.expand([[1], [2], [3]]) - def test_slow(self, n_threads: int): + def test_slow(self, concurrency: int): frequency = 64 stream = ( Stream(range(N).__iter__) - .map(ten_ms_identity, n_threads=n_threads) + .map(ten_ms_identity, concurrency=concurrency) .slow(frequency) ) self.assertAlmostEqual( @@ -382,7 +382,7 @@ def test_time(self) -> None: ) @parameterized.expand([[1], [2], [3]]) - def test_catch(self, n_threads: int): + def test_catch(self, concurrency: int): # ignore = True errors = set() @@ -393,7 +393,7 @@ def store_errors(error): self.assertSetEqual( set( Stream(["1", "r", "2"].__iter__) - .map(int, n_threads=n_threads) + .map(int, concurrency=concurrency) .catch( Exception, when=lambda error: "invalid literal for int() with base 10:" @@ -411,14 +411,14 @@ def store_errors(error): ValueError, lambda: list( Stream(["1", "r", "2"].__iter__) - .map(int, n_threads=n_threads) + .map(int, concurrency=concurrency) .catch(ValueError, when=lambda error: False) ), ) self.assertListEqual( list( Stream(["1", "r", "2"].__iter__) - .map(int, n_threads=n_threads) + .map(int, concurrency=concurrency) .catch( ValueError, when=lambda error: "invalid literal for int() with base 10:" @@ -432,7 +432,7 @@ def store_errors(error): self.assertListEqual( list( Stream(["1", "r", "2"].__iter__) - .map(int, n_threads=n_threads) + .map(int, concurrency=concurrency) .catch(TypeError) .catch(ValueError) .catch(TypeError) @@ -446,7 +446,7 @@ def store_errors(error): ValueError, lambda: list( Stream(["1", "r", "2"].__iter__) - .map(int, n_threads=n_threads) + .map(int, concurrency=concurrency) .catch(TypeError) .map(type) ), @@ -455,7 +455,7 @@ def store_errors(error): ValueError, lambda: list( Stream(["1", "r", "2"].__iter__) - .map(int, n_threads=n_threads) + .map(int, concurrency=concurrency) .catch(TypeError) .map(type) ), @@ -514,11 +514,11 @@ def test_partial_iteration(self) -> None: iter( Stream(([0] * N).__iter__) .slow(50) - .map(_util.identity, n_threads=2) + .map(_util.identity, concurrency=2) .slow(50) - .map(_util.identity, n_threads=2) + .map(_util.identity, concurrency=2) .slow(50) - .map(_util.identity, n_threads=2) + .map(_util.identity, concurrency=2) .slow(50) ) ) @@ -527,11 +527,11 @@ def test_partial_iteration(self) -> None: stream = ( Stream(([0] * N).__iter__) .slow(50) - .map(_util.identity, n_threads=2) + .map(_util.identity, concurrency=2) .slow(50) - .map(_util.identity, n_threads=2) + .map(_util.identity, concurrency=2) .slow(50) - .map(_util.identity, n_threads=2) + .map(_util.identity, concurrency=2) .slow(50) ) samples = list(itertools.islice(stream, n)) @@ -543,9 +543,9 @@ def test_invalid_source(self) -> None: self.assertRaises(TypeError, lambda: list(stream_ok_at_construction)) @parameterized.expand([[1], [2], [3]]) - def test_invalid_flatten_upstream(self, n_threads: int): + def test_invalid_flatten_upstream(self, concurrency: int): self.assertRaises( - TypeError, lambda: list(Stream(range(3).__iter__).flatten(n_threads=n_threads)) # type: ignore + TypeError, lambda: list(Stream(range(3).__iter__).flatten(concurrency=concurrency)) # type: ignore ) def test_planning_and_execution_decoupling(self) -> None: @@ -562,7 +562,7 @@ def test_generator_already_generating(self) -> None: iter((ten_ms_identity(x) for x in range(N))) for _ in range(3) ] self.assertEqual( - Counter(Stream(l.__iter__).flatten(n_threads=2)), + Counter(Stream(l.__iter__).flatten(concurrency=2)), Counter(list(range(N)) + list(range(N)) + list(range(N))), ) @@ -573,7 +573,7 @@ def test_explain(self) -> None: .map(lambda x: x) .batch(100) .observe("batches") - .flatten(n_threads=4) + .flatten(concurrency=4) .slow(64) .observe("slowed elems") .chain( diff --git a/tests/test_pipe.py b/tests/test_pipe.py index fb95524..096ba26 100644 --- a/tests/test_pipe.py +++ b/tests/test_pipe.py @@ -63,7 +63,7 @@ def test_explain(self) -> None: .map(lambda _: _) .batch(100) .observe("batches") - .flatten(n_threads=4) + .flatten(concurrency=4) .slow(64) .observe("stream #1 elements") .chain( @@ -118,22 +118,22 @@ def test_add(self) -> None: [Stream.flatten, []], ] ) - def test_sanitize_n_threads(self, method, args) -> None: + def test_sanitize_concurrency(self, method, args) -> None: stream = Stream(src) with self.assertRaises( TypeError, - msg=f"{method} should be raising TypeError for non-int n_threads.", + msg=f"{method} should be raising TypeError for non-int concurrency.", ): - method(stream, *args, n_threads="1") + method(stream, *args, concurrency="1") with self.assertRaises( - ValueError, msg=f"{method} should be raising ValueError for n_threads=0." + ValueError, msg=f"{method} should be raising ValueError for concurrency=0." ): - method(stream, *args, n_threads=0) + method(stream, *args, concurrency=0) - for n_threads in range(1, 10): + for concurrency in range(1, 10): self.assertIsInstance( - method(stream, *args, n_threads=n_threads), + method(stream, *args, concurrency=concurrency), Stream, - msg=f"it must be ok to call {method} with n_threads={n_threads}", + msg=f"it must be ok to call {method} with concurrency={concurrency}", )