Skip to content

Commit

Permalink
0.0.2: streamable
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 25, 2023
1 parent 860192b commit 1fedf5e
Show file tree
Hide file tree
Showing 19 changed files with 60 additions and 46 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: mypy
run: |
python -m pip install -r requirements.txt
python -m mypy iterable tests
python -m mypy streamable tests
lint:
runs-on: ubuntu-latest
Expand All @@ -45,7 +45,7 @@ jobs:

- name: checks
run: |
# python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module -r iterable tests && python -m isort iterable tests && python -m black iterable tests
# python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module -r streamable tests && python -m isort streamable tests && python -m black streamable tests
python -m pip install -r requirements.txt
python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module --check -r iterable tests
python -m black --check iterable/* tests/*
python -m autoflake --in-place --remove-all-unused-imports --remove-unused-variables --ignore-init-module --check -r streamable tests
python -m black --check streamable/* tests/*
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# `iterable`
# `streamable`
**Keep Iterables Simple and Stupid**

[![Actions Status](https://github.com/bonnal-enzo/iterable/workflows/test/badge.svg)](https://github.com/bonnal-enzo/iterable/actions) [![Actions Status](https://github.com/bonnal-enzo/iterable/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/iterable/actions)
[![Actions Status](https://github.com/bonnal-enzo/streamable/workflows/test/badge.svg)](https://github.com/bonnal-enzo/streamable/actions) [![Actions Status](https://github.com/bonnal-enzo/streamable/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/streamable/actions)

Ease the manipulation of `Iterable`s.

## 1. install

```bash
pip install iterable
pip install streamable
```

## 2. import
```python
from iterable import Stream
from streamable import Stream
```

## 3. init
Expand Down Expand Up @@ -126,7 +126,7 @@ Defines the ungrouping of parent elements assuming that the parent elements are
integers: Stream[int] = integer_batches.flatten()
```

It also has an optional `n_threads` parameter to flatten concurrently several parent iterables.
It also has an optional `n_threads` parameter to flatten concurrently several parent streamables.

## `.chain`

Expand Down Expand Up @@ -222,15 +222,15 @@ These scripts are typically composed of:

- The logic to **catch** exceptions of a given type. Also, we typically want to catch errors and seamlessly proceed with the integration until completion. For instance, if you have 1000 records to integrate and encounter an exception at the 236th record due to a malformed record, it is often more favorable to successfully integrate 999 records and raise after the interation has completed compared to skipping 763 valid records prematurely.

The ambition of `iterable` is to help us write these type of scripts in a **DRY** (Don't Repeat Yourself), **flexible**, **robust** and **readable** way.
The ambition of `streamable` is to help us write these type of scripts in a **DRY** (Don't Repeat Yourself), **flexible**, **robust** and **readable** way.

Let's delve into an example to gain a better understanding of what a job using `iterable` entails!
Let's delve into an example to gain a better understanding of what a job using `streamable` entails!

## 1. imports
```python
import datetime
import requests
from iterable import Stream
from streamable import Stream
from google.cloud import bigquery
from typing import Iterable, Iterator, Dict, Any
```
Expand Down
2 changes: 1 addition & 1 deletion examples/xmas_comments_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import requests
from google.cloud import bigquery, translate # type: ignore
from iterable import Stream
from streamable import Stream

(
# Read the comments made on your platform from your BigQuery datawarehouse
Expand Down
1 change: 0 additions & 1 deletion iterable/__init__.py

This file was deleted.

8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

setup(
name='streamable',
version='0.0.1',
version='0.0.2',
packages=find_packages(),
package_data={"iterable": ["py.typed"]},
url='http://github.com/bonnal-enzo/iterable',
package_data={"streamable": ["py.typed"]},
url='http://github.com/bonnal-enzo/streamable',
license='Apache 2.',
author='bonnal-enzo',
author_email='bonnal.enzo.dev@gmail.com',
description='Ease the manipulation of iterables.'
description='Ease the manipulation of streamables.'
)
1 change: 1 addition & 0 deletions streamable/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from streamable._stream import Stream
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from queue import Queue
from typing import Callable, Iterable, Iterator, Optional, Set, TypeVar, Union

from iterable import _util
from iterable._execution._core import IteratorWrapper
from streamable import _util
from streamable._execution._core import IteratorWrapper

T = TypeVar("T")
R = TypeVar("R")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
T = TypeVar("T")
R = TypeVar("R")

from iterable import _util
from streamable import _util


class IteratorWrapper(Iterator[T], ABC):
Expand Down
8 changes: 4 additions & 4 deletions iterable/_stream.py → streamable/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
overload,
)

from iterable import _util
from streamable import _util

if TYPE_CHECKING:
from iterable._visit._base import Visitor
from streamable._visit._base import Visitor

R = TypeVar("R")
T = TypeVar("T")
Expand All @@ -44,15 +44,15 @@ def __init__(self, source: Callable[[], Iterable[T]]) -> None:
self.source = source

def __iter__(self) -> Iterator[T]:
from iterable._visit import _iter
from streamable._visit import _iter

return self._accept(_iter.IteratorProducingVisitor[T]())

def __add__(self, other: "Stream[T]") -> "Stream[T]":
return self.chain(other)

def explain(self, colored: bool = False) -> str:
from iterable._visit import _explanation
from streamable._visit import _explanation

return self._accept(_explanation.ExplainingVisitor(colored))

Expand Down
2 changes: 1 addition & 1 deletion iterable/_util.py → streamable/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing_extensions import TypeGuard

LOGGER = logging.getLogger("iterable")
LOGGER = logging.getLogger("streamable")
LOGGER.propagate = False
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion iterable/_visit/_base.py → streamable/_visit/_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from iterable import _stream
from streamable import _stream

V = TypeVar("V")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any

from iterable import _stream, _util
from iterable._visit._base import Visitor
from streamable import _stream, _util
from streamable._visit._base import Visitor


class ExplainingVisitor(Visitor[str]):
Expand Down
14 changes: 9 additions & 5 deletions iterable/_visit/_iter.py → streamable/_visit/_iter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import itertools
from typing import Iterable, Iterator, List, TypeVar, cast

from iterable import _stream, _util
from iterable._execution import _concurrency, _core
from iterable._visit._base import Visitor
from streamable import _stream, _util
from streamable._execution import _concurrency, _core
from streamable._visit._base import Visitor

T = TypeVar("T")
U = TypeVar("U")
Expand All @@ -16,7 +16,9 @@ def visit_source_stream(self, stream: _stream.Stream[T]) -> Iterator[T]:
return iter(iterable)

def visit_map_stream(self, stream: _stream.MapStream[U, T]) -> Iterator[T]:
func = _util.map_exception(stream.func, source=StopIteration, target=RuntimeError)
func = _util.map_exception(
stream.func, source=StopIteration, target=RuntimeError
)
it: Iterator[U] = stream.upstream._accept(IteratorProducingVisitor[U]())
if stream.n_threads == 1:
return map(func, it)
Expand All @@ -29,7 +31,9 @@ def visit_do_stream(self, stream: _stream.DoStream[T]) -> Iterator[T]:
func = _util.sidify(
_util.map_exception(stream.func, source=StopIteration, target=RuntimeError)
)
return self.visit_map_stream(_stream.MapStream(stream.upstream, func, stream.n_threads))
return self.visit_map_stream(
_stream.MapStream(stream.upstream, func, stream.n_threads)
)

def visit_flatten_stream(self, stream: _stream.FlattenStream[T]) -> Iterator[T]:
it = stream.upstream._accept(IteratorProducingVisitor[Iterable]())
Expand Down
File renamed without changes.
28 changes: 19 additions & 9 deletions tests/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from parameterized import parameterized # type: ignore

from iterable import Stream, _util
from streamable import Stream, _util

TEN_MS = 0.01
DELTA = 0.35
Expand Down Expand Up @@ -38,7 +38,7 @@ def raise_stopiteration() -> bool:

class TestStream(unittest.TestCase):
def test_init(self) -> None:
# from iterable
# from streamable
self.assertListEqual(list(Stream(range(8).__iter__)), list(range(8)))
# from iterator
self.assertListEqual(list(Stream(range(8).__iter__)), list(range(8)))
Expand Down Expand Up @@ -95,7 +95,9 @@ def test_flatten(self, n_threads: int):

# test potential recursion issue with chained empty iters
list(
Stream([iter([]) for _ in range(2000)].__iter__).flatten(n_threads=n_threads)
Stream([iter([]) for _ in range(2000)].__iter__).flatten(
n_threads=n_threads
)
)

# test concurrency
Expand Down Expand Up @@ -242,7 +244,9 @@ def test_map(self, n_threads: int):
def test_map_threading_bench(self) -> None:
# non-threaded vs threaded execution time
stream = Stream(range(N).__iter__).map(ten_ms_identity)
self.assertAlmostEqual(timestream(stream), TEN_MS * N, delta=DELTA * (TEN_MS * N))
self.assertAlmostEqual(
timestream(stream), TEN_MS * N, delta=DELTA * (TEN_MS * N)
)
n_threads = 2
stream = Stream(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads)
self.assertAlmostEqual(
Expand Down Expand Up @@ -291,7 +295,9 @@ def test_filter(self) -> None:
[1, 3, 5, 7],
)

self.assertListEqual(list(Stream(range(8).__iter__).filter(lambda _: False)), [])
self.assertListEqual(
list(Stream(range(8).__iter__).filter(lambda _: False)), []
)

self.assertEqual(
list(
Expand Down Expand Up @@ -356,7 +362,9 @@ def store_errors(error):
def test_slow(self, n_threads: int):
freq = 64
stream = (
Stream(range(N).__iter__).map(ten_ms_identity, n_threads=n_threads).slow(freq)
Stream(range(N).__iter__)
.map(ten_ms_identity, n_threads=n_threads)
.slow(freq)
)
self.assertAlmostEqual(
timestream(stream),
Expand Down Expand Up @@ -454,7 +462,9 @@ def store_errors(error):
)

def test_iterate(self) -> None:
self.assertListEqual(Stream("123".__iter__).map(int).iterate(collect_limit=2), [1, 2])
self.assertListEqual(
Stream("123".__iter__).map(int).iterate(collect_limit=2), [1, 2]
)
self.assertListEqual(Stream("123".__iter__).map(int).iterate(), [])

# errors
Expand Down Expand Up @@ -585,9 +595,9 @@ def test_accept_typing(self) -> None:
Stream(lambda: range(10)).batch().map(lambda b: list(map(str, b))).flatten()
)
it: Iterator[str] = iter(p)
from iterable._visit._iter import IteratorProducingVisitor
from streamable._visit._iter import IteratorProducingVisitor

p._accept(IteratorProducingVisitor[str]())
from iterable._visit._explanation import ExplainingVisitor
from streamable._visit._explanation import ExplainingVisitor

p._accept(ExplainingVisitor())
4 changes: 2 additions & 2 deletions tests/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from parameterized import parameterized # type: ignore

from iterable import Stream
from streamable import Stream

T = TypeVar("T")

Expand Down Expand Up @@ -102,7 +102,7 @@ def test_iter(self) -> None:
)

def test_add(self) -> None:
from iterable._stream import ChainStream
from streamable._stream import ChainStream

stream = Stream(src)
self.assertIsInstance(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest

from iterable._util import sidify
from streamable._util import sidify


class TestUtil(unittest.TestCase):
Expand Down

0 comments on commit 1fedf5e

Please sign in to comment.