update README
ebonnal committed Oct 10, 2023
1 parent 2ad73c0 commit 425f7f5
Showing 2 changed files with 26 additions and 27 deletions.
52 changes: 25 additions & 27 deletions README.md
@@ -3,14 +3,36 @@

[![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/test/badge.svg)](https://github.com/bonnal-enzo/kioss/actions) [![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/kioss/actions)

Write concise and expressive definitions of ETL and Reverse ETL pipelines. This library has been specifically designed to be convenient for handling data integration from and to APIs, with features such as multithreading, rate limiting, batching, and exception handling.
Expressive `Iterator`-based library designed to **ease the development of (reverse) ETL data pipelines**, with features such as *multithreading*, *rate limiting*, *batching*, and *exception handling*.

## Install

`pip install kioss`

## Example
## Interface
- ***define***:
  - `.__init__` a `Pipe` by providing an instance of `Iterator[T]` or `Iterable[T]` as its source.
  - `.map` a function over a pipe, optionally using multiple threads or processes.
  - `.do` side effects on a pipe by calling an impure function over it, optionally using multiple threads or processes.
  - `.flatten` a pipe whose elements are themselves `Iterator` or `Pipe` instances, optionally using multiple threads or processes.
  - `.filter` a pipe using a predicate function.
  - `.chain` several pipes to form a new one that yields the elements of one pipe after the previous one is exhausted.
  - `.batch` the elements of a pipe and yield them as `list`s of a specific maximum size and/or spanning a specific period of time.
- ***control***:
  - `.slow` a pipe, i.e. rate limit the iteration over it.
  - `.log` a pipe's iteration progress (no flood thanks to a logarithmic approach).
  - `.catch` a pipe's exceptions by deciding which specific subtype of `Exception` to catch and whether to ignore it or to yield it.
- ***consume***: **(only these methods trigger the iteration over the pipe)**
  - `.collect` a pipe's elements into a `list` with an optional maximum size.
  - `.superintend` a pipe, i.e. iterate over it until it is exhausted, with logging and exception catching, ultimately logging a sample of the encountered errors and raising if any occurred.
- ***inter-operate***: the `Pipe` class extends `Iterator[T]`, hence you can pass a pipe to any function supporting iterators (see the sketch right after this list):
  - `set(pipe)`
  - `functools.reduce(func, pipe, initial)`
  - `itertools.islice(pipe, n_samples)`
  - ...
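
For instance, here is a minimal sketch of that inter-operation, assuming `Pipe` is importable directly from the `kioss` package (the import is not shown in the truncated snippet further down) and using an illustrative `range` source:

```python
import functools
import itertools

from kioss import Pipe  # assumed import path

# A pipe is an Iterator[T], so standard iterator-consuming functions accept it.
pipe = Pipe(source=iter(range(10))).map(lambda n: n % 3)

first_five = list(itertools.islice(pipe, 5))  # [0, 1, 2, 0, 1]
distinct = set(pipe)                          # consumes the remaining elements -> {0, 1, 2}
total = functools.reduce(lambda a, b: a + b, distinct, 0)  # 3
```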

## Code snippets
### 1. Extract social media messages from GCS and POST the hashtags they contain into a web API
```python
from typing import Iterator
from google.cloud import storage
@@ -28,7 +50,7 @@ object_paths: Iterator[str] = ...
# and we want to POST their hashtags to an API
(
    # instantiate a Pipe with object paths as data source
    Pipe(object_paths)
    Pipe(source=object_paths)
    .log(what="object paths")
    # get the blob corresponding to this object path in the given bucket
    .map(bucket.get_blob)
@@ -68,27 +90,3 @@ object_paths: Iterator[str] = ...
    .superintend()
)
```
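
To complement the snippet above, here is a hedged sketch of how the *control* and *consume* methods from the interface list might be chained. The keyword argument names passed to `.batch`, `.slow`, `.catch`, and `.collect` below are assumptions made for readability, not confirmed kioss signatures:

```python
from kioss import Pipe  # assumed import path

square_batches = (
    Pipe(source=iter(range(100)))
    .map(lambda n: n * n)
    # NOTE: the keyword names below (size, freq, ignore, n_samples) are
    # illustrative assumptions; check the kioss docstrings for the real ones.
    .batch(size=10)                  # group elements into lists of up to 10
    .slow(freq=5)                    # rate limit the iteration
    .catch(ValueError, ignore=True)  # catch this Exception subtype and ignore it
    .log(what="batches of squares")
    .collect(n_samples=3)            # triggers iteration, keeps at most 3 batches
)
```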

## Features
- define:
  - The `.__init__` of the `Pipe` class takes as argument an instance of `Iterator[T]` or `Iterable[T]` used as the source of elements.
  - `.map` a function over a pipe, optionally using multiple threads or processes.
  - `.flatten` a pipe, whose elements are assumed to be iterators, creating a new pipe with individual elements, optionally using multiple threads or processes.
  - `.filter` a pipe using a predicate function.
  - `.do` side effects on a pipe, i.e. apply a function ignoring its returned value, optionally using multiple threads or processes.
  - `.chain` several pipes to form a new one that yields elements of one pipe after the previous one is exhausted.
  - `.batch` pipe's elements and yield them as lists of a given size or spanning over a given duration.
- control:
  - `.slow` a pipe to limit the rate of the iteration over it.
  - `.log` a pipe to get an idea of the status of the iteration.
  - `.catch` pipe's exceptions.
- consume:
  - `.collect` a pipe into a list having an optional max size.
  - `.superintend` a pipe: iterate over it entirely while catching exceptions + logging the iteration process + collecting and raising error samples.
-----
Note that the `Pipe` class itself extends `Iterator[T]`, hence you can pass a pipe to any function supporting iterators:
- `set(pipe)`
- `functools.reduce(func, pipe, initial)`
- `itertools.islice(pipe, n_samples)`
- ...

1 change: 1 addition & 0 deletions tests/test_util.py
@@ -10,6 +10,7 @@ def test_sidify(self):
        self.assertEqual(f(2), 4)
        self.assertEqual(sidify(f)(2), 2)

        # test decoration
        @sidify
        def f(x):
            return x**2
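
For context, the assertions above imply that `sidify` wraps a function so that it is called for its side effect while its input is returned unchanged; a minimal sketch of that behavior (not necessarily the actual kioss implementation):

```python
from typing import Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

def sidify(func: Callable[[T], R]) -> Callable[[T], T]:
    """Sketch: call `func` for its side effect and return the argument unchanged."""
    def wrapper(arg: T) -> T:
        func(arg)
        return arg
    return wrapper

assert sidify(lambda x: x**2)(2) == 2  # mirrors the test's assertion
```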
