Skip to content

Commit

Permalink
AsyncConcurrentMappingIterable: avoid storing loop as attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Oct 2, 2024
1 parent e3772da commit 61894d2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
20 changes: 9 additions & 11 deletions streamable/iters.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,12 @@ def __init__(
self.buffer_size = buffer_size
self.ordered = ordered

@abstractmethod
def _context_manager(self) -> ContextManager: ...
def _context_manager(self) -> ContextManager:
@contextmanager
def dummy_context_manager_generator():
yield

return dummy_context_manager_generator()

@abstractmethod
def _launch_task(
Expand Down Expand Up @@ -454,12 +458,6 @@ def __init__(
) -> None:
super().__init__(iterator, buffer_size, ordered)
self.transformation = transformation
self._loop: asyncio.AbstractEventLoop

@contextmanager
def _context_manager(self):
self._loop = asyncio.new_event_loop()
yield

async def _safe_transformation(
self, elem: T
Expand All @@ -479,16 +477,16 @@ def _launch_task(
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]":
return cast(
"Future[Union[U, RaisingIterator.ExceptionContainer]]",
self._loop.create_task(self._safe_transformation(elem)),
asyncio.get_event_loop().create_task(self._safe_transformation(elem)),
)

def _future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
if self.ordered:
return FIFOAsyncFutureResultCollection(self._loop)
return FIFOAsyncFutureResultCollection()
else:
return FDFOAsyncFutureResultCollection(self._loop)
return FDFOAsyncFutureResultCollection()


class ConcurrentFlatteningIterable(
Expand Down
18 changes: 7 additions & 11 deletions streamable/util/futuretools.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop
from collections import deque
from concurrent.futures import Future
from multiprocessing import Queue
Expand Down Expand Up @@ -76,29 +75,26 @@ class FIFOAsyncFutureResultCollection(DequeFutureResultCollection[T]):
First In First Out
"""

def __init__(self, loop: AbstractEventLoop) -> None:
super().__init__()
self._loop = loop

def __next__(self) -> T:
return self._loop.run_until_complete(self._futures.popleft()) # type: ignore
return asyncio.get_event_loop().run_until_complete(self._futures.popleft()) # type: ignore


class FDFOAsyncFutureResultCollection(CallbackFutureResultCollection[T]):
"""
First Done First Out
"""

def __init__(self, loop: AbstractEventLoop) -> None:
def __init__(self) -> None:
super().__init__()
self._loop = loop
self._waiter: asyncio.futures.Future[T] = self._loop.create_future()
self._waiter: asyncio.futures.Future[T] = (
asyncio.get_event_loop().create_future()
)

def _done_callback(self, future: "Future[T]") -> None:
self._waiter.set_result(future.result())

def __next__(self) -> T:
result = self._loop.run_until_complete(self._waiter)
result = asyncio.get_event_loop().run_until_complete(self._waiter)
self._n_futures -= 1
self._waiter = self._loop.create_future()
self._waiter = asyncio.get_event_loop().create_future()
return result

0 comments on commit 61894d2

Please sign in to comment.