
Commit

iterables 0.0.1
ebonnal committed Dec 25, 2023
1 parent 0beae01 commit 516e3b7
Showing 23 changed files with 558 additions and 557 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
@@ -31,7 +31,7 @@ jobs:
- name: mypy
run: |
python -m pip install -r requirements.txt
python -m mypy kioss tests
python -m mypy iterable tests
lint:
runs-on: ubuntu-latest
@@ -45,7 +45,7 @@

- name: checks
run: |
# python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module -r kioss tests && python -m isort kioss tests && python -m black kioss tests
# python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module -r iterable tests && python -m isort iterable tests && python -m black iterable tests
python -m pip install -r requirements.txt
python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module --check -r kioss tests
python -m black --check kioss/* tests/*
python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module --check -r iterable tests
python -m black --check iterable/* tests/*
1 change: 1 addition & 0 deletions .gitignore
@@ -4,3 +4,4 @@ __pycache__
tmp*
build
dist
.mypy_cache
98 changes: 49 additions & 49 deletions README.md
@@ -1,40 +1,40 @@
# `kioss`
**Keep I/O Simple and Stupid**
# `iterable`
**Keep Iterables Simple and Stupid**

[![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/test/badge.svg)](https://github.com/bonnal-enzo/kioss/actions) [![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/kioss/actions)
[![Actions Status](https://github.com/bonnal-enzo/iterable/workflows/test/badge.svg)](https://github.com/bonnal-enzo/iterable/actions) [![Actions Status](https://github.com/bonnal-enzo/iterable/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/iterable/actions)

Ease the development of ETL/EL/ReverseETL scripts.
Ease the manipulation of `Iterable`s.

## 1. install

```bash
pip install kioss
pip install iterable
```

## 2. import
```python
from kioss import Pipe
from iterable import Stream
```

## 3. init

```python
integers: Pipe[int] = Pipe(source=lambda: range(10))
integers: Stream[int] = Stream(source=lambda: range(10))
```

Instantiate a `Pipe` by providing a function that returns an `Iterable` (the data source).
Instantiate a `Stream` by providing a function that returns an `Iterable` (the data source).

## 4. declare operations

A `Pipe` is ***immutable***, meaning that applying an operation returns a new child pipe while the parent pipe remains unchanged.
A `Stream` is ***immutable***, meaning that applying an operation returns a new child stream while the parent stream remains unchanged.

There are 2 kinds of operations:
- **transformations**: to act on the pipe's elements
- **controls**: to configure the behaviors of the iteration over the pipe
- **transformations**: to act on the stream's elements
- **controls**: to configure the behaviors of the iteration over the stream


```python
odd_squares: Pipe[int] = (
odd_squares: Stream[int] = (
integers
.map(lambda x: x ** 2, n_threads=2) # transformation
.filter(lambda x: x % 2 == 1) # transformation
@@ -45,7 +45,7 @@ All operations are described in the ***Operations guide*** section.

## 5. iterate

Once your pipe's declaration is done you can iterate over it. Our `Pipe[int]` being an `Iterable[int]`, you are free to iterate over it the way you want, e.g.:
Once your stream's declaration is done you can iterate over it. Our `Stream[int]` being an `Iterable[int]`, you are free to iterate over it the way you want, e.g.:
```python
set(rate_limited_odd_squares)
```
@@ -57,10 +57,10 @@ for i in rate_limited_odd_squares:
...
```

Alternatively, a pipe also exposes a convenient method `.run` to launch an iteration over itself until exhaustion. It catches exceptions occurring during iteration and optionally collects output elements into a list to return. At the end it raises if exceptions occurred.
Alternatively, a stream also exposes a convenient method `.iterate` to launch an iteration over itself until exhaustion. It catches exceptions occurring during iteration and optionally collects output elements into a list to return. At the end it raises if exceptions occurred.

```python
odd_squares: List[int] = rate_limited_odd_squares.run(collect_limit=1024)
odd_squares: List[int] = rate_limited_odd_squares.iterate(collect_limit=1024)

assert odd_squares == [1, 9, 25, 49, 81]
```
@@ -73,7 +73,7 @@ assert odd_squares == [1, 9, 25, 49, 81]

Let's keep the same example:
```python
integers = Pipe(lambda: range(10))
integers = Stream(lambda: range(10))
```

# Transformations
@@ -82,7 +82,7 @@ integers = Pipe(lambda: range(10))
## `.map`
Defines the application of a function on parent elements.
```python
integer_strings: Pipe[str] = integers.map(str)
integer_strings: Stream[str] = integers.map(str)
```

It has an optional `n_threads` parameter if you need to apply the function concurrently using multiple threads.
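
For instance, a minimal sketch of a threaded `.map`, assuming a hypothetical I/O-bound `fetch_status` function (the URL is illustrative, not part of this README):

```python
import requests

# hypothetical I/O-bound function: with n_threads=2, up to 2 requests
# can be in flight at once while results are still yielded lazily
def fetch_status(i: int) -> int:
    return requests.get(f"https://httpbin.org/get?i={i}").status_code

status_codes: Stream[int] = integers.map(fetch_status, n_threads=2)
```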
@@ -91,7 +91,7 @@ It has an optional `n_threads` parameter if you need to apply the function concu
Defines the application of a function on parent elements like `.map`, but the parent elements will be forwarded instead of the result of the function.

```python
printed_integers: Pipe[int] = integers.do(print)
printed_integers: Stream[int] = integers.do(print)
```

It also has an optional `n_threads` parameter.
@@ -100,21 +100,21 @@ It also has an optional `n_threads` parameter.
Defines the filtering of parent elements based on a predicate function.

```python
pair_integers: Pipe[int] = integers.filter(lambda x: x % 2 == 0)
pair_integers: Stream[int] = integers.filter(lambda x: x % 2 == 0)
```

## `.batch`

Defines the grouping of parent elements into batches.

```python
integer_batches: Pipe[List[int]] = integers.batch(size=100, period=60)
integer_batches: Stream[List[int]] = integers.batch(size=100, period=60)
```

In this example a batch will be a list of 100 elements.

It may contain fewer elements in the following cases:
- the pipe is exhausted
- the stream is exhausted
- an exception occurred
- more than 60 seconds (the `period` parameter) have elapsed since the last batch was yielded.
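
For instance, a minimal sketch of the exhaustion case (assuming no exception is raised and the `period` is not reached):

```python
short_batches: Stream[List[int]] = Stream(lambda: range(10)).batch(size=4)

# the last batch is smaller because the stream is exhausted
assert list(short_batches) == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```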

@@ -123,21 +123,21 @@ It may contain less elements in the following cases:
Defines the ungrouping of parent elements assuming that the parent elements are `Iterable`s.

```python
integers: Pipe[int] = integer_batches.flatten()
integers: Stream[int] = integer_batches.flatten()
```

It also has an optional `n_threads` parameter to flatten several parent iterables concurrently.
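
For instance, a minimal sketch reusing `integer_batches` from above; the threads typically only pay off when consuming the inner iterables involves I/O:

```python
flattened_integers: Stream[int] = integer_batches.flatten(n_threads=2)
```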

## `.chain`

Defines the concatenation of the parent pipe with other pipes. The resulting pipe yields the elements of one pipe until it is exhausted and then moves to the next one. It starts with the pipe on which `.chain` is called.
Defines the concatenation of the parent stream with other streams. The resulting stream yields the elements of one stream until it is exhausted and then moves to the next one. It starts with the stream on which `.chain` is called.

```python
one_to_ten_integers: Pipe[int] = Pipe(lambda: range(1, 11))
eleven_to_twenty_integers: Pipe[int] = Pipe(lambda: range(11, 21))
twenty_one_to_thirty_integers: Pipe[int] = Pipe(lambda: range(21, 31))
one_to_ten_integers: Stream[int] = Stream(lambda: range(1, 11))
eleven_to_twenty_integers: Stream[int] = Stream(lambda: range(11, 21))
twenty_one_to_thirty_integers: Stream[int] = Stream(lambda: range(21, 31))

one_to_thirty_integers: Pipe[int] = one_to_ten_integers.chain(
one_to_thirty_integers: Stream[int] = one_to_ten_integers.chain(
eleven_to_twenty_integers,
twenty_one_to_thirty_integers,
)
@@ -151,20 +151,20 @@ one_to_thirty_integers: Pipe[int] = one_to_ten_integers.chain(
Defines a maximum rate at which parent elements will be yielded.

```python
slowed_integers: Pipe[int] = integers.slow(freq=2)
slowed_integers: Stream[int] = integers.slow(freq=2)
```

The rate is expressed in elements per second; here, at most 2 elements per second will be yielded when iterating over the pipe.
The rate is expressed in elements per second; here, at most 2 elements per second will be yielded when iterating over the stream.

## `.observe`

Defines that the iteration process will be logged.

```python
observed_slowed_integers: Pipe[int] = slowed_integers.observe(what="integers from 0 to 9")
observed_slowed_integers: Stream[int] = slowed_integers.observe(what="integers from 0 to 9")
```

When iterating over the pipe, you should get an output like:
When iterating over the stream, you should get an output like:

```
INFO - iteration over 'integers from 0 to 9' will be logged.
@@ -183,20 +183,20 @@ As you can notice the logs can never be overwhelming because they are produced l
Defines that the provided type of exception will be caught.

```python
inverse_floats: Pipe[float] = integers.map(lambda x: 1/x)
safe_inverse_floats: Pipe[float] = inverse_floats.catch(ZeroDivisionError)
inverse_floats: Stream[float] = integers.map(lambda x: 1/x)
safe_inverse_floats: Stream[float] = inverse_floats.catch(ZeroDivisionError)
```

It has an optional `when` parameter: a function that takes the parent element as input and decides whether or not to catch the exception.
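
For instance, a sketch reusing `inverse_floats` from above, assuming `when` receives the element whose processing raised the exception (this exact usage is an assumption, not shown elsewhere in this README):

```python
# only catch the ZeroDivisionError caused by the element 0; any other error still propagates
zero_safe_inverse_floats: Stream[float] = inverse_floats.catch(
    ZeroDivisionError,
    when=lambda x: x == 0,
)
```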

---

# ***Typical use case for `kioss` in Data Engineering***
# ***Typical use case in Data Engineering***
![](./img/dataeng.gif)

As a data engineer, you often need to write python scripts to do **ETL** (*Extract* the data from a source API, *Transform* and *Load* it into the data warehouse) or **EL** (same but with minimal transformation) or **Reverse ETL** (read data from the data warehouse and post it into a destination API).

These scripts **do not manipulate huge volumes** of data because they are scheduled to run periodically (using orchestrators like *Airflow/DAGster/Prefect*) and only manipulate the data produced or updated during that period. At worst, if you are an *Amazon*-sized business, you may need to process 10 million payment transactions every 10 minutes.
These scripts **do not manipulate huge volumes** of data because they are scheduled to iterate periodically (using orchestrators like *Airflow/DAGster/Prefect*) and only manipulate the data produced or updated during that period. At worst, if you are an *Amazon*-sized business, you may need to process 10 million payment transactions every 10 minutes.

These scripts tend to be replaced in part by EL tools like *Airbyte*, but sometimes you still need **custom integration logic**.

@@ -222,15 +222,15 @@ These scripts are typically composed of:

- The logic to **catch** exceptions of a given type. Also, we typically want to catch errors and seamlessly proceed with the integration until completion. For instance, if you have 1000 records to integrate and encounter an exception at the 236th record because it is malformed, it is often preferable to successfully integrate the other 999 records and raise after the iteration has completed than to skip 764 valid records prematurely, as sketched below.
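
For instance, a minimal sketch of that behavior; `records` and `post_record` are hypothetical placeholders for a record source and an integration function:

```python
# integrate every valid record, then raise at the very end if some records failed
(
    Stream(lambda: records)
    .map(post_record)
    .catch(ValueError)
    .observe(what="posted records")
    .iterate()
)
```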

The ambition of `kioss` is to help us write these types of scripts in a **DRY** (Don't Repeat Yourself), **flexible**, **robust** and **readable** way.
The ambition of `iterable` is to help us write these types of scripts in a **DRY** (Don't Repeat Yourself), **flexible**, **robust** and **readable** way.

Let's delve into an example to gain a better understanding of what a job using `kioss` entails!
Let's delve into an example to gain a better understanding of what a job using `iterable` entails!

## 1. imports
```python
import datetime
import requests
from kioss import Pipe
from iterable import Stream
from google.cloud import bigquery
from typing import Iterable, Iterator, Dict, Any
```
@@ -275,29 +275,29 @@ Also let's init a BQ client:
bq_client = bigquery.Client(project)
```

## 4. pipe
## 4. stream

Write your integration function.

Tip: Define your pipe between parentheses so you can put each operation on its own line.
Tip: Define your stream between parentheses so you can put each operation on its own line.

```python
def integrate_pokemon_cards_into_bigquery(
start_time: datetime.datetime,
end_time: datetime.datetime,
) -> None:
(
Pipe(lambda: PokemonCardSource(start_time, end_time))
# at this point we have a Pipe[List[Dict[str, Any]]]
Stream(lambda: PokemonCardSource(start_time, end_time))
# at this point we have a Stream[List[Dict[str, Any]]]

# Let's say pokemontcg.io rate limits us to 10 calls per second,
# let's keep a margin and slow our pipe down to 9.
# let's keep a margin and slow our stream down to 9.
.slow(freq=9)
.observe(what="pokemon cards page")

# let's flatten the card page into individual cards
.flatten()
# at this point we have a Pipe[Dict[str, Any]]
# at this point we have a Stream[Dict[str, Any]]

# let's structure our row
.map(lambda card:
@@ -311,7 +311,7 @@

# Let's batch cards by 1000 for performant multi-rows insert.
.batch(size=1000)
# at this point we have a Pipe[List[Dict[str, Any]]]
# at this point we have a Stream[List[Dict[str, Any]]]
.observe(what="pokemon card batches")

# Let's post the batches into BQ concurrently using 2 threads.
@@ -322,17 +322,17 @@ def integrate_pokemon_cards_into_bigquery(
),
n_threads=2,
)
# at this point we have a Pipe[Sequence[Dict[str, Any]]]
# at this point we have a Stream[Sequence[Dict[str, Any]]]

# The insertion in bigquery returns a list of insert results.
# Let's raise if the insertion got errors.
.flatten()
.observe(what="bigquery insert results")
# at this point we have a Pipe[Dict[str, Any]]
# at this point we have a Stream[Dict[str, Any]]
.do(raise_for_errors)

# iterate until no more cards are left in the pipe and finally raise if errors occurred.
.run()
# iterate until no more cards are left in the stream and finally raise if errors occurred.
.iterate()
)
```

6 changes: 3 additions & 3 deletions examples/xmas_comments_translation.py
@@ -2,11 +2,11 @@

import requests
from google.cloud import bigquery, translate # type: ignore
from kioss import Pipe
from iterable import Stream

(
# Read the comments made on your platform from your BigQuery datawarehouse
Pipe(bigquery.Client().query("SELECT text FROM fact.comment").result)
Stream(bigquery.Client().query("SELECT text FROM fact.comment").result)
.map(itemgetter("text"))
.observe(what="comments")

@@ -40,5 +40,5 @@
.do(requests.Response.raise_for_status)
.map(requests.Response.text)
.observe(what="integration response's texts")
.run()
.iterate()
)
1 change: 1 addition & 0 deletions iterable/__init__.py
@@ -0,0 +1 @@
from iterable._stream import Stream
File renamed without changes.
@@ -4,8 +4,8 @@
from queue import Queue
from typing import Callable, Iterable, Iterator, Optional, Set, TypeVar, Union

from kioss import _util
from kioss._execution._core import IteratorWrapper
from iterable import _util
from iterable._execution._core import IteratorWrapper

T = TypeVar("T")
R = TypeVar("R")
2 changes: 1 addition & 1 deletion kioss/_execution/_core.py → iterable/_execution/_core.py
@@ -6,7 +6,7 @@
T = TypeVar("T")
R = TypeVar("R")

from kioss import _util
from iterable import _util


class IteratorWrapper(Iterator[T], ABC):
