From bc71172c4d3835f09c0b3092133f3f5ab11c8940 Mon Sep 17 00:00:00 2001 From: bonnal-enzo Date: Mon, 24 Jul 2023 03:40:34 +0200 Subject: [PATCH] wip --- .gitignore | 3 ++- README.md | 19 +++++++++++++++++-- kissio/__init__.py | 2 +- kissio/pipe.py | 30 +++++++++++++++++++++++------- requirements.txt | 9 ++++++++- test/test.py | 28 ++++++++++++++++++++-------- 6 files changed, 71 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index c835c85..0d14867 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea __pycache__ -.venv \ No newline at end of file +.venv +_* \ No newline at end of file diff --git a/README.md b/README.md index 12b1e91..8c29ce7 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,25 @@ ## Usage ```python ->>> from kissio import Pipe +import json +from kissio import Pipe, CatchedPipeError + +if ( + errors := Pipe(range(-10, 10)) + .map(str) + .map(lambda elem: elem[:1]) # get the first character of the int + .map(int) # cast to int + .catch() # a CatchedPipeError element will be yielded by the upstream iterator whenever an exception is raised + .log() # add verbosity to the iteration process + .filter(lambda elem: isinstance(elem, CatchedPipeError)) # only keep the errors + .map(str) # get the str of the error + .list(limit=3) # exhaust the entire iterator but only store the first 3 errors +): + raise RuntimeError(errors) + ``` -## Dev +## Setup ```bash python -m venv .venv diff --git a/kissio/__init__.py b/kissio/__init__.py index d4c5071..a022c44 100644 --- a/kissio/__init__.py +++ b/kissio/__init__.py @@ -1 +1 @@ -from kissio.pipe import Pipe +from kissio.pipe import CatchedPipeError, Pipe diff --git a/kissio/pipe.py b/kissio/pipe.py index 00f117c..392309f 100644 --- a/kissio/pipe.py +++ b/kissio/pipe.py @@ -1,12 +1,12 @@ -from concurrent.futures import ThreadPoolExecutor -from functools import reduce import itertools import logging -from datetime import datetime -from queue import Empty, Queue import time import 
timeit -from typing import List, Callable, Iterable, Iterator, TypeVar +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from functools import reduce +from queue import Empty, Queue +from typing import Callable, Iterable, Iterator, List, TypeVar T = TypeVar("T") R = TypeVar("R") @@ -42,6 +42,9 @@ def iterable(): return Pipe(iter(iterable())) + def explode(self) -> "Pipe": + return _ExplodingPipe(self) + def filter(self, predicate: Callable[[T], bool]) -> "Pipe[R]": return Pipe(filter(predicate, self)) @@ -65,8 +68,8 @@ def log(self) -> "Pipe[T]": def reduce(self, f: Callable[[R, T], R], initial: R) -> R: return reduce(f, self, initial) - def exhaust_into_limited_list(self, limit: int = float("inf")) -> List[T]: - return [elem for i, elem in enumerate(self.iterator) if i < limit] + def list(self, limit: int = float("inf")) -> List[T]: + return [elem for i, elem in enumerate(self) if i < limit] def timeit(self) -> float: def iterate(): @@ -76,6 +79,19 @@ def iterate(): return timeit.timeit(iterate, number=1) +class _ExplodingPipe(Pipe[T]): + def __init__(self, iterator: Iterator[Iterable[T]]) -> None: + super().__init__(iterator) + self.current_iterator_elem = iter(super().__next__()) + + def __next__(self) -> T: + try: + return next(self.current_iterator_elem) + except StopIteration: + self.current_iterator_elem = iter(super().__next__()) + return next(self) + + class CatchedPipeError: def __init__(self, e: Exception) -> None: self.error_type = type(e) diff --git a/requirements.txt b/requirements.txt index a042c1a..d6a261a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,9 @@ -parameterized==0.9.0 black==23.7.0 +google-cloud-bigquery==2.34.3 +google-cloud-bigquery-datatransfer==3.7.3 +google-cloud-bigquery-storage==2.16.2 +google-cloud-pubsub==2.13.10 +google-cloud-storage==1.44.0 +protobuf==3.19.6 +requests==2.28.1 +parameterized==0.9.0 diff --git a/test/test.py b/test/test.py index 0ceb909..851f1f2 100644 --- 
a/test/test.py +++ b/test/test.py @@ -60,6 +60,22 @@ def test_map(self): delta=0.3 * (TEN_MS * 8) / num_threads, ) + def test_explode(self): + self.assertListEqual( + list(Pipe(["Hello World", "Happy to be here :)"]).map(str.split).explode()), + ["Hello", "World", "Happy", "to", "be", "here", ":)"], + ) + self.assertEqual( + sum( + Pipe([["1 2 3", "4 5 6"], ["7", "8 9 10"]]) + .explode() + .map(str.split) + .explode() + .map(int) + ), + 55, + ) + def test_filter(self): self.assertListEqual(list(Pipe(range(8)).filter(lambda x: x % 2)), [1, 3, 5, 7]) @@ -95,16 +111,12 @@ def test_head(self): delta=0.3 * TEN_MS * 2, ) - def test_exhaust_into_limited_list(self): - self.assertListEqual( - Pipe(range(8)).exhaust_into_limited_list(limit=6), list(range(6)) - ) - self.assertListEqual(Pipe(range(8)).exhaust_into_limited_list(), list(range(8))) + def test_list(self): + self.assertListEqual(Pipe(range(8)).list(limit=6), list(range(6))) + self.assertListEqual(Pipe(range(8)).list(), list(range(8))) self.assertAlmostEqual( timeit.timeit( - lambda: Pipe(range(8)) - .map(ten_millis_identity) - .exhaust_into_limited_list(0), + lambda: Pipe(range(8)).map(ten_millis_identity).list(0), number=1, ), TEN_MS * 8,