Skip to content

Commit

Permalink
period_to_max_yields
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Oct 7, 2024
1 parent 54f7107 commit 3d8a043
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 51 deletions.
8 changes: 5 additions & 3 deletions streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,11 @@ def throttle(
):
iterator = ThrottlingPerPeriodIterator(
iterator,
per_second,
per_minute,
per_hour,
{
1: per_second,
60: per_minute,
3660: per_hour,
},
)
if interval > datetime.timedelta(0):
iterator = ThrottlingIntervalIterator(iterator, interval.total_seconds())
Expand Down
69 changes: 21 additions & 48 deletions streamable/iters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Coroutine,
DefaultDict,
Deque,
Dict,
Generic,
Iterable,
Iterator,
Expand Down Expand Up @@ -303,23 +304,15 @@ class ThrottlingPerPeriodIterator(Iterator[T]):
def __init__(
self,
iterator: Iterator[T],
per_second: int,
per_minute: int,
per_hour: int,
period_to_max_yields: Dict[int, int],
) -> None:
self.iterator = iterator

self.per_second = per_second
self.per_minute = per_minute
self.per_hour = per_hour
self.period_to_max_yields = period_to_max_yields

self.second: int = -1
self.minute: int = -1
self.hour: int = -1
self.current_period: List[int] = [-1] * len(period_to_max_yields)

self.yields_in_second = 0
self.yields_in_minute = 0
self.yields_in_hour = 0
self.yields_in_period = [0] * len(period_to_max_yields)

self.offset: Optional[float] = None

Expand All @@ -329,48 +322,28 @@ def __next__(self) -> T:
self.offset = current_time
current_time -= self.offset

current_second = int(current_time)
if self.second != current_second:
self.second = current_second
self.yields_in_second = 0

current_minute = current_second // 60
if self.minute != current_minute:
self.minute = current_minute
self.yields_in_minute = 0

current_hour = current_minute // 60
if self.hour != current_hour:
self.hour = current_hour
self.yields_in_hour = 0

to_sleep = 0.0

if self.yields_in_second >= self.per_second:
# sleep until next second
to_sleep = ceil(current_time) - current_time

if self.yields_in_minute >= self.per_minute:
# sleep until next minute
to_sleep = max(
to_sleep,
(ceil(current_time / 60) - current_time / 60) * 60,
)

if self.yields_in_hour >= self.per_hour: # pragma: no cover
# sleep until next hour
to_sleep = max(
to_sleep,
(ceil(current_time / 3600) - current_time / 3600) * 3600,
)
for index, (period, max_yields_per_period) in enumerate(
self.period_to_max_yields.items()
):
current_period = int(current_time) // period
if self.current_period[index] != current_period:
self.current_period[index] = current_period
self.yields_in_period[index] = 0

if self.yields_in_period[index] >= max_yields_per_period:
# sleep until next period
to_sleep = max(
to_sleep,
(ceil(current_time / period) - current_time / period) * period,
)

if to_sleep:
time.sleep(to_sleep)
return next(self)

self.yields_in_second += 1
self.yields_in_minute += 1
self.yields_in_hour += 1
for index in range(len(self.yields_in_period)):
self.yields_in_period[index] += 1
return next(self.iterator)


Expand Down

0 comments on commit 3d8a043

Please sign in to comment.