Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Jul 24, 2023
1 parent bed1946 commit bc71172
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 20 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea
__pycache__
.venv
.venv
_*
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,25 @@
## Usage

```python
>>> from kissio import Pipe
import json
from kissio import Pipe, CatchedPipeError

if (
errors := Pipe(range(-10, 10))
.map(str)
.map(lambda elem: elem[:1]) # get the first character of the int
.map(int) # cast to int
.catch() # a CatchedPipeError element will be yielded by the upstream iterator in case an exception would be raised
.log() # add verbosity to the iteration process
.filter(lambda elem: isinstance(elem, CatchedPipeError)) # only keep the errors
.map(str) # get the str of the error
.list(limit=3) # exhaust the entire interator but only store the first 3 errors
):
raise RuntimeError(errors)

```

## Dev
## Setup

```bash
python -m venv .venv
Expand Down
2 changes: 1 addition & 1 deletion kissio/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from kissio.pipe import Pipe
from kissio.pipe import CatchedPipeError, Pipe
30 changes: 23 additions & 7 deletions kissio/pipe.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
import itertools
import logging
from datetime import datetime
from queue import Empty, Queue
import time
import timeit
from typing import List, Callable, Iterable, Iterator, TypeVar
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import reduce
from queue import Empty, Queue
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")
Expand Down Expand Up @@ -42,6 +42,9 @@ def iterable():

return Pipe(iter(iterable()))

def explode(self) -> "Pipe":
return _ExplodingPipe(self)

def filter(self, predicate: Callable[[T], bool]) -> "Pipe[R]":
return Pipe(filter(predicate, self))

Expand All @@ -65,8 +68,8 @@ def log(self) -> "Pipe[T]":
def reduce(self, f: Callable[[R, T], R], initial: R) -> R:
return reduce(f, self, initial)

def exhaust_into_limited_list(self, limit: int = float("inf")) -> List[T]:
return [elem for i, elem in enumerate(self.iterator) if i < limit]
def list(self, limit: int = float("inf")) -> List[T]:
return [elem for i, elem in enumerate(self) if i < limit]

def timeit(self) -> float:
def iterate():
Expand All @@ -76,6 +79,19 @@ def iterate():
return timeit.timeit(iterate, number=1)


class _ExplodingPipe(Pipe[T]):
def __init__(self, iterator: Iterator[Iterable[T]]) -> None:
super().__init__(iterator)
self.current_iterator_elem = iter(super().__next__())

def __next__(self) -> T:
try:
return next(self.current_iterator_elem)
except StopIteration:
self.current_iterator_elem = iter(super().__next__())
return next(self)


class CatchedPipeError:
def __init__(self, e: Exception) -> None:
self.error_type = type(e)
Expand Down
9 changes: 8 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
parameterized==0.9.0
black==23.7.0
google-cloud-bigquery==2.34.3
google-cloud-bigquery-datatransfer==3.7.3
google-cloud-bigquery-storage==2.16.2
google-cloud-pubsub==2.13.10
google-cloud-storage==1.44.0
protobuf==3.19.6
requests==2.28.1
parameterized==0.9.0
28 changes: 20 additions & 8 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ def test_map(self):
delta=0.3 * (TEN_MS * 8) / num_threads,
)

def test_explode(self):
self.assertListEqual(
list(Pipe(["Hello World", "Happy to be here :)"]).map(str.split).explode()),
["Hello", "World", "Happy", "to", "be", "here", ":)"],
)
self.assertEqual(
sum(
Pipe([["1 2 3", "4 5 6"], ["7", "8 9 10"]])
.explode()
.map(str.split)
.explode()
.map(int)
),
55,
)

def test_filter(self):
self.assertListEqual(list(Pipe(range(8)).filter(lambda x: x % 2)), [1, 3, 5, 7])

Expand Down Expand Up @@ -95,16 +111,12 @@ def test_head(self):
delta=0.3 * TEN_MS * 2,
)

def test_exhaust_into_limited_list(self):
self.assertListEqual(
Pipe(range(8)).exhaust_into_limited_list(limit=6), list(range(6))
)
self.assertListEqual(Pipe(range(8)).exhaust_into_limited_list(), list(range(8)))
def test_list(self):
self.assertListEqual(Pipe(range(8)).list(limit=6), list(range(6)))
self.assertListEqual(Pipe(range(8)).list(), list(range(8)))
self.assertAlmostEqual(
timeit.timeit(
lambda: Pipe(range(8))
.map(ten_millis_identity)
.exhaust_into_limited_list(0),
lambda: Pipe(range(8)).map(ten_millis_identity).list(0),
number=1,
),
TEN_MS * 8,
Expand Down

0 comments on commit bc71172

Please sign in to comment.