From 860192b922e910587b2394bcf5e2dd63518c9c28 Mon Sep 17 00:00:00 2001 From: bonnal-enzo Date: Mon, 25 Dec 2023 14:23:00 +0100 Subject: [PATCH] 0.0.1: streamable --- .github/workflows/test.yml | 8 +- .gitignore | 1 + README.md | 98 ++++---- examples/xmas_comments_translation.py | 6 +- iterable/__init__.py | 1 + {kioss => iterable}/_execution/__init__.py | 0 .../_execution/_concurrency.py | 4 +- {kioss => iterable}/_execution/_core.py | 2 +- kioss/_pipe.py => iterable/_stream.py | 224 +++++++++--------- {kioss => iterable}/_util.py | 2 +- {kioss => iterable}/_visit/__init__.py | 0 iterable/_visit/_base.py | 48 ++++ iterable/_visit/_explanation.py | 108 +++++++++ iterable/_visit/_iter.py | 80 +++++++ {kioss => iterable}/py.typed | 0 kioss/__init__.py | 1 - kioss/_visit/_base.py | 48 ---- kioss/_visit/_explanation.py | 108 --------- kioss/_visit/_iter.py | 80 ------- setup.py | 10 +- tests/test_legacy.py | 208 ++++++++-------- tests/test_pipe.py | 76 +++--- tests/test_util.py | 2 +- 23 files changed, 558 insertions(+), 557 deletions(-) create mode 100644 iterable/__init__.py rename {kioss => iterable}/_execution/__init__.py (100%) rename {kioss => iterable}/_execution/_concurrency.py (98%) rename {kioss => iterable}/_execution/_core.py (99%) rename kioss/_pipe.py => iterable/_stream.py (58%) rename {kioss => iterable}/_util.py (97%) rename {kioss => iterable}/_visit/__init__.py (100%) create mode 100644 iterable/_visit/_base.py create mode 100644 iterable/_visit/_explanation.py create mode 100644 iterable/_visit/_iter.py rename {kioss => iterable}/py.typed (100%) delete mode 100644 kioss/__init__.py delete mode 100644 kioss/_visit/_base.py delete mode 100644 kioss/_visit/_explanation.py delete mode 100644 kioss/_visit/_iter.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3efc3fe..f6410dc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -31,7 +31,7 @@ jobs: - name: mypy run: | python -m pip install -r requirements.txt - python -m mypy kioss tests + python -m mypy iterable tests lint: runs-on: ubuntu-latest @@ -45,7 +45,7 @@ jobs: - name: checks run: | - # python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module -r kioss tests && python -m isort kioss tests && python -m black kioss tests + # python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module -r iterable tests && python -m isort iterable tests && python -m black iterable tests python -m pip install -r requirements.txt - python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module --check -r kioss tests - python -m black --check kioss/* tests/* + python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module --check -r iterable tests + python -m black --check iterable/* tests/* diff --git a/.gitignore b/.gitignore index a45beb0..2dd3fa8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__ tmp* build dist +.mypy_cache \ No newline at end of file diff --git a/README.md b/README.md index 1bc47cd..8935f4a 100644 --- a/README.md +++ b/README.md @@ -1,40 +1,40 @@ -# `kioss` -**Keep I/O Simple and Stupid** +# `iterable` +**Keep Iterables Simple and Stupid** -[![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/test/badge.svg)](https://github.com/bonnal-enzo/kioss/actions) [![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/kioss/actions) +[![Actions Status](https://github.com/bonnal-enzo/iterable/workflows/test/badge.svg)](https://github.com/bonnal-enzo/iterable/actions) [![Actions Status](https://github.com/bonnal-enzo/iterable/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/iterable/actions) -Ease the development of ETL/EL/ReverseETL scripts. +Ease the manipulation of `Iterable`s. ## 1. install ```bash -pip install kioss +pip install iterable ``` ## 2. import ```python -from kioss import Pipe +from iterable import Stream ``` ## 3. init ```python -integers: Pipe[int] = Pipe(source=lambda: range(10)) +integers: Stream[int] = Stream(source=lambda: range(10)) ``` -Instantiate a `Pipe` by providing a function that returns an `Iterable` (the data source). +Instantiate a `Stream` by providing a function that returns an `Iterable` (the data source). ## 4. declare operations -A `Pipe` is ***immutable***, meaning that applying an operation returns a new child pipe while the parent pipe remains unchanged. +A `Stream` is ***immutable***, meaning that applying an operation returns a new child stream while the parent stream remains unchanged. There are 2 kinds of operations: -- **transformations**: to act on the pipe's elements -- **controls**: to configure the behaviors of the iteration over the pipe +- **transformations**: to act on the stream's elements +- **controls**: to configure the behaviors of the iteration over the stream ```python -odd_squares: Pipe[int] = ( +odd_squares: Stream[int] = ( integers .map(lambda x: x ** 2, n_threads=2) # transformation .filter(lambda x: x % 2 == 1) # transformation @@ -45,7 +45,7 @@ All operations are described in the ***Operations guide*** section. ## 5. iterate -Once your pipe's declaration is done you can iterate over it. Our `Pipe[int]` being an `Iterable[int]`, you are free to iterate over it the way you want, e.g.: +Once your stream's declaration is done you can iterate over it. Our `Stream[int]` being an `Iterable[int]`, you are free to iterate over it the way you want, e.g.: ```python set(rate_limited_odd_squares) ``` @@ -57,10 +57,10 @@ for i in rate_limited_odd_squares: ... ``` -Alternatively, a pipe also exposes a convenient method `.run` to launch an iteration over itself until exhaustion. It catches exceptions occurring during iteration and optionnaly collects output elements into a list to return. At the end it raises if exceptions occurred. +Alternatively, a stream also exposes a convenient method `.iterate` to launch an iteration over itself until exhaustion. It catches exceptions occurring during iteration and optionnaly collects output elements into a list to return. At the end it raises if exceptions occurred. ```python -odd_squares: List[int] = rate_limited_odd_squares.run(collect_limit=1024) +odd_squares: List[int] = rate_limited_odd_squares.iterate(collect_limit=1024) assert odd_squares == [1, 9, 25, 49, 81] ``` @@ -73,7 +73,7 @@ assert odd_squares == [1, 9, 25, 49, 81] Let's keep the same example: ```python -integers = Pipe(lambda: range(10)) +integers = Stream(lambda: range(10)) ``` # Transformations @@ -82,7 +82,7 @@ integers = Pipe(lambda: range(10)) ## `.map` Defines the application of a function on parent elements. ```python -integer_strings: Pipe[str] = integers.map(str) +integer_strings: Stream[str] = integers.map(str) ``` It has an optional `n_threads` parameter if you need to apply the function concurrently using multiple threads. @@ -91,7 +91,7 @@ It has an optional `n_threads` parameter if you need to apply the function concu Defines the application of a function on parent elements like `.map`, but the parent elements will be forwarded instead of the result of the function. ```python -printed_integers: Pipe[int] = integers.do(print) +printed_integers: Stream[int] = integers.do(print) ``` It also has an optional `n_threads` parameter. @@ -100,7 +100,7 @@ It also has an optional `n_threads` parameter. Defines the filtering of parent elements based on a predicate function. ```python -pair_integers: Pipe[int] = integers.filter(lambda x: x % 2 == 0) +pair_integers: Stream[int] = integers.filter(lambda x: x % 2 == 0) ``` ## `.batch` @@ -108,13 +108,13 @@ pair_integers: Pipe[int] = integers.filter(lambda x: x % 2 == 0) Defines the grouping of parent elements into batches. ```python -integer_batches: Pipe[List[int]] = integers.batch(size=100, period=60) +integer_batches: Stream[List[int]] = integers.batch(size=100, period=60) ``` In this example a batch will be a list of 100 elements. It may contain less elements in the following cases: -- the pipe is exhausted +- the stream is exhausted - an exception occurred - more than 60 seconds (the `period` parameter) has elapsed since the last batch has been yielded. @@ -123,21 +123,21 @@ It may contain less elements in the following cases: Defines the ungrouping of parent elements assuming that the parent elements are `Iterable`s. ```python -integers: Pipe[int] = integer_batches.flatten() +integers: Stream[int] = integer_batches.flatten() ``` It also has an optional `n_threads` parameter to flatten concurrently several parent iterables. ## `.chain` -Defines the concatenation of the parent pipe with other pipes. The resulting pipe yields the elements of one pipe until it is exhausted and then moves to the next one. It starts with the pipe on which `.chain` is called. +Defines the concatenation of the parent stream with other streams. The resulting stream yields the elements of one stream until it is exhausted and then moves to the next one. It starts with the stream on which `.chain` is called. ```python -one_to_ten_integers: Pipe[int] = Pipe(lambda: range(1, 11)) -eleven_to_twenty_integers: Pipe[int] = Pipe(lambda: range(11, 21)) -twenty_one_to_thirty_integers: Pipe[int] = Pipe(lambda: range(21, 31)) +one_to_ten_integers: Stream[int] = Stream(lambda: range(1, 11)) +eleven_to_twenty_integers: Stream[int] = Stream(lambda: range(11, 21)) +twenty_one_to_thirty_integers: Stream[int] = Stream(lambda: range(21, 31)) -one_to_thirty_integers: Pipe[int] = one_to_ten_integers.chain( +one_to_thirty_integers: Stream[int] = one_to_ten_integers.chain( eleven_to_twenty_integers, twenty_one_to_thirty_integers, ) @@ -151,20 +151,20 @@ one_to_thirty_integers: Pipe[int] = one_to_ten_integers.chain( Defines a maximum rate at which parent elements will be yielded. ```python -slowed_integers: Pipe[int] = integers.slow(freq=2) +slowed_integers: Stream[int] = integers.slow(freq=2) ``` -The rate is expressed in elements per second, here a maximum of 2 elements per second will be yielded when iterating on the pipe. +The rate is expressed in elements per second, here a maximum of 2 elements per second will be yielded when iterating on the stream. ## `.observe` Defines that the iteration process will be logged. ```python -observed_slowed_integers: Pipe[int] = slowed_integers.observe(what="integers from 0 to 9") +observed_slowed_integers: Stream[int] = slowed_integers.observe(what="integers from 0 to 9") ``` -When iterating over the pipe, you should get an output like: +When iterating over the stream, you should get an output like: ``` INFO - iteration over 'integers from 0 to 9' will be logged. @@ -183,20 +183,20 @@ As you can notice the logs can never be overwhelming because they are produced l Defines that the provided type of exception will be catched. ```python -inverse_floats: Pipe[float] = integers.map(lambda x: 1/x) -safe_inverse_floats: Pipe[float] = inverse_floats.catch(ZeroDivisionError) +inverse_floats: Stream[float] = integers.map(lambda x: 1/x) +safe_inverse_floats: Stream[float] = inverse_floats.catch(ZeroDivisionError) ``` It has an optional `when` parameter: a function that takes the parent element as input and decides whether or not to catch the exception. --- -# ⭐ ***Typical use case for `kioss` in Data Engineering*** ⭐ +# ⭐ ***Typical use case in Data Engineering*** ⭐ ![](./img/dataeng.gif) As a data engineer, you often need to write python scripts to do **ETL** (*Extract* the data from a source API, *Transform* and *Load* it into the data warehouse) or **EL** (same but with minimal transformation) or **Reverse ETL** (read data from the data warehouse and post it into a destination API). -These scripts **do not manipulate huge volumes** of data because they are scheduled to run periodically (using orchestrators like *Airflow/DAGster/Prefect*) and only manipulates the data produced or updated during that period. At worst if you are *Amazon*-sized business you may need to process 10 millions payment transactions every 10 minutes. +These scripts **do not manipulate huge volumes** of data because they are scheduled to iterate periodically (using orchestrators like *Airflow/DAGster/Prefect*) and only manipulates the data produced or updated during that period. At worst if you are *Amazon*-sized business you may need to process 10 millions payment transactions every 10 minutes. These scripts tend to be replaced in part by EL tools like *Airbyte*, but sometimes you still need **custom integration logic**. @@ -222,15 +222,15 @@ These scripts are typically composed of: - The logic to **catch** exceptions of a given type. Also, we typically want to catch errors and seamlessly proceed with the integration until completion. For instance, if you have 1000 records to integrate and encounter an exception at the 236th record due to a malformed record, it is often more favorable to successfully integrate 999 records and raise after the interation has completed compared to skipping 763 valid records prematurely. -The ambition of `kioss` is to help us write these type of scripts in a **DRY** (Don't Repeat Yourself), **flexible**, **robust** and **readable** way. +The ambition of `iterable` is to help us write these type of scripts in a **DRY** (Don't Repeat Yourself), **flexible**, **robust** and **readable** way. -Let's delve into an example to gain a better understanding of what a job using `kioss` entails! +Let's delve into an example to gain a better understanding of what a job using `iterable` entails! ## 1. imports ```python import datetime import requests -from kioss import Pipe +from iterable import Stream from google.cloud import bigquery from typing import Iterable, Iterator, Dict, Any ``` @@ -275,11 +275,11 @@ Also let's init a BQ client: bq_client = bigquery.Client(project) ``` -## 4. pipe +## 4. stream Write your integration function. -Tip: Define your pipe between parentheses to be allowed to go to line between each operation. +Tip: Define your stream between parentheses to be allowed to go to line between each operation. ```python def integrate_pokemon_cards_into_bigquery( @@ -287,17 +287,17 @@ def integrate_pokemon_cards_into_bigquery( end_time: datetime.datetime, ) -> None: ( - Pipe(lambda: PokemonCardSource(start_time, end_time)) - # at this point we have a Pipe[List[Dict[str, Any]]] + Stream(lambda: PokemonCardSource(start_time, end_time)) + # at this point we have a Stream[List[Dict[str, Any]]] # Let's say pokemontcg.io rate limits us to 10 calls per second, - # let's keep a margin and slow our pipe down to 9. + # let's keep a margin and slow our stream down to 9. .slow(freq=9) .observe(what="pokemon cards page") # let's flatten the card page into individual cards .flatten() - # at this point we have a Pipe[Dict[str, Any]] + # at this point we have a Stream[Dict[str, Any]] # let's structure our row .map(lambda card: @@ -311,7 +311,7 @@ def integrate_pokemon_cards_into_bigquery( # Let's batch cards by 1000 for performant multi-rows insert. .batch(size=1000) - # at this point we have a Pipe[List[Dict[str, Any]]] + # at this point we have a Stream[List[Dict[str, Any]]] .observe(what="pokemon card batches") # Let's post the batches into BQ concurrently using 2 threads. @@ -322,17 +322,17 @@ def integrate_pokemon_cards_into_bigquery( ), n_threads=2, ) - # at this point we have a Pipe[Sequence[Dict[str, Any]]] + # at this point we have a Stream[Sequence[Dict[str, Any]]] # The insertion in bigquery returns a list of inserts results. # Let's raise if the insertion got errors. .flatten() .observe(what="bigquery insert results") - # at this point we have a Pipe[Dict[str, Any]] + # at this point we have a Stream[Dict[str, Any]] .do(raise_for_errors) - # iterate until no more card in the pipe and finally raises if errors occurred. - .run() + # iterate until no more card in the stream and finally raises if errors occurred. + .iterate() ) ``` diff --git a/examples/xmas_comments_translation.py b/examples/xmas_comments_translation.py index 9ea662e..5f736fe 100644 --- a/examples/xmas_comments_translation.py +++ b/examples/xmas_comments_translation.py @@ -2,11 +2,11 @@ import requests from google.cloud import bigquery, translate # type: ignore -from kioss import Pipe +from iterable import Stream ( # Read the comments made on your platform from your BigQuery datawarehouse - Pipe(bigquery.Client().query("SELECT text FROM fact.comment").result) + Stream(bigquery.Client().query("SELECT text FROM fact.comment").result) .map(itemgetter("text")) .observe(what="comments") @@ -40,5 +40,5 @@ .do(requests.Response.raise_for_status) .map(requests.Response.text) .observe(what="integration response's texts") - .run() + .iterate() ) diff --git a/iterable/__init__.py b/iterable/__init__.py new file mode 100644 index 0000000..9d6e9cd --- /dev/null +++ b/iterable/__init__.py @@ -0,0 +1 @@ +from iterable._stream import Stream diff --git a/kioss/_execution/__init__.py b/iterable/_execution/__init__.py similarity index 100% rename from kioss/_execution/__init__.py rename to iterable/_execution/__init__.py diff --git a/kioss/_execution/_concurrency.py b/iterable/_execution/_concurrency.py similarity index 98% rename from kioss/_execution/_concurrency.py rename to iterable/_execution/_concurrency.py index 5ca3845..578bb70 100644 --- a/kioss/_execution/_concurrency.py +++ b/iterable/_execution/_concurrency.py @@ -4,8 +4,8 @@ from queue import Queue from typing import Callable, Iterable, Iterator, Optional, Set, TypeVar, Union -from kioss import _util -from kioss._execution._core import IteratorWrapper +from iterable import _util +from iterable._execution._core import IteratorWrapper T = TypeVar("T") R = TypeVar("R") diff --git a/kioss/_execution/_core.py b/iterable/_execution/_core.py similarity index 99% rename from kioss/_execution/_core.py rename to iterable/_execution/_core.py index a7c3d0f..64279a7 100644 --- a/kioss/_execution/_core.py +++ b/iterable/_execution/_core.py @@ -6,7 +6,7 @@ T = TypeVar("T") R = TypeVar("R") -from kioss import _util +from iterable import _util class IteratorWrapper(Iterator[T], ABC): diff --git a/kioss/_pipe.py b/iterable/_stream.py similarity index 58% rename from kioss/_pipe.py rename to iterable/_stream.py index 664eb0a..a46c170 100644 --- a/kioss/_pipe.py +++ b/iterable/_stream.py @@ -15,49 +15,49 @@ overload, ) -from kioss import _util +from iterable import _util if TYPE_CHECKING: - from kioss._visit._base import Visitor + from iterable._visit._base import Visitor R = TypeVar("R") T = TypeVar("T") V = TypeVar("V") -class Pipe(Iterable[T]): +class Stream(Iterable[T]): _RUN_MAX_NUM_ERROR_SAMPLES = 8 def __init__(self, source: Callable[[], Iterable[T]]) -> None: """ - Initialize a Pipe with a data source. + Initialize a Stream with a data source. The source must be a callable that returns an iterator, i.e., an object implementing __iter__ and __next__ methods. - Each subsequent iteration over the pipe will use a fresh iterator obtained from `source()`. + Each subsequent iteration over the stream will use a fresh iterator obtained from `source()`. Args: source (Callable[[], Iterator[T]]): A factory function called to obtain a fresh data source iterator for each iteration. """ - self.upstream: "Optional[Pipe]" = None + self.upstream: "Optional[Stream]" = None if not callable(source): raise TypeError(f"source must be a callable but got a {type(source)}") self.source = source def __iter__(self) -> Iterator[T]: - from kioss._visit import _iter + from iterable._visit import _iter return self._accept(_iter.IteratorProducingVisitor[T]()) - def __add__(self, other: "Pipe[T]") -> "Pipe[T]": + def __add__(self, other: "Stream[T]") -> "Stream[T]": return self.chain(other) def explain(self, colored: bool = False) -> str: - from kioss._visit import _explanation + from iterable._visit import _explanation return self._accept(_explanation.ExplainingVisitor(colored)) def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_source_pipe(self) + return visitor.visit_source_stream(self) @staticmethod def sanitize_n_threads(n_threads: int): @@ -74,137 +74,137 @@ def map( self, func: Callable[[T], R], n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": """ - Apply a function to each element of the Pipe. + Apply a function to each element of the Stream. Args: func (Callable[[T], R]): The function to be applied to each element. n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). Returns: - Pipe[R]: A new Pipe instance with elements resulting from applying the function to each element. + Stream[R]: A new Stream instance with elements resulting from applying the function to each element. """ - Pipe.sanitize_n_threads(n_threads) - return MapPipe(self, func, n_threads) + Stream.sanitize_n_threads(n_threads) + return MapStream(self, func, n_threads) def do( self, func: Callable[[T], Any], n_threads: int = 1, - ) -> "Pipe[T]": + ) -> "Stream[T]": """ - Run the func as side effect: the resulting Pipe forwards the upstream elements after func execution's end. + Run the func as side effect: the resulting Stream forwards the upstream elements after func execution's end. Args: func (Callable[[T], R]): The function to be applied to each element. n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). Returns: - Pipe[T]: A new Pipe instance with elements resulting from applying the function to each element. + Stream[T]: A new Stream instance with elements resulting from applying the function to each element. """ - Pipe.sanitize_n_threads(n_threads) - return DoPipe(self, func, n_threads) + Stream.sanitize_n_threads(n_threads) + return DoStream(self, func, n_threads) @overload def flatten( - self: "Pipe[Iterable[R]]", + self: "Stream[Iterable[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... @overload def flatten( - self: "Pipe[Collection[R]]", + self: "Stream[Collection[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... @overload def flatten( - self: "Pipe[Pipe[R]]", + self: "Stream[Stream[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... @overload def flatten( - self: "Pipe[Iterator[R]]", + self: "Stream[Iterator[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... @overload def flatten( - self: "Pipe[List[R]]", + self: "Stream[List[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... @overload def flatten( - self: "Pipe[Sequence[R]]", + self: "Stream[Sequence[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... @overload def flatten( - self: "Pipe[Set[R]]", + self: "Stream[Set[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": ... def flatten( - self: "Pipe[Iterable[R]]", + self: "Stream[Iterable[R]]", n_threads: int = 1, - ) -> "Pipe[R]": + ) -> "Stream[R]": """ - Flatten the elements of the Pipe, which are assumed to be iterables, creating a new Pipe with individual elements. + Flatten the elements of the Stream, which are assumed to be iterables, creating a new Stream with individual elements. Returns: - Pipe[R]: A new Pipe instance with individual elements obtained by flattening the original elements. + Stream[R]: A new Stream instance with individual elements obtained by flattening the original elements. n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used). """ - Pipe.sanitize_n_threads(n_threads) - return FlattenPipe(self, n_threads) + Stream.sanitize_n_threads(n_threads) + return FlattenStream(self, n_threads) - def chain(self, *others: "Pipe[T]") -> "Pipe[T]": + def chain(self, *others: "Stream[T]") -> "Stream[T]": """ - Create a new Pipe by chaining the elements of this Pipe with the elements from other Pipes. The elements of a given Pipe are yielded after its predecessor Pipe is exhausted. + Create a new Stream by chaining the elements of this Stream with the elements from other Streams. The elements of a given Stream are yielded after its predecessor Stream is exhausted. Args: - *others (Pipe[T]): One or more additional Pipe instances to chain with this Pipe. + *others (Stream[T]): One or more additional Stream instances to chain with this Stream. Returns: - Pipe[T]: A new Pipe instance with elements from this Pipe followed by elements from other Pipes. + Stream[T]: A new Stream instance with elements from this Stream followed by elements from other Streams. """ - return ChainPipe(self, list(others)) + return ChainStream(self, list(others)) - def filter(self, predicate: Callable[[T], bool]) -> "Pipe[T]": + def filter(self, predicate: Callable[[T], bool]) -> "Stream[T]": """ - Filter the elements of the Pipe based on the given predicate, creating a new Pipe with filtered elements. + Filter the elements of the Stream based on the given predicate, creating a new Stream with filtered elements. Args: predicate (Callable[[T], bool]): The function that determines whether an element should be included. Returns: - Pipe[T]: A new Pipe instance with elements that satisfy the predicate. + Stream[T]: A new Stream instance with elements that satisfy the predicate. """ - return FilterPipe(self, predicate) + return FilterStream(self, predicate) - def batch(self, size: int = 100, period: float = float("inf")) -> "Pipe[List[T]]": + def batch(self, size: int = 100, period: float = float("inf")) -> "Stream[List[T]]": """ - Batch elements of the Pipe into lists of a specified size or within a specified time window. + Batch elements of the Stream into lists of a specified size or within a specified time window. Args: size (int, optional): The maximum number of elements per batch (default is 100). period (float, optional): The maximum number of seconds to wait before yielding a batch (default is infinity). Returns: - Pipe[List[T]]: A new Pipe instance with lists containing batches of elements. + Stream[List[T]]: A new Stream instance with lists containing batches of elements. """ - return BatchPipe(self, size, period) + return BatchStream(self, size, period) - def slow(self, freq: float) -> "Pipe[T]": + def slow(self, freq: float) -> "Stream[T]": """ Slow down the iteration to a maximum frequency in Hz (max number of elements yielded per second). @@ -212,15 +212,15 @@ def slow(self, freq: float) -> "Pipe[T]": freq (float): The maximum frequency in Hz of the iteration, i.e. how many elements will be yielded per second at most. Returns: - Pipe[T]: A new Pipe instance with elements iterated at the specified frequency. + Stream[T]: A new Stream instance with elements iterated at the specified frequency. """ - return SlowPipe(self, freq) + return SlowStream(self, freq) def catch( self, *classes: Type[Exception], when: Optional[Callable[[Exception], bool]] = None, - ) -> "Pipe[T]": + ) -> "Stream[T]": """ Any exception who is instance of `exception_class`will be catched, under the condition that the `when` predicate function (if provided) returns True. @@ -229,31 +229,31 @@ def catch( when (Callable[[Exception], bool], optional): catches an exception whose type is in `classes` only if this predicate function is None or evaluates to True. Returns: - Pipe[T]: A new Pipe instance with error handling capability. + Stream[T]: A new Stream instance with error handling capability. """ - return CatchPipe(self, *classes, when=when) + return CatchStream(self, *classes, when=when) - def observe(self, what: str = "elements", colored: bool = False) -> "Pipe[T]": + def observe(self, what: str = "elements", colored: bool = False) -> "Stream[T]": """ Will logs the evolution of the iteration over elements. Args: - what (str): name the objects yielded by the pipe for clearer logs, must be a plural descriptor. + what (str): name the objects yielded by the stream for clearer logs, must be a plural descriptor. colored (bool): whether or not to use ascii colorization. Returns: - Pipe[T]: A new Pipe instance with logging capability. + Stream[T]: A new Stream instance with logging capability. """ - return ObservePipe(self, what, colored) + return ObserveStream(self, what, colored) - def run( + def iterate( self, collect_limit: int = 0, raise_if_more_errors_than: int = 0, fail_fast: bool = False, ) -> List[T]: """ - Run the Pipe: + Run the Stream: - iterates over it until it is exhausted, - logs - catches exceptions log a sample of them at the end of the iteration @@ -265,15 +265,15 @@ def run( collect_limit (int, optional): How many output elements to return (default is 0). fail_fast (bool, optional): Decide to raise at the first encountered exception or at the end of the iteration (default is False). Returns: - List[T]: A list containing the elements of the Pipe truncate to the first `n_samples` ones. + List[T]: A list containing the elements of the Stream titeratecate to the first `n_samples` ones. Raises: Exception: If more exception than `raise_if_more_errors_than` are catched during iteration. """ max_num_error_samples = self._RUN_MAX_NUM_ERROR_SAMPLES - pipe = self + stream = self - if not isinstance(self, ObservePipe): - pipe = self.observe("output elements") + if not isinstance(self, ObserveStream): + stream = self.observe("output elements") error_samples: List[Exception] = [] errors_count = 0 @@ -287,12 +287,12 @@ def register_error_sample(error): error_samples.append(error) return True - pipe = pipe.catch(Exception, when=register_error_sample) + stream = stream.catch(Exception, when=register_error_sample) - _util.LOGGER.info(pipe.explain(colored=False)) + _util.LOGGER.info(stream.explain(colored=False)) output_samples: List[T] = [] - for elem in pipe: + for elem in stream: if len(output_samples) < collect_limit: output_samples.append(elem) @@ -313,13 +313,13 @@ def register_error_sample(error): Z = TypeVar("Z") -class SourcePipe(Pipe[X]): +class SourceStream(Stream[X]): def __init__(self, source: Callable[[], Iterable[X]]): """ - Initialize a Pipe with a data source. + Initialize a Stream with a data source. The source must be a callable that returns an iterator, i.e., an object implementing __iter__ and __next__ methods. - Each subsequent iteration over the pipe will use a fresh iterator obtained from `source()`. + Each subsequent iteration over the stream will use a fresh iterator obtained from `source()`. Args: source (Callable[[], Iterator[T]]): A factory function called to obtain a fresh data source iterator for each iteration. @@ -332,95 +332,95 @@ def __init__(self, source: Callable[[], Iterable[X]]): self.source = source def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_source_pipe(self) + return visitor.visit_source_stream(self) -class FilterPipe(Pipe[Y]): - def __init__(self, upstream: Pipe[Y], predicate: Callable[[Y], bool]): - self.upstream: Pipe[Y] = upstream +class FilterStream(Stream[Y]): + def __init__(self, upstream: Stream[Y], predicate: Callable[[Y], bool]): + self.upstream: Stream[Y] = upstream self.predicate = predicate def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_filter_pipe(self) + return visitor.visit_filter_stream(self) -class MapPipe(Pipe[Z], Generic[Y, Z]): - def __init__(self, upstream: Pipe[Y], func: Callable[[Y], Z], n_threads: int): - self.upstream: Pipe[Y] = upstream +class MapStream(Stream[Z], Generic[Y, Z]): + def __init__(self, upstream: Stream[Y], func: Callable[[Y], Z], n_threads: int): + self.upstream: Stream[Y] = upstream self.func = func self.n_threads = n_threads def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_map_pipe(self) + return visitor.visit_map_stream(self) -class DoPipe(Pipe[Y]): - def __init__(self, upstream: Pipe[Y], func: Callable[[Y], Any], n_threads: int): - self.upstream: Pipe[Y] = upstream +class DoStream(Stream[Y]): + def __init__(self, upstream: Stream[Y], func: Callable[[Y], Any], n_threads: int): + self.upstream: Stream[Y] = upstream self.func = func self.n_threads = n_threads def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_do_pipe(self) + return visitor.visit_do_stream(self) -class ObservePipe(Pipe[Y]): - def __init__(self, upstream: Pipe[Y], what: str, colored: bool): - self.upstream: Pipe[Y] = upstream +class ObserveStream(Stream[Y]): + def __init__(self, upstream: Stream[Y], what: str, colored: bool): + self.upstream: Stream[Y] = upstream self.what = what self.colored = colored def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_observe_pipe(self) + return visitor.visit_observe_stream(self) -class FlattenPipe(Pipe[Y]): - def __init__(self, upstream: Pipe[Iterable[Y]], n_threads: int) -> None: - self.upstream: Pipe[Iterable[Y]] = upstream +class FlattenStream(Stream[Y]): + def __init__(self, upstream: Stream[Iterable[Y]], n_threads: int) -> None: + self.upstream: Stream[Iterable[Y]] = upstream self.n_threads = n_threads def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_flatten_pipe(self) + return visitor.visit_flatten_stream(self) -class BatchPipe(Pipe[List[Y]]): - def __init__(self, upstream: Pipe[Y], size: int, period: float): - self.upstream: Pipe[Y] = upstream +class BatchStream(Stream[List[Y]]): + def __init__(self, upstream: Stream[Y], size: int, period: float): + self.upstream: Stream[Y] = upstream self.size = size self.period = period def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_batch_pipe(self) + return visitor.visit_batch_stream(self) -class CatchPipe(Pipe[Y]): +class CatchStream(Stream[Y]): def __init__( self, - upstream: Pipe[Y], + upstream: Stream[Y], *classes: Type[Exception], when: Optional[Callable[[Exception], bool]] = None, ): - self.upstream: Pipe[Y] = upstream + self.upstream: Stream[Y] = upstream self.classes = classes self.when = when def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_catch_pipe(self) + return visitor.visit_catch_stream(self) -class ChainPipe(Pipe[Y]): - def __init__(self, upstream: Pipe[Y], others: List[Pipe]): - self.upstream: Pipe[Y] = upstream +class ChainStream(Stream[Y]): + def __init__(self, upstream: Stream[Y], others: List[Stream]): + self.upstream: Stream[Y] = upstream self.others = others def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_chain_pipe(self) + return visitor.visit_chain_stream(self) -class SlowPipe(Pipe[Y]): - def __init__(self, upstream: Pipe[Y], freq: float): - self.upstream: Pipe[Y] = upstream +class SlowStream(Stream[Y]): + def __init__(self, upstream: Stream[Y], freq: float): + self.upstream: Stream[Y] = upstream self.freq = freq def _accept(self, visitor: "Visitor[V]") -> V: - return visitor.visit_slow_pipe(self) + return visitor.visit_slow_stream(self) diff --git a/kioss/_util.py b/iterable/_util.py similarity index 97% rename from kioss/_util.py rename to iterable/_util.py index ace4fbe..bc70470 100644 --- a/kioss/_util.py +++ b/iterable/_util.py @@ -3,7 +3,7 @@ from typing_extensions import TypeGuard -LOGGER = logging.getLogger("kioss") +LOGGER = logging.getLogger("iterable") LOGGER.propagate = False handler = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") diff --git a/kioss/_visit/__init__.py b/iterable/_visit/__init__.py similarity index 100% rename from kioss/_visit/__init__.py rename to iterable/_visit/__init__.py diff --git a/iterable/_visit/_base.py b/iterable/_visit/_base.py new file mode 100644 index 0000000..8a153d2 --- /dev/null +++ b/iterable/_visit/_base.py @@ -0,0 +1,48 @@ +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from iterable import _stream + +V = TypeVar("V") + + +class Visitor(Generic[V], ABC): + @abstractmethod + def visit_source_stream(self, stream: _stream.Stream) -> V: + ... + + @abstractmethod + def visit_map_stream(self, stream: _stream.MapStream) -> V: + ... + + @abstractmethod + def visit_do_stream(self, stream: _stream.DoStream) -> V: + ... + + @abstractmethod + def visit_flatten_stream(self, stream: _stream.FlattenStream) -> V: + ... + + @abstractmethod + def visit_chain_stream(self, stream: _stream.ChainStream) -> V: + ... + + @abstractmethod + def visit_filter_stream(self, stream: _stream.FilterStream) -> V: + ... + + @abstractmethod + def visit_batch_stream(self, stream: _stream.BatchStream) -> V: + ... + + @abstractmethod + def visit_slow_stream(self, stream: _stream.SlowStream) -> V: + ... + + @abstractmethod + def visit_catch_stream(self, stream: _stream.CatchStream) -> V: + ... + + @abstractmethod + def visit_observe_stream(self, stream: _stream.ObserveStream) -> V: + ... diff --git a/iterable/_visit/_explanation.py b/iterable/_visit/_explanation.py new file mode 100644 index 0000000..e935368 --- /dev/null +++ b/iterable/_visit/_explanation.py @@ -0,0 +1,108 @@ +from typing import Any + +from iterable import _stream, _util +from iterable._visit._base import Visitor + + +class ExplainingVisitor(Visitor[str]): + HEADER = "Stream's plan:" + + def __init__( + self, colored: bool = False, initial_margin: int = 0, add_header: bool = True + ): + self.colored = colored + self.current_margin = initial_margin + self.margin_step = 2 + self.add_header = add_header + + def additional_explain_lines(self, name: str, descr: str) -> str: + margin = " " * self.current_margin + if self.add_header: + linking_symbols = " " * self.margin_step + "•" + else: + linking_symbols = "└" + "─" * (self.margin_step - 1) + "•" + + if self.colored: + linking_symbols = _util.colorize_in_grey(linking_symbols) + name = _util.colorize_in_red(name) + return f"{margin}{linking_symbols}{name}({descr})\n" + + def visit_any_stream(self, stream: _stream.Stream, name: str, descr: str) -> str: + additional_explain_lines = self.additional_explain_lines(name, descr) + if self.add_header: + if self.colored: + header = _util.bold(ExplainingVisitor.HEADER) + "\n" + else: + header = ExplainingVisitor.HEADER + "\n" + self.add_header = False + else: + header = "" + self.current_margin += self.margin_step + if stream.upstream is not None: + upstream_repr = stream.upstream._accept(self) + else: + upstream_repr = "" + return f"{header}{additional_explain_lines}{upstream_repr}" + + def visit_chain_stream(self, stream: _stream.ChainStream) -> Any: + name = "Chain" + descr = f"{len(stream.others)+1} streams" + additional_explain_lines = self.additional_explain_lines(name, descr) + self.current_margin += self.margin_step + chained_streams_repr = "".join( + map( + lambda stream: stream._accept( + ExplainingVisitor( + self.colored, self.current_margin, add_header=False + ) + ), + stream.others, + ) + ) + upstream_repr = stream.upstream._accept(self) + return f"{additional_explain_lines}{chained_streams_repr}{upstream_repr}" + + def visit_source_stream(self, stream: _stream.Stream) -> Any: + name = "Source" + descr = f"from: {stream.source}" + return self.visit_any_stream(stream, name, descr) + + def visit_map_stream(self, stream: _stream.MapStream) -> Any: + name = "Map" + descr = f"function {stream.func}, using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}" + return self.visit_any_stream(stream, name, descr) + + def visit_do_stream(self, stream: _stream.DoStream) -> Any: + name = "Do" + descr = f"side effects by applying a function {stream.func}, using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}" + return self.visit_any_stream(stream, name, descr) + + def visit_flatten_stream(self, stream: _stream.FlattenStream) -> Any: + name = "Flatten" + descr = f"using {stream.n_threads} thread{'s' if stream.n_threads > 1 else ''}" + return self.visit_any_stream(stream, name, descr) + + def visit_filter_stream(self, stream: _stream.FilterStream) -> Any: + name = "Filter" + descr = f"using predicate function {stream.predicate}" + return self.visit_any_stream(stream, name, descr) + + def visit_batch_stream(self, stream: _stream.BatchStream) -> Any: + name = "Batch" + descr = f"elements by groups of {stream.size} element{'s' if stream.size > 1 else ''}, or over a period of {stream.period} second{'s' if stream.period > 1 else ''}" + return self.visit_any_stream(stream, name, descr) + + def visit_slow_stream(self, stream: _stream.SlowStream) -> Any: + name = "Slow" + descr = f"at a maximum frequency of {stream.freq} element{'s' if stream.freq > 1 else ''} per second" + return self.visit_any_stream(stream, name, descr) + + def visit_catch_stream(self, stream: _stream.CatchStream) -> Any: + name = "Catch" + descr = f"exception instances of class in [{', '.join(map(lambda class_: class_.__name__, stream.classes))}]{', with an additional `when` condition' if stream.when is not None else ''}" + return self.visit_any_stream(stream, name, descr) + + def visit_observe_stream(self, stream: _stream.ObserveStream) -> Any: + name = "Observe" + descr = f"the evolution of the iteration over '{stream.what}'" + return self.visit_any_stream(stream, name, descr) diff --git a/iterable/_visit/_iter.py b/iterable/_visit/_iter.py new file mode 100644 index 0000000..52efabc --- /dev/null +++ b/iterable/_visit/_iter.py @@ -0,0 +1,80 @@ +import itertools +from typing import Iterable, Iterator, List, TypeVar, cast + +from iterable import _stream, _util +from iterable._execution import _concurrency, _core +from iterable._visit._base import Visitor + +T = TypeVar("T") +U = TypeVar("U") + + +class IteratorProducingVisitor(Visitor[Iterator[T]]): + def visit_source_stream(self, stream: _stream.Stream[T]) -> Iterator[T]: + iterable = stream.source() + _util.ducktype_assert_iterable(iterable) + return iter(iterable) + + def visit_map_stream(self, stream: _stream.MapStream[U, T]) -> Iterator[T]: + func = _util.map_exception(stream.func, source=StopIteration, target=RuntimeError) + it: Iterator[U] = stream.upstream._accept(IteratorProducingVisitor[U]()) + if stream.n_threads == 1: + return map(func, it) + else: + return _concurrency.ThreadedMappingIteratorWrapper( + it, func, n_workers=stream.n_threads + ) + + def visit_do_stream(self, stream: _stream.DoStream[T]) -> Iterator[T]: + func = _util.sidify( + _util.map_exception(stream.func, source=StopIteration, target=RuntimeError) + ) + return self.visit_map_stream(_stream.MapStream(stream.upstream, func, stream.n_threads)) + + def visit_flatten_stream(self, stream: _stream.FlattenStream[T]) -> Iterator[T]: + it = stream.upstream._accept(IteratorProducingVisitor[Iterable]()) + if stream.n_threads == 1: + return _core.FlatteningIteratorWrapper(it) + else: + return _concurrency.ThreadedFlatteningIteratorWrapper( + it, n_workers=stream.n_threads + ) + + def visit_chain_stream(self, stream: _stream.ChainStream[T]) -> Iterator[T]: + it: Iterator[T] = stream.upstream._accept(self) + other_its: List[Iterator[T]] = list( + map(lambda stream: stream._accept(self), stream.others) + ) + return itertools.chain(it, *other_its) + + def visit_filter_stream(self, stream: _stream.FilterStream[T]) -> Iterator[T]: + predicate = _util.map_exception( + stream.predicate, source=StopIteration, target=RuntimeError + ) + it: Iterator[T] = stream.upstream._accept(self) + return filter(predicate, it) + + def visit_batch_stream(self, stream: _stream.BatchStream[U]) -> Iterator[T]: + it: Iterator[U] = stream.upstream._accept(IteratorProducingVisitor[U]()) + return cast( + Iterator[T], _core.BatchingIteratorWrapper(it, stream.size, stream.period) + ) + + def visit_slow_stream(self, stream: _stream.SlowStream[T]) -> Iterator[T]: + return _core.SlowingIteratorWrapper(stream.upstream._accept(self), stream.freq) + + def visit_catch_stream(self, stream: _stream.CatchStream[T]) -> Iterator[T]: + if stream.when is not None: + when = _util.map_exception( + stream.when, source=StopIteration, target=RuntimeError + ) + else: + when = None + return _core.CatchingIteratorWrapper( + stream.upstream._accept(self), *stream.classes, when=when + ) + + def visit_observe_stream(self, stream: _stream.ObserveStream[T]) -> Iterator[T]: + return _core.ObservingIteratorWrapper( + stream.upstream._accept(self), stream.what, stream.colored + ) diff --git a/kioss/py.typed b/iterable/py.typed similarity index 100% rename from kioss/py.typed rename to iterable/py.typed diff --git a/kioss/__init__.py b/kioss/__init__.py deleted file mode 100644 index 7de3791..0000000 --- a/kioss/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from kioss._pipe import Pipe diff --git a/kioss/_visit/_base.py b/kioss/_visit/_base.py deleted file mode 100644 index e743d6d..0000000 --- a/kioss/_visit/_base.py +++ /dev/null @@ -1,48 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Generic, TypeVar - -from kioss import _pipe - -V = TypeVar("V") - - -class Visitor(Generic[V], ABC): - @abstractmethod - def visit_source_pipe(self, pipe: _pipe.Pipe) -> V: - ... - - @abstractmethod - def visit_map_pipe(self, pipe: _pipe.MapPipe) -> V: - ... - - @abstractmethod - def visit_do_pipe(self, pipe: _pipe.DoPipe) -> V: - ... - - @abstractmethod - def visit_flatten_pipe(self, pipe: _pipe.FlattenPipe) -> V: - ... - - @abstractmethod - def visit_chain_pipe(self, pipe: _pipe.ChainPipe) -> V: - ... - - @abstractmethod - def visit_filter_pipe(self, pipe: _pipe.FilterPipe) -> V: - ... - - @abstractmethod - def visit_batch_pipe(self, pipe: _pipe.BatchPipe) -> V: - ... - - @abstractmethod - def visit_slow_pipe(self, pipe: _pipe.SlowPipe) -> V: - ... - - @abstractmethod - def visit_catch_pipe(self, pipe: _pipe.CatchPipe) -> V: - ... - - @abstractmethod - def visit_observe_pipe(self, pipe: _pipe.ObservePipe) -> V: - ... diff --git a/kioss/_visit/_explanation.py b/kioss/_visit/_explanation.py deleted file mode 100644 index 25a52a9..0000000 --- a/kioss/_visit/_explanation.py +++ /dev/null @@ -1,108 +0,0 @@ -from typing import Any - -from kioss import _pipe, _util -from kioss._visit._base import Visitor - - -class ExplainingVisitor(Visitor[str]): - HEADER = "Pipe's plan:" - - def __init__( - self, colored: bool = False, initial_margin: int = 0, add_header: bool = True - ): - self.colored = colored - self.current_margin = initial_margin - self.margin_step = 2 - self.add_header = add_header - - def additional_explain_lines(self, name: str, descr: str) -> str: - margin = " " * self.current_margin - if self.add_header: - linking_symbols = " " * self.margin_step + "•" - else: - linking_symbols = "└" + "─" * (self.margin_step - 1) + "•" - - if self.colored: - linking_symbols = _util.colorize_in_grey(linking_symbols) - name = _util.colorize_in_red(name) - return f"{margin}{linking_symbols}{name}({descr})\n" - - def visit_any_pipe(self, pipe: _pipe.Pipe, name: str, descr: str) -> str: - additional_explain_lines = self.additional_explain_lines(name, descr) - if self.add_header: - if self.colored: - header = _util.bold(ExplainingVisitor.HEADER) + "\n" - else: - header = ExplainingVisitor.HEADER + "\n" - self.add_header = False - else: - header = "" - self.current_margin += self.margin_step - if pipe.upstream is not None: - upstream_repr = pipe.upstream._accept(self) - else: - upstream_repr = "" - return f"{header}{additional_explain_lines}{upstream_repr}" - - def visit_chain_pipe(self, pipe: _pipe.ChainPipe) -> Any: - name = "Chain" - descr = f"{len(pipe.others)+1} pipes" - additional_explain_lines = self.additional_explain_lines(name, descr) - self.current_margin += self.margin_step - chained_pipes_repr = "".join( - map( - lambda pipe: pipe._accept( - ExplainingVisitor( - self.colored, self.current_margin, add_header=False - ) - ), - pipe.others, - ) - ) - upstream_repr = pipe.upstream._accept(self) - return f"{additional_explain_lines}{chained_pipes_repr}{upstream_repr}" - - def visit_source_pipe(self, pipe: _pipe.Pipe) -> Any: - name = "Source" - descr = f"from: {pipe.source}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_map_pipe(self, pipe: _pipe.MapPipe) -> Any: - name = "Map" - descr = f"function {pipe.func}, using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_do_pipe(self, pipe: _pipe.DoPipe) -> Any: - name = "Do" - descr = f"side effects by applying a function {pipe.func}, using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_flatten_pipe(self, pipe: _pipe.FlattenPipe) -> Any: - name = "Flatten" - descr = f"using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_filter_pipe(self, pipe: _pipe.FilterPipe) -> Any: - name = "Filter" - descr = f"using predicate function {pipe.predicate}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_batch_pipe(self, pipe: _pipe.BatchPipe) -> Any: - name = "Batch" - descr = f"elements by groups of {pipe.size} element{'s' if pipe.size > 1 else ''}, or over a period of {pipe.period} second{'s' if pipe.period > 1 else ''}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_slow_pipe(self, pipe: _pipe.SlowPipe) -> Any: - name = "Slow" - descr = f"at a maximum frequency of {pipe.freq} element{'s' if pipe.freq > 1 else ''} per second" - return self.visit_any_pipe(pipe, name, descr) - - def visit_catch_pipe(self, pipe: _pipe.CatchPipe) -> Any: - name = "Catch" - descr = f"exception instances of class in [{', '.join(map(lambda class_: class_.__name__, pipe.classes))}]{', with an additional `when` condition' if pipe.when is not None else ''}" - return self.visit_any_pipe(pipe, name, descr) - - def visit_observe_pipe(self, pipe: _pipe.ObservePipe) -> Any: - name = "Observe" - descr = f"the evolution of the iteration over '{pipe.what}'" - return self.visit_any_pipe(pipe, name, descr) diff --git a/kioss/_visit/_iter.py b/kioss/_visit/_iter.py deleted file mode 100644 index 7147f3d..0000000 --- a/kioss/_visit/_iter.py +++ /dev/null @@ -1,80 +0,0 @@ -import itertools -from typing import Iterable, Iterator, List, TypeVar, cast - -from kioss import _pipe, _util -from kioss._execution import _concurrency, _core -from kioss._visit._base import Visitor - -T = TypeVar("T") -U = TypeVar("U") - - -class IteratorProducingVisitor(Visitor[Iterator[T]]): - def visit_source_pipe(self, pipe: _pipe.Pipe[T]) -> Iterator[T]: - iterable = pipe.source() - _util.ducktype_assert_iterable(iterable) - return iter(iterable) - - def visit_map_pipe(self, pipe: _pipe.MapPipe[U, T]) -> Iterator[T]: - func = _util.map_exception(pipe.func, source=StopIteration, target=RuntimeError) - it: Iterator[U] = pipe.upstream._accept(IteratorProducingVisitor[U]()) - if pipe.n_threads == 1: - return map(func, it) - else: - return _concurrency.ThreadedMappingIteratorWrapper( - it, func, n_workers=pipe.n_threads - ) - - def visit_do_pipe(self, pipe: _pipe.DoPipe[T]) -> Iterator[T]: - func = _util.sidify( - _util.map_exception(pipe.func, source=StopIteration, target=RuntimeError) - ) - return self.visit_map_pipe(_pipe.MapPipe(pipe.upstream, func, pipe.n_threads)) - - def visit_flatten_pipe(self, pipe: _pipe.FlattenPipe[T]) -> Iterator[T]: - it = pipe.upstream._accept(IteratorProducingVisitor[Iterable]()) - if pipe.n_threads == 1: - return _core.FlatteningIteratorWrapper(it) - else: - return _concurrency.ThreadedFlatteningIteratorWrapper( - it, n_workers=pipe.n_threads - ) - - def visit_chain_pipe(self, pipe: _pipe.ChainPipe[T]) -> Iterator[T]: - it: Iterator[T] = pipe.upstream._accept(self) - other_its: List[Iterator[T]] = list( - map(lambda pipe: pipe._accept(self), pipe.others) - ) - return itertools.chain(it, *other_its) - - def visit_filter_pipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]: - predicate = _util.map_exception( - pipe.predicate, source=StopIteration, target=RuntimeError - ) - it: Iterator[T] = pipe.upstream._accept(self) - return filter(predicate, it) - - def visit_batch_pipe(self, pipe: _pipe.BatchPipe[U]) -> Iterator[T]: - it: Iterator[U] = pipe.upstream._accept(IteratorProducingVisitor[U]()) - return cast( - Iterator[T], _core.BatchingIteratorWrapper(it, pipe.size, pipe.period) - ) - - def visit_slow_pipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]: - return _core.SlowingIteratorWrapper(pipe.upstream._accept(self), pipe.freq) - - def visit_catch_pipe(self, pipe: _pipe.CatchPipe[T]) -> Iterator[T]: - if pipe.when is not None: - when = _util.map_exception( - pipe.when, source=StopIteration, target=RuntimeError - ) - else: - when = None - return _core.CatchingIteratorWrapper( - pipe.upstream._accept(self), *pipe.classes, when=when - ) - - def visit_observe_pipe(self, pipe: _pipe.ObservePipe[T]) -> Iterator[T]: - return _core.ObservingIteratorWrapper( - pipe.upstream._accept(self), pipe.what, pipe.colored - ) diff --git a/setup.py b/setup.py index 02f5579..597ee42 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,13 @@ from setuptools import find_packages, setup # type: ignore setup( - name='kioss', - version='0.9.1', + name='streamable', + version='0.0.1', packages=find_packages(), - package_data={"kioss": ["py.typed"]}, - url='http://github.com/bonnal-enzo/kioss', + package_data={"iterable": ["py.typed"]}, + url='http://github.com/bonnal-enzo/iterable', license='Apache 2.', author='bonnal-enzo', author_email='bonnal.enzo.dev@gmail.com', - description='Keep I/O Simple and Stupid: Ease the development of ETL/EL/ReverseETL scripts.' + description='Ease the manipulation of iterables.' ) diff --git a/tests/test_legacy.py b/tests/test_legacy.py index e0a5e7e..74fb980 100644 --- a/tests/test_legacy.py +++ b/tests/test_legacy.py @@ -7,16 +7,16 @@ from parameterized import parameterized # type: ignore -from kioss import Pipe, _util +from iterable import Stream, _util TEN_MS = 0.01 DELTA = 0.35 T = TypeVar("T") -def timepipe(pipe: Pipe): +def timestream(stream: Stream): def iterate(): - for _ in pipe: + for _ in stream: pass return timeit.timeit(iterate, number=1) @@ -36,31 +36,31 @@ def raise_stopiteration() -> bool: N = 64 -class TestPipe(unittest.TestCase): +class TestStream(unittest.TestCase): def test_init(self) -> None: # from iterable - self.assertListEqual(list(Pipe(range(8).__iter__)), list(range(8))) + self.assertListEqual(list(Stream(range(8).__iter__)), list(range(8))) # from iterator - self.assertListEqual(list(Pipe(range(8).__iter__)), list(range(8))) + self.assertListEqual(list(Stream(range(8).__iter__)), list(range(8))) def test_chain(self) -> None: # test that the order is preserved self.assertListEqual( list( - Pipe(range(2).__iter__) + Stream(range(2).__iter__) .chain( - Pipe(range(2, 4).__iter__), - Pipe(range(4, 6).__iter__), + Stream(range(2, 4).__iter__), + Stream(range(4, 6).__iter__), ) - .chain(Pipe(range(6, 8).__iter__)) + .chain(Stream(range(6, 8).__iter__)) ), list(range(8)), ) def test_flatten_typing(self) -> None: - a: Pipe[str] = Pipe("abc".__iter__).map(iter).flatten() - b: Pipe[str] = Pipe("abc".__iter__).map(list).flatten() - c: Pipe[str] = Pipe("abc".__iter__).map(set).flatten() + a: Stream[str] = Stream("abc".__iter__).map(iter).flatten() + b: Stream[str] = Stream("abc".__iter__).map(list).flatten() + c: Stream[str] = Stream("abc".__iter__).map(set).flatten() @parameterized.expand([[1], [2], [3]]) def test_flatten(self, n_threads: int): @@ -68,7 +68,7 @@ def test_flatten(self, n_threads: int): # test ordering self.assertListEqual( list( - Pipe(["Hello World", "Happy to be here :)"].__iter__) + Stream(["Hello World", "Happy to be here :)"].__iter__) .map(str.split) .flatten(n_threads=n_threads) ), @@ -76,7 +76,7 @@ def test_flatten(self, n_threads: int): ) self.assertSetEqual( set( - Pipe(["Hello World", "Happy to be here :)"].__iter__) + Stream(["Hello World", "Happy to be here :)"].__iter__) .map(str.split) .flatten(n_threads=n_threads) ), @@ -84,7 +84,7 @@ def test_flatten(self, n_threads: int): ) self.assertEqual( sum( - Pipe([["1 2 3", "4 5 6"], ["7", "8 9 10"]].__iter__) + Stream([["1 2 3", "4 5 6"], ["7", "8 9 10"]].__iter__) .flatten(n_threads=n_threads) .map(str.split) .flatten(n_threads=n_threads) @@ -95,48 +95,48 @@ def test_flatten(self, n_threads: int): # test potential recursion issue with chained empty iters list( - Pipe([iter([]) for _ in range(2000)].__iter__).flatten(n_threads=n_threads) + Stream([iter([]) for _ in range(2000)].__iter__).flatten(n_threads=n_threads) ) # test concurrency - single_pipe_iteration_duration = 0.5 + single_stream_iteration_duration = 0.5 queue_get_timeout = 0.1 - pipes = [ - Pipe(range(0, N, 3).__iter__).slow( - (N / 3) / single_pipe_iteration_duration + streams = [ + Stream(range(0, N, 3).__iter__).slow( + (N / 3) / single_stream_iteration_duration ), - Pipe(range(1, N, 3).__iter__).slow( - (N / 3) / single_pipe_iteration_duration + Stream(range(1, N, 3).__iter__).slow( + (N / 3) / single_stream_iteration_duration ), - Pipe(range(2, N, 3).__iter__).slow( - (N / 3) / single_pipe_iteration_duration + Stream(range(2, N, 3).__iter__).slow( + (N / 3) / single_stream_iteration_duration ), ] self.assertAlmostEqual( timeit.timeit( lambda: self.assertSetEqual( - set(Pipe(pipes.__iter__).flatten(n_threads=n_threads)), + set(Stream(streams.__iter__).flatten(n_threads=n_threads)), set(range(N)), ), number=1, ), - len(pipes) - * single_pipe_iteration_duration + len(streams) + * single_stream_iteration_duration / (1 if n_threads is None else n_threads), delta=DELTA - * len(pipes) - * single_pipe_iteration_duration + * len(streams) + * single_stream_iteration_duration / (1 if n_threads is None else n_threads) + queue_get_timeout, ) # partial iteration - zeros = lambda: Pipe(([0] * N).__iter__) + zeros = lambda: Stream(([0] * N).__iter__) self.assertEqual( next( iter( - Pipe([zeros(), zeros(), zeros()].__iter__).flatten( + Stream([zeros(), zeros(), zeros()].__iter__).flatten( n_threads=n_threads ) ) @@ -154,8 +154,8 @@ def raise_for_4(x): raise AssertionError() return x - get_pipe: Callable[[], Pipe[int]] = lambda: ( - Pipe( + get_stream: Callable[[], Stream[int]] = lambda: ( + Stream( lambda: map( raise_for_4, [ @@ -176,24 +176,24 @@ def store_error_types(error): error_types.add(type(error)) return True - set(get_pipe().catch(Exception, when=store_error_types)) + set(get_stream().catch(Exception, when=store_error_types)) self.assertSetEqual( error_types, {ValueError, TypeError, AssertionError, RuntimeError}, ) self.assertSetEqual( - set(get_pipe().catch(Exception)), + set(get_stream().catch(Exception)), set(range(7)), ) # test rasing: self.assertRaises( ValueError, - lambda: list(Pipe([map(int, "12-3")].__iter__).flatten(n_threads=n_threads)), # type: ignore + lambda: list(Stream([map(int, "12-3")].__iter__).flatten(n_threads=n_threads)), # type: ignore ) self.assertRaises( ValueError, - lambda: list(Pipe(lambda: map(int, "-")).flatten(n_threads=n_threads)), # type: ignore + lambda: list(Stream(lambda: map(int, "-")).flatten(n_threads=n_threads)), # type: ignore ) def test_add(self) -> None: @@ -201,12 +201,12 @@ def test_add(self) -> None: list( sum( [ - Pipe(range(0, 2).__iter__), - Pipe(range(2, 4).__iter__), - Pipe(range(4, 6).__iter__), - Pipe(range(6, 8).__iter__), + Stream(range(0, 2).__iter__), + Stream(range(2, 4).__iter__), + Stream(range(4, 6).__iter__), + Stream(range(6, 8).__iter__), ], - start=Pipe([].__iter__), + start=Stream([].__iter__), ) ), list(range(8)), @@ -217,7 +217,7 @@ def test_map(self, n_threads: int): func = lambda x: x**2 self.assertSetEqual( set( - Pipe(range(N).__iter__) + Stream(range(N).__iter__) .map(ten_ms_identity, n_threads=n_threads) .map(lambda x: x if 1 / x else x) .map(func, n_threads=n_threads) @@ -231,7 +231,7 @@ def test_map(self, n_threads: int): l: List[List[int]] = [[1], [], [3]] self.assertSetEqual( set( - Pipe(l.__iter__) + Stream(l.__iter__) .map(lambda l: iter(l)) .map(next, n_threads=n_threads) .catch(RuntimeError) @@ -241,12 +241,12 @@ def test_map(self, n_threads: int): def test_map_threading_bench(self) -> None: # non-threaded vs threaded execution time - pipe = Pipe(range(N).__iter__).map(ten_ms_identity) - self.assertAlmostEqual(timepipe(pipe), TEN_MS * N, delta=DELTA * (TEN_MS * N)) + stream = Stream(range(N).__iter__).map(ten_ms_identity) + self.assertAlmostEqual(timestream(stream), TEN_MS * N, delta=DELTA * (TEN_MS * N)) n_threads = 2 - pipe = Pipe(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads) + stream = Stream(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads) self.assertAlmostEqual( - timepipe(pipe), + timestream(stream), TEN_MS * N / n_threads, delta=DELTA * (TEN_MS * N) / n_threads, ) @@ -263,7 +263,7 @@ def func_with_side_effect(x): args = range(N) self.assertListEqual( - list(Pipe(args.__iter__).do(func_with_side_effect)), + list(Stream(args.__iter__).do(func_with_side_effect)), list(args), ) self.assertListEqual(l, list(map(func, args))) @@ -271,14 +271,14 @@ def func_with_side_effect(x): # with threads l.clear() self.assertSetEqual( - set(Pipe(args.__iter__).do(func_with_side_effect, n_threads=2)), + set(Stream(args.__iter__).do(func_with_side_effect, n_threads=2)), set(args), ) self.assertSetEqual(set(l), set(map(func, args))) self.assertEqual( list( - Pipe(range(N).__iter__) + Stream(range(N).__iter__) .do(lambda n: None if n % 2 == 0 else raise_stopiteration()) .catch(RuntimeError) ), @@ -287,15 +287,15 @@ def func_with_side_effect(x): def test_filter(self) -> None: self.assertListEqual( - list(Pipe(range(8).__iter__).filter(lambda x: x % 2 != 0)), + list(Stream(range(8).__iter__).filter(lambda x: x % 2 != 0)), [1, 3, 5, 7], ) - self.assertListEqual(list(Pipe(range(8).__iter__).filter(lambda _: False)), []) + self.assertListEqual(list(Stream(range(8).__iter__).filter(lambda _: False)), []) self.assertEqual( list( - Pipe(range(N).__iter__) + Stream(range(N).__iter__) .filter(lambda n: True if n % 2 == 0 else raise_stopiteration()) .catch(RuntimeError) ), @@ -304,32 +304,32 @@ def test_filter(self) -> None: def test_batch(self) -> None: self.assertListEqual( - list(Pipe(range(8).__iter__).batch(size=3)), + list(Stream(range(8).__iter__).batch(size=3)), [[0, 1, 2], [3, 4, 5], [6, 7]], ) self.assertListEqual( - list(Pipe(range(6).__iter__).batch(size=3)), + list(Stream(range(6).__iter__).batch(size=3)), [[0, 1, 2], [3, 4, 5]], ) self.assertListEqual( - list(Pipe(range(8).__iter__).batch(size=1)), + list(Stream(range(8).__iter__).batch(size=1)), list(map(lambda x: [x], range(8))), ) self.assertListEqual( - list(Pipe(range(8).__iter__).batch(size=8)), + list(Stream(range(8).__iter__).batch(size=8)), [list(range(8))], ) self.assertEqual( - len(list(Pipe(range(8).__iter__).slow(10).batch(period=0.09))), + len(list(Stream(range(8).__iter__).slow(10).batch(period=0.09))), 7, ) # assert batch gracefully yields if next elem throw exception self.assertListEqual( - list(Pipe("01234-56789".__iter__).map(int).batch(2).catch(ValueError)), + list(Stream("01234-56789".__iter__).map(int).batch(2).catch(ValueError)), [[0, 1], [2, 3], [4], [5, 6], [7, 8], [9]], ) self.assertListEqual( - list(Pipe("0123-56789".__iter__).map(int).batch(2).catch(ValueError)), + list(Stream("0123-56789".__iter__).map(int).batch(2).catch(ValueError)), [[0, 1], [2, 3], [5, 6], [7, 8], [9]], ) errors = set() @@ -340,7 +340,7 @@ def store_errors(error): self.assertListEqual( list( - Pipe("0123-56789".__iter__) + Stream("0123-56789".__iter__) .map(int) .batch(2) .catch(ValueError, when=store_errors) @@ -355,22 +355,22 @@ def store_errors(error): @parameterized.expand([[1], [2], [3]]) def test_slow(self, n_threads: int): freq = 64 - pipe = ( - Pipe(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads).slow(freq) + stream = ( + Stream(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads).slow(freq) ) self.assertAlmostEqual( - timepipe(pipe), + timestream(stream), 1 / freq * N, delta=DELTA * (1 / freq * N), ) def test_time(self) -> None: - new_pipe = lambda: Pipe(range(8).__iter__).slow(64) + new_stream = lambda: Stream(range(8).__iter__).slow(64) start_time = time.time() - list(new_pipe()) + list(new_stream()) execution_time = time.time() - start_time self.assertAlmostEqual( - execution_time, timepipe(new_pipe()), delta=DELTA * execution_time + execution_time, timestream(new_stream()), delta=DELTA * execution_time ) @parameterized.expand([[1], [2], [3]]) @@ -384,7 +384,7 @@ def store_errors(error): self.assertSetEqual( set( - Pipe(["1", "r", "2"].__iter__) + Stream(["1", "r", "2"].__iter__) .map(int, n_threads=n_threads) .catch( Exception, @@ -402,14 +402,14 @@ def store_errors(error): self.assertRaises( ValueError, lambda: list( - Pipe(["1", "r", "2"].__iter__) + Stream(["1", "r", "2"].__iter__) .map(int, n_threads=n_threads) .catch(ValueError, when=lambda error: False) ), ) self.assertListEqual( list( - Pipe(["1", "r", "2"].__iter__) + Stream(["1", "r", "2"].__iter__) .map(int, n_threads=n_threads) .catch( ValueError, @@ -423,7 +423,7 @@ def store_errors(error): # chain catches self.assertListEqual( list( - Pipe(["1", "r", "2"].__iter__) + Stream(["1", "r", "2"].__iter__) .map(int, n_threads=n_threads) .catch(TypeError) .catch(ValueError) @@ -437,7 +437,7 @@ def store_errors(error): self.assertRaises( ValueError, lambda: list( - Pipe(["1", "r", "2"].__iter__) + Stream(["1", "r", "2"].__iter__) .map(int, n_threads=n_threads) .catch(TypeError) .map(type) @@ -446,41 +446,41 @@ def store_errors(error): self.assertRaises( ValueError, lambda: list( - Pipe(["1", "r", "2"].__iter__) + Stream(["1", "r", "2"].__iter__) .map(int, n_threads=n_threads) .catch(TypeError) .map(type) ), ) - def test_run(self) -> None: - self.assertListEqual(Pipe("123".__iter__).map(int).run(collect_limit=2), [1, 2]) - self.assertListEqual(Pipe("123".__iter__).map(int).run(), []) + def test_iterate(self) -> None: + self.assertListEqual(Stream("123".__iter__).map(int).iterate(collect_limit=2), [1, 2]) + self.assertListEqual(Stream("123".__iter__).map(int).iterate(), []) # errors - run = Pipe("12-3".__iter__).map(int).run + iterate = Stream("12-3".__iter__).map(int).iterate self.assertRaises( ValueError, - run, + iterate, ) # does not raise with sufficient threshold - run(raise_if_more_errors_than=1) + iterate(raise_if_more_errors_than=1) # raise with insufficient threshold self.assertRaises( ValueError, - lambda: run(raise_if_more_errors_than=0), + lambda: iterate(raise_if_more_errors_than=0), ) # fail_fast self.assertRaises( ValueError, - lambda: Pipe("a-b".__iter__).map(int).run(fail_fast=True), + lambda: Stream("a-b".__iter__).map(int).iterate(fail_fast=True), ) def test_log(self) -> None: self.assertListEqual( list( - Pipe("123".__iter__) + Stream("123".__iter__) .observe("chars") .map(int) .observe("ints") @@ -491,18 +491,18 @@ def test_log(self) -> None: ) ( - Pipe("12-3".__iter__) + Stream("12-3".__iter__) .observe("chars") .map(int) .observe("ints", colored=True) .batch(2) - .run(raise_if_more_errors_than=1) + .iterate(raise_if_more_errors_than=1) ) def test_partial_iteration(self) -> None: first_elem = next( iter( - Pipe(([0] * N).__iter__) + Stream(([0] * N).__iter__) .slow(50) .map(_util.identity, n_threads=2) .slow(50) @@ -514,8 +514,8 @@ def test_partial_iteration(self) -> None: ) self.assertEqual(first_elem, 0) n = 10 - pipe = ( - Pipe(([0] * N).__iter__) + stream = ( + Stream(([0] * N).__iter__) .slow(50) .map(_util.identity, n_threads=2) .slow(50) @@ -524,22 +524,22 @@ def test_partial_iteration(self) -> None: .map(_util.identity, n_threads=2) .slow(50) ) - samples = list(itertools.islice(pipe, n)) + samples = list(itertools.islice(stream, n)) self.assertListEqual(samples, [0] * n) def test_invalid_source(self) -> None: - self.assertRaises(TypeError, lambda: Pipe(range(3))) # type: ignore - pipe_ok_at_construction: Pipe[int] = Pipe(lambda: 0) # type: ignore - self.assertRaises(TypeError, lambda: list(pipe_ok_at_construction)) + self.assertRaises(TypeError, lambda: Stream(range(3))) # type: ignore + stream_ok_at_construction: Stream[int] = Stream(lambda: 0) # type: ignore + self.assertRaises(TypeError, lambda: list(stream_ok_at_construction)) @parameterized.expand([[1], [2], [3]]) def test_invalid_flatten_upstream(self, n_threads: int): self.assertRaises( - TypeError, lambda: list(Pipe(range(3).__iter__).flatten(n_threads=n_threads)) # type: ignore + TypeError, lambda: list(Stream(range(3).__iter__).flatten(n_threads=n_threads)) # type: ignore ) def test_planning_and_execution_decoupling(self) -> None: - a = Pipe(range(N).__iter__) + a = Stream(range(N).__iter__) b = a.batch(size=N) # test double execution self.assertListEqual(list(a), list(range(N))) @@ -552,13 +552,13 @@ def test_generator_already_generating(self) -> None: iter((ten_ms_identity(x) for x in range(N))) for _ in range(3) ] self.assertEqual( - Counter(Pipe(l.__iter__).flatten(n_threads=2)), + Counter(Stream(l.__iter__).flatten(n_threads=2)), Counter(list(range(N)) + list(range(N)) + list(range(N))), ) def test_explain(self) -> None: - p: Pipe[int] = ( - Pipe(range(8).__iter__) + p: Stream[int] = ( + Stream(range(8).__iter__) .filter(lambda _: True) .map(lambda x: x) .batch(100) @@ -567,8 +567,8 @@ def test_explain(self) -> None: .slow(64) .observe("slowed elems") .chain( - Pipe([].__iter__).do(lambda e: None).observe("other 1"), - Pipe([].__iter__).observe("other 2"), + Stream([].__iter__).do(lambda e: None).observe("other 1"), + Stream([].__iter__).observe("other 2"), ) .catch(ValueError, TypeError, when=lambda e: True) ) @@ -581,13 +581,13 @@ def test_explain(self) -> None: print(c) def test_accept_typing(self) -> None: - p: Pipe[str] = ( - Pipe(lambda: range(10)).batch().map(lambda b: list(map(str, b))).flatten() + p: Stream[str] = ( + Stream(lambda: range(10)).batch().map(lambda b: list(map(str, b))).flatten() ) it: Iterator[str] = iter(p) - from kioss._visit._iter import IteratorProducingVisitor + from iterable._visit._iter import IteratorProducingVisitor p._accept(IteratorProducingVisitor[str]()) - from kioss._visit._explanation import ExplainingVisitor + from iterable._visit._explanation import ExplainingVisitor p._accept(ExplainingVisitor()) diff --git a/tests/test_pipe.py b/tests/test_pipe.py index ac118d0..49a3ae3 100644 --- a/tests/test_pipe.py +++ b/tests/test_pipe.py @@ -5,14 +5,14 @@ from parameterized import parameterized # type: ignore -from kioss import Pipe +from iterable import Stream T = TypeVar("T") -def timepipe(pipe: Pipe): +def timestream(stream: Stream): def iterate(): - for _ in pipe: + for _ in stream: pass return timeit.timeit(iterate, number=1) @@ -36,104 +36,104 @@ def identity(x: T) -> T: src: Callable[[], Iterable[int]] = range(N).__iter__ -class TestPipe(unittest.TestCase): +class TestStream(unittest.TestCase): def test_init(self) -> None: - pipe = Pipe(src) + stream = Stream(src) self.assertEqual( - pipe.source, + stream.source, src, - msg="The pipe's `source` must be the source argument.", + msg="The stream's `source` must be the source argument.", ) self.assertIsNone( - pipe.upstream, - msg="The `upstream` attribute of a base Pipe's instance must be None.", + stream.upstream, + msg="The `upstream` attribute of a base Stream's instance must be None.", ) with self.assertRaisesRegex( TypeError, "source must be a callable but got a ", - msg="Instantiating a Pipe with a source not being a callable must raise TypeError.", + msg="Instantiating a Stream with a source not being a callable must raise TypeError.", ): - Pipe(range(N)) # type: ignore + Stream(range(N)) # type: ignore def test_explain(self) -> None: - complex_pipe: Pipe[int] = ( - Pipe(src) + complex_stream: Stream[int] = ( + Stream(src) .filter(lambda _: True) .map(lambda _: _) .batch(100) .observe("batches") .flatten(n_threads=4) .slow(64) - .observe("pipe #1 elements") + .observe("stream #1 elements") .chain( - Pipe([].__iter__).do(lambda _: None).observe("pipe #2 elements"), - Pipe([].__iter__).observe("pipe #3 elements"), + Stream([].__iter__).do(lambda _: None).observe("stream #2 elements"), + Stream([].__iter__).observe("stream #3 elements"), ) .catch(ValueError, TypeError, when=lambda _: True) ) - explanation_1 = complex_pipe.explain() - explanation_2 = complex_pipe.explain() + explanation_1 = complex_stream.explain() + explanation_2 = complex_stream.explain() self.assertEqual( explanation_1, explanation_2, - msg="Pipe.explain() must be deterministic.", + msg="Stream.explain() must be deterministic.", ) - colored_explanation = complex_pipe.explain(colored=True) + colored_explanation = complex_stream.explain(colored=True) self.assertNotEqual( explanation_1, colored_explanation, - msg="Pipe.explain(colored=True) must different from non colored one.", + msg="Stream.explain(colored=True) must different from non colored one.", ) - explanation_3 = complex_pipe.map(str).explain() + explanation_3 = complex_stream.map(str).explain() self.assertNotEqual( explanation_1, explanation_3, - msg="explanation of different pipes must be different", + msg="explanation of different streams must be different", ) print(colored_explanation) def test_iter(self) -> None: self.assertIsInstance( - iter(Pipe(src)), + iter(Stream(src)), Iterator, - msg="iter(pipe) must return an Iterator", + msg="iter(stream) must return an Iterator", ) def test_add(self) -> None: - from kioss._pipe import ChainPipe + from iterable._stream import ChainStream - pipe = Pipe(src) + stream = Stream(src) self.assertIsInstance( - pipe.chain(pipe), - ChainPipe, - msg="iter(pipe) must return an Iterator", + stream.chain(stream), + ChainStream, + msg="iter(stream) must return an Iterator", ) @parameterized.expand( [ - [Pipe.map, [identity]], - [Pipe.do, [identity]], - [Pipe.flatten, []], + [Stream.map, [identity]], + [Stream.do, [identity]], + [Stream.flatten, []], ] ) def test_sanitize_n_threads(self, method, args) -> None: - pipe = Pipe(src) + stream = Stream(src) with self.assertRaises( TypeError, msg=f"{method} should be raising TypeError for non-int n_threads.", ): - method(pipe, *args, n_threads="1") + method(stream, *args, n_threads="1") with self.assertRaises( ValueError, msg=f"{method} should be raising ValueError for n_threads=0." ): - method(pipe, *args, n_threads=0) + method(stream, *args, n_threads=0) for n_threads in range(1, 10): self.assertIsInstance( - method(pipe, *args, n_threads=n_threads), - Pipe, + method(stream, *args, n_threads=n_threads), + Stream, msg=f"it must be ok to call {method} with n_threads={n_threads}", ) diff --git a/tests/test_util.py b/tests/test_util.py index fdf5fd3..1bbf110 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,6 +1,6 @@ import unittest -from kioss._util import sidify +from iterable._util import sidify class TestUtil(unittest.TestCase):