Skip to content

Commit

Permalink
0.4.2: make flatten resilient to 'generator already executing' while …
Browse files Browse the repository at this point in the history
…remaining resilient to 'max recur depth exceeded'
  • Loading branch information
ebonnal committed Oct 17, 2023
1 parent 42a30d6 commit 744dfb4
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
26 changes: 16 additions & 10 deletions kioss/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,23 @@ def __next__(self):
except KeyError: # KeyError: 'pop from an empty set'
raise StopIteration()

def f():
try:
elem = next(next_iterator_elem)
except StopIteration:
def f(backoff: int = 0.005):
while True:
try:
self.iterators_pool.remove(next_iterator_elem)
except KeyError:
pass
return ThreadedFlatteningIteratorWrapper._SKIP

return elem
elem = next(next_iterator_elem)
except StopIteration:
try:
self.iterators_pool.remove(next_iterator_elem)
except KeyError:
pass
return ThreadedFlatteningIteratorWrapper._SKIP
except ValueError as e: # generator anlready executing
if not "generator already executing" in str(e):
raise e
time.sleep(backoff)
backoff *= 2
continue
return elem

return f

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='kioss',
version='0.4.0',
version='0.4.2',
packages=['kioss'],
url='http://github.com/bonnal-enzo/kioss',
license='Apache 2.',
Expand Down
10 changes: 10 additions & 0 deletions tests/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,13 @@ def test_planning_and_execution_decoupling(self):
self.assertListEqual(a.collect(), list(range(N)))
# test b not affected by a execution
self.assertListEqual(b.collect(), [list(range(N))])

def test_generator_already_generating(self):
self.assertEqual(
Counter(
Pipe(
lambda: [(ten_ms_identity(x) for x in range(N)) for _ in range(3)]
).flatten(n_threads=2)
),
Counter(list(range(N)) + list(range(N)) + list(range(N))),
)

0 comments on commit 744dfb4

Please sign in to comment.