diff --git a/kioss/_iterator_generating_visitor.py b/kioss/_iterator_generating_visitor.py deleted file mode 100644 index 9f9cfbe..0000000 --- a/kioss/_iterator_generating_visitor.py +++ /dev/null @@ -1,66 +0,0 @@ -import itertools -from abc import ABC, abstractmethod -from typing import ( - Generic, - Iterator, - List, - TypeVar, -) - -from kioss import _exec, _concurrent_exec, _pipe, _util, _visitor - -V = TypeVar("V") -T = TypeVar("T") -U = TypeVar("U") - -class IteratorGeneratingVisitor(_visitor.AVisitor): - - def visitSourcePipe(self, pipe: _pipe.SourcePipe[T]) -> Iterator[T]: - iterator = pipe.source() - try: - # duck-type checks that the object returned by the source is an iterator - _util.duck_check_type_is_iterator(iterator) - except TypeError as e: - raise TypeError( - f"source must be a callable returning an iterator (implements __iter__ and __next__ methods), but the object resulting from a call to source() was not an iterator: got '{iterator}' of type {type(iterator)}." - ) from e - return iterator - - def visitMapPipe(self, pipe: _pipe.MapPipe[T]) -> Iterator[T]: - if pipe.n_threads == 1: - return map(pipe.func, iter(pipe.upstream)) - else: - return _concurrent_exec.ThreadedMappingIteratorWrapper( - iter(pipe.upstream), pipe.func, n_workers=pipe.n_threads - ) - - def visitFlattenPipe(self, pipe: _pipe.FlattenPipe[T]) -> Iterator[T]: - upstream_pipe: _pipe.APipe[Iterator[T]] = pipe.upstream - if pipe.n_threads == 1: - return _exec.FlatteningIteratorWrapper(iter(upstream_pipe)) - else: - return _concurrent_exec.ThreadedFlatteningIteratorWrapper( - iter(pipe.upstream), n_workers=pipe.n_threads - ) - - def visitChainPipe(self, pipe: _pipe.ChainPipe[T]) -> Iterator[T]: - return itertools.chain(iter(pipe.upstream), *list(map(iter, pipe.others))) - - def visitFilterPipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]: - return filter(pipe.predicate, iter(pipe.upstream)) - - def visitBatchPipe(self, pipe: _pipe.BatchPipe[U]) -> Iterator[List[U]]: - return _exec.BatchingIteratorWrapper( - iter(pipe.upstream), pipe.size, pipe.period - ) - - def visitSlowPipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]: - return _exec.SlowingIteratorWrapper(iter(pipe.upstream), pipe.freq) - - def visitCatchPipe(self, pipe: _pipe.CatchPipe[T]) -> Iterator[T]: - return _exec.CatchingIteratorWrapper( - iter(pipe.upstream), *pipe.classes, when=pipe.when - ) - - def visitLogPipe(self, pipe: _pipe.LogPipe[T]) -> Iterator[T]: - return _exec.LoggingIteratorWrapper(iter(pipe.upstream), pipe.what) diff --git a/kioss/_pipe.py b/kioss/_pipe.py index 10a071f..85fe197 100644 --- a/kioss/_pipe.py +++ b/kioss/_pipe.py @@ -288,7 +288,7 @@ def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitMapPipe(self) def __str__(self) -> str: - return f"Map(function of type {type(self.func)}, using {self.n_threads} threads)" + return f"Map(function of type {type(self.func)}, using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})" class LogPipe(APipe[T]): def __init__(self, upstream: APipe[T], what: str = "elements"): @@ -310,7 +310,7 @@ def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitFlattenPipe(self) def __str__(self) -> str: - return f"Flatten(using {self.n_threads} threads)" + return f"Flatten(using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})" class BatchPipe(APipe[List[T]]): def __init__(self, upstream: APipe[T], size: int, period: float): @@ -322,7 +322,7 @@ def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitBatchPipe(self) def __str__(self) -> str: - return f"Batch(elements by groups of {self.size}, or over a period of {self.period} seconds)" + return f"Batch(elements by groups of {self.size} element{'s' if self.size > 1 else ''}, or over a period of {self.period} second{'s' if self.period > 1 else ''})" class CatchPipe(APipe[T]): def __init__( diff --git a/kioss/_util.py b/kioss/_util.py index a0c1d31..02c4e72 100644 --- a/kioss/_util.py +++ b/kioss/_util.py @@ -1,5 +1,5 @@ from typing import Any, Callable, Iterable, Iterator, Type, TypeVar, Union - +from typing_extensions import TypeGuard import logging LOGGER = logging.getLogger("kioss") @@ -43,7 +43,7 @@ def identity(obj: T) -> T: return obj -def duck_check_type_is_iterator(expected_iterator: Any) -> None: +def duck_check_type_is_iterator(expected_iterator: Any) -> TypeGuard[Iterator]: """ Raises: TypeError: If the expected_iterator does not implement __iter__ and __next__ methods. @@ -72,3 +72,5 @@ def duck_check_type_is_iterator(expected_iterator: Any) -> None: raise TypeError( f"Provided object is not an iterator because it implements the __iter__ but not the __next__ one." ) + + return True \ No newline at end of file diff --git a/kioss/_visitor.py b/kioss/_visitor.py index 49761e8..10643dc 100644 --- a/kioss/_visitor.py +++ b/kioss/_visitor.py @@ -98,7 +98,6 @@ def visitLogPipe(self, pipe: _pipe.LogPipe) -> Any: return self.visitAnyPipe(pipe) class IteratorGeneratingVisitor(AVisitor): - def visitSourcePipe(self, pipe: _pipe.SourcePipe[T]) -> Iterator[T]: iterator = pipe.source() try: @@ -134,7 +133,7 @@ def visitFilterPipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]: def visitBatchPipe(self, pipe: _pipe.BatchPipe[T]) -> Iterator[List[T]]: return _exec.BatchingIteratorWrapper( - iter(pipe.upstream), pipe.size, pipe.period + pipe.upstream._accept(self), pipe.size, pipe.period ) def visitSlowPipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]: