diff --git a/README.md b/README.md
index 807735e..52a2f58 100644
--- a/README.md
+++ b/README.md
@@ -15,14 +15,14 @@ There is only 1 import:
 from kioss import Pipe
 ```
 Use this `Pipe` class as follow:
-1. instanciate it with a data source
-2. plan transformations and controlling operations on it
-3. execute it
+1. **instanciate** it with a data source
+2. **plan** transformations and controlling operations on it
+3. **execute** it
+---
+### 💾 Define the data source
+- `.__init__` a `Pipe` by providing a `Callable` returning an `Iterator[T]` or `Iterable[T]` as data source.
 
-### Init
-- `.__init__` a `Pipe` by providing a `Callable` returning an `Iterator[T]` or `Iterable[T]` as source.
-
-### Plan transformation operations
+### ⚙️ Plan transformation operations
 - `.map` a function over a pipe (optional multithreading).
 - `.do` side effects on a pipe by calling an function over it while discarding the results (optional multithreading).
 - `.flatten` a pipe whose elements are themselves `Iterator`s (optional multithreading).
@@ -30,24 +30,25 @@ Use this `Pipe` class as follow:
 - `.chain` several pipes to form a new one that yields elements of one pipe after the previous one is exhausted.
 - `.batch` the elements of a pipe and yield them as `list`s of a specific maximum size and/or spanning over a specific period of time.
 
-### Plan controling operations
+### 🎛️ Plan controlling operations
 - `.slow` a pipe, i.e. rate limit the iteration over it.
 - `.log` a pipe's iteration advancement (logarithmically, so no spam).
 - `.catch` a pipe's exceptions by deciding which specific subtype of `Exception` to catch and whether to ignore it or to yield it.
 
-### Execute the plan
+### 🎬 Execute the plan
 - `.collect` a pipe's elements into a list having an optional max size.
 - `.superintend` a pipe, i.e. iterate over it until it is exhausted, with logging and exceptions catching, ultimately returning the collected outputs or logging a sample of the encountered errors and raising if any.
 
-### Inter-operate
+### ♻️ Inter-operate
 The `Pipe[T]` class extends `Iterable[T]`, hence you can pass a pipe to any function supporting iterables:
 - `set(pipe)`
 - `functools.reduce(func, pipe, initial)`
 - `itertools.islice(pipe, n_samples)`
 - ...
 
-## Code snippets
-### 1. Extract social media messages from GCS and POST the hashtags they contain into a web API
+----
+## Code snippet
+Extract social media messages from GCS and POST the hashtags they contain into a web API
 ```python
 from typing import Iterator
 from google.cloud import storage
@@ -95,7 +96,7 @@ object_paths: Iterator[str] = ...
             auth=("foo", "bar"),
             json={"hashtags": hashtags},
         ),
-        n_workers=4,
+        n_threads=4,
     )
     # raise for each response having status code 4XX or 5XX.
     .do(requests.Response.raise_for_status)
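
The last hunk renames the `.map` keyword from `n_workers` to `n_threads`, which matches the README's "optional multithreading" wording. As a rough illustration of that idea only, and not kioss's implementation, the sketch below uses the standard library's `ThreadPoolExecutor`. `threaded_map` is a hypothetical helper name, and its signature is an assumption made for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def threaded_map(
    func: Callable[[T], R], source: Iterable[T], n_threads: int = 1
) -> Iterator[R]:
    """Hypothetical helper (not part of kioss): map func over source with threads."""
    if n_threads <= 1:
        # Sequential fallback: no thread pool is needed.
        yield from map(func, source)
        return
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        # Executor.map submits every task up front and yields results in input order.
        yield from executor.map(func, source)


lengths = list(threaded_map(len, ["#kioss", "#python", "#etl"], n_threads=4))
print(lengths)  # [6, 7, 4]
```

Naming the parameter after threads rather than generic workers makes the concurrency model explicit: threads suit I/O-bound calls such as HTTP requests, while CPU-bound functions would gain little under the GIL.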