Skip to content

Commit

Permalink
update README
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Oct 17, 2023
1 parent 459e92e commit 42a30d6
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,40 @@ There is only 1 import:
from kioss import Pipe
```
Use this `Pipe` class as follow:
1. instanciate it with a data source
2. plan transformations and controlling operations on it
3. execute it
1. **instanciate** it with a data source
2. **plan** transformations and controlling operations on it
3. **execute** it
---
### 💾 Define the data source
- `.__init__` a `Pipe` by providing a `Callable` returning an `Iterator[T]` or `Iterable[T]` as data source.

### Init
- `.__init__` a `Pipe` by providing a `Callable` returning an `Iterator[T]` or `Iterable[T]` as source.

### Plan transformation operations
### ⚙️ Plan transformation operations
- `.map` a function over a pipe (optional multithreading).
- `.do` side effects on a pipe by calling an function over it while discarding the results (optional multithreading).
- `.flatten` a pipe whose elements are themselves `Iterator`s (optional multithreading).
- `.filter` a pipe using a predicate function.
- `.chain` several pipes to form a new one that yields elements of one pipe after the previous one is exhausted.
- `.batch` the elements of a pipe and yield them as `list`s of a specific maximum size and/or spanning over a specific period of time.

### Plan controling operations
### 🎛️ Plan controlling operations
- `.slow` a pipe, i.e. rate limit the iteration over it.
- `.log` a pipe's iteration advancement (logarithmically, so no spam).
- `.catch` a pipe's exceptions by deciding which specific subtype of `Exception` to catch and whether to ignore it or to yield it.

### Execute the plan
### 🎬 Execute the plan
- `.collect` a pipe's elements into a list having an optional max size.
- `.superintend` a pipe, i.e. iterate over it until it is exhausted, with logging and exceptions catching, ultimately returning the collected outputs or logging a sample of the encountered errors and raising if any.

### Inter-operate
### ♻️ Inter-operate
The `Pipe[T]` class extends `Iterable[T]`, hence you can pass a pipe to any function supporting iterables:
- `set(pipe)`
- `functools.reduce(func, pipe, initial)`
- `itertools.islice(pipe, n_samples)`
- ...

## Code snippets
### 1. Extract social media messages from GCS and POST the hashtags they contain into a web API
----
## Code snippet
Extract social media messages from GCS and POST the hashtags they contain into a web API
```python
from typing import Iterator
from google.cloud import storage
Expand Down Expand Up @@ -95,7 +96,7 @@ object_paths: Iterator[str] = ...
auth=("foo", "bar"),
json={"hashtags": hashtags},
),
n_workers=4,
n_threads=4,
)
# raise for each response having status code 4XX or 5XX.
.do(requests.Response.raise_for_status)
Expand Down

0 comments on commit 42a30d6

Please sign in to comment.