Skip to content

Commit

Permalink
batch: rename period -> seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 25, 2023
1 parent bb76cf8 commit 57c0c2a
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ pair_integers: Stream[int] = integers.filter(lambda x: x % 2 == 0)
Defines the grouping of parent elements into batches.

```python
integer_batches: Stream[List[int]] = integers.batch(size=100, period=60)
integer_batches: Stream[List[int]] = integers.batch(size=100, seconds=60)
```

In this example a batch will be a list of 100 elements.

It may contain fewer elements in the following cases:
- the stream is exhausted
- an exception occurred
- more than 60 seconds (the `period` parameter) has elapsed since the last batch has been yielded.
- more than 60 `seconds` have elapsed since the last batch was yielded.

## `.flatten`

Expand Down
2 changes: 1 addition & 1 deletion examples/xmas_comments_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# by batch of maximum size 100 and with at least 1 batch every 10 seconds
# and at a maximum rate of 5 batches per second.
# Also raise if the status code is not 200.
.batch(size=100, period=10)
.batch(size=100, seconds=10)
.slow(freq=5)
.map(
lambda comment: requests.post(
Expand Down
8 changes: 4 additions & 4 deletions streamable/_execution/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ class BatchingIteratorWrapper(IteratorWrapper[List[T]]):
"""
Batch an input iterator and yields its elements packed in a list when one of the following is True:
- len(batch) == size
- the time elapsed between the first next() call on input iterator and last received elements is grater than period
- the time elapsed between the first next() call on input iterator and last received elements is greater than `seconds`
- receiving the next element raised an exception (it is stored in self._to_be_raised and will be raised during the next call to self.__next__)
"""

def __init__(self, iterator: Iterator[T], size: int, period: float) -> None:
def __init__(self, iterator: Iterator[T], size: int, seconds: float) -> None:
super().__init__(iterator)
self.size = size
self.period = period
self.seconds = seconds
self._to_be_raised: Optional[Exception] = None
self._is_exhausted = False

Expand All @@ -127,7 +127,7 @@ def __next__(self) -> List[T]:
batch = None
try:
batch = [next(self.iterator)]
while len(batch) < self.size and (time.time() - start_time) < self.period:
while len(batch) < self.size and (time.time() - start_time) < self.seconds:
batch.append(next(self.iterator))
return batch
except StopIteration:
Expand Down
10 changes: 5 additions & 5 deletions streamable/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,18 @@ def filter(self, predicate: Callable[[T], bool]) -> "Stream[T]":
"""
return FilterStream(self, predicate)

def batch(self, size: int = 100, period: float = float("inf")) -> "Stream[List[T]]":
def batch(self, size: int = 100, seconds: float = float("inf")) -> "Stream[List[T]]":
"""
Batch elements of the Stream into lists of a specified size or within a specified time window.
Args:
size (int, optional): The maximum number of elements per batch (default is 100).
period (float, optional): The maximum number of seconds to wait before yielding a batch (default is infinity).
seconds (float, optional): The maximum number of seconds to wait before yielding a batch (default is infinity).
Returns:
Stream[List[T]]: A new Stream instance with lists containing batches of elements.
"""
return BatchStream(self, size, period)
return BatchStream(self, size, seconds)

def slow(self, freq: float) -> "Stream[T]":
"""
Expand Down Expand Up @@ -384,10 +384,10 @@ def _accept(self, visitor: "Visitor[V]") -> V:


class BatchStream(Stream[List[Y]]):
def __init__(self, upstream: Stream[Y], size: int, period: float):
def __init__(self, upstream: Stream[Y], size: int, seconds: float):
self.upstream: Stream[Y] = upstream
self.size = size
self.period = period
self.seconds = seconds

def _accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_batch_stream(self)
Expand Down
2 changes: 1 addition & 1 deletion streamable/_visit/_explanation.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def visit_filter_stream(self, stream: _stream.FilterStream) -> Any:

def visit_batch_stream(self, stream: _stream.BatchStream) -> Any:
name = "Batch"
descr = f"elements by groups of {stream.size} element{'s' if stream.size > 1 else ''}, or over a period of {stream.period} second{'s' if stream.period > 1 else ''}"
descr = f"elements by groups of {stream.size} element{'s' if stream.size > 1 else ''}, or over a seconds of {stream.seconds} second{'s' if stream.seconds > 1 else ''}"
return self.visit_any_stream(stream, name, descr)

def visit_slow_stream(self, stream: _stream.SlowStream) -> Any:
Expand Down
2 changes: 1 addition & 1 deletion streamable/_visit/_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def visit_filter_stream(self, stream: _stream.FilterStream[T]) -> Iterator[T]:
def visit_batch_stream(self, stream: _stream.BatchStream[U]) -> Iterator[T]:
it: Iterator[U] = stream.upstream._accept(IteratorProducingVisitor[U]())
return cast(
Iterator[T], _core.BatchingIteratorWrapper(it, stream.size, stream.period)
Iterator[T], _core.BatchingIteratorWrapper(it, stream.size, stream.seconds)
)

def visit_slow_stream(self, stream: _stream.SlowStream[T]) -> Iterator[T]:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def test_batch(self) -> None:
[list(range(8))],
)
self.assertEqual(
len(list(Stream(range(8).__iter__).slow(10).batch(period=0.09))),
len(list(Stream(range(8).__iter__).slow(10).batch(seconds=0.09))),
7,
)
# assert batch gracefully yields if the next element throws an exception
Expand Down

0 comments on commit 57c0c2a

Please sign in to comment.