Skip to content

Commit

Permalink
_new
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Oct 14, 2024
1 parent 47607b6 commit 191cb6b
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions streamable/iters.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,32 +402,32 @@ def __init__(
self.buffer_size = buffer_size
self.ordered = ordered

@property
def _context_manager(self) -> Callable[[], Optional[ContextManager]]:
# factory method
def _new_context_manager(self) -> Optional[ContextManager]:
@contextmanager
def dummy_context_manager_generator():
yield

return lambda: dummy_context_manager_generator()

@abstractmethod
def _launch_task(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]": ...
return dummy_context_manager_generator()

# factory method
@abstractmethod
def _future_result_collection(
def _new_future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]: ...

@abstractmethod
def _launch_task(
self, elem: T
) -> "Future[Union[U, RaisingIterator.ExceptionContainer]]": ...

def __iter__(self) -> Iterator[Union[U, RaisingIterator.ExceptionContainer]]:
context_manager = self._context_manager()
context_manager = self._new_context_manager()
if context_manager is None:
raise ValueError("context manager is None")

with context_manager:
future_results = self._future_result_collection()
future_results = self._new_future_result_collection()

# queue tasks up to buffer_size
with suppress(StopIteration):
Expand Down Expand Up @@ -459,7 +459,7 @@ def __init__(
self.via_processes = via_processes

@property
def _context_manager(self) -> Callable[[], Optional[ContextManager]]:
def _new_context_manager(self) -> Callable[[], Optional[ContextManager]]:
if self.via_processes:
self.executor = ProcessPoolExecutor(max_workers=self.concurrency)
else:
Expand All @@ -483,7 +483,7 @@ def _launch_task(
self._safe_transformation, self.transformation, elem
)

def _future_result_collection(
def _new_future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
if self.ordered:
Expand Down Expand Up @@ -524,7 +524,7 @@ def _launch_task(
asyncio.get_event_loop().create_task(self._safe_transformation(elem)),
)

def _future_result_collection(
def _new_future_result_collection(
self,
) -> FutureResultCollection[Union[U, RaisingIterator.ExceptionContainer]]:
if self.ordered:
Expand Down

0 comments on commit 191cb6b

Please sign in to comment.