From dff0ea7fa0439eea1d7914a40c56ab584286d7f4 Mon Sep 17 00:00:00 2001 From: bonnal-enzo Date: Mon, 4 Dec 2023 02:09:55 +0100 Subject: [PATCH] ExplainingVisitor --- kioss/__init__.py | 7 ++- kioss/_iterator_generating_visitor.py | 22 +++---- kioss/{_plan.py => _pipe.py} | 48 ++++++++++++--- kioss/_visitor.py | 87 +++++++++++++++++++++------ tests/test_legacy.py | 20 ++++++ 5 files changed, 141 insertions(+), 43 deletions(-) rename kioss/{_plan.py => _pipe.py} (87%) diff --git a/kioss/__init__.py b/kioss/__init__.py index c46d08e..752d4d4 100644 --- a/kioss/__init__.py +++ b/kioss/__init__.py @@ -1,4 +1,5 @@ -from kioss._plan import APipe, SourcePipe as Pipe +from kioss._pipe import APipe, SourcePipe as Pipe from kioss._util import LOGGER -from kioss import _plan, _visitor -_plan.ITERATOR_GENERATING_VISITOR = _visitor.IteratorGeneratingVisitor() \ No newline at end of file +from kioss import _pipe, _visitor +_pipe.ITERATOR_GENERATING_VISITOR = _visitor.IteratorGeneratingVisitor() +_pipe.EXPLAINING_VISITOR_CLASS = _visitor.ExplainingVisitor \ No newline at end of file diff --git a/kioss/_iterator_generating_visitor.py b/kioss/_iterator_generating_visitor.py index 62f4409..9f9cfbe 100644 --- a/kioss/_iterator_generating_visitor.py +++ b/kioss/_iterator_generating_visitor.py @@ -7,7 +7,7 @@ TypeVar, ) -from kioss import _exec, _concurrent_exec, _util, _plan, _visitor +from kioss import _exec, _concurrent_exec, _pipe, _util, _visitor V = TypeVar("V") T = TypeVar("T") @@ -15,7 +15,7 @@ class IteratorGeneratingVisitor(_visitor.AVisitor): - def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]: + def visitSourcePipe(self, pipe: _pipe.SourcePipe[T]) -> Iterator[T]: iterator = pipe.source() try: # duck-type checks that the object returned by the source is an iterator @@ -26,7 +26,7 @@ def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]: ) from e return iterator - def visitMapPipe(self, pipe: _plan.MapPipe[T]) -> Iterator[T]: + def visitMapPipe(self, pipe: _pipe.MapPipe[T]) -> Iterator[T]: if pipe.n_threads == 1: return map(pipe.func, iter(pipe.upstream)) else: @@ -34,8 +34,8 @@ def visitMapPipe(self, pipe: _plan.MapPipe[T]) -> Iterator[T]: iter(pipe.upstream), pipe.func, n_workers=pipe.n_threads ) - def visitFlattenPipe(self, pipe: _plan.FlattenPipe[T]) -> Iterator[T]: - upstream_pipe: _plan.APipe[Iterator[T]] = pipe.upstream + def visitFlattenPipe(self, pipe: _pipe.FlattenPipe[T]) -> Iterator[T]: + upstream_pipe: _pipe.APipe[Iterator[T]] = pipe.upstream if pipe.n_threads == 1: return _exec.FlatteningIteratorWrapper(iter(upstream_pipe)) else: @@ -43,24 +43,24 @@ def visitFlattenPipe(self, pipe: _plan.FlattenPipe[T]) -> Iterator[T]: iter(pipe.upstream), n_workers=pipe.n_threads ) - def visitChainPipe(self, pipe: _plan.ChainPipe[T]) -> Iterator[T]: + def visitChainPipe(self, pipe: _pipe.ChainPipe[T]) -> Iterator[T]: return itertools.chain(iter(pipe.upstream), *list(map(iter, pipe.others))) - def visitFilterPipe(self, pipe: _plan.FilterPipe[T]) -> Iterator[T]: + def visitFilterPipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]: return filter(pipe.predicate, iter(pipe.upstream)) - def visitBatchPipe(self, pipe: _plan.BatchPipe[U]) -> Iterator[List[U]]: + def visitBatchPipe(self, pipe: _pipe.BatchPipe[U]) -> Iterator[List[U]]: return _exec.BatchingIteratorWrapper( iter(pipe.upstream), pipe.size, pipe.period ) - def visitSlowPipe(self, pipe: _plan.SlowPipe[T]) -> Iterator[T]: + def visitSlowPipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]: return _exec.SlowingIteratorWrapper(iter(pipe.upstream), pipe.freq) - def visitCatchPipe(self, pipe: _plan.CatchPipe[T]) -> Iterator[T]: + def visitCatchPipe(self, pipe: _pipe.CatchPipe[T]) -> Iterator[T]: return _exec.CatchingIteratorWrapper( iter(pipe.upstream), *pipe.classes, when=pipe.when ) - def visitLogPipe(self, pipe: _plan.LogPipe[T]) -> Iterator[T]: + def visitLogPipe(self, pipe: _pipe.LogPipe[T]) -> Iterator[T]: return _exec.LoggingIteratorWrapper(iter(pipe.upstream), pipe.what) diff --git a/kioss/_plan.py b/kioss/_pipe.py similarity index 87% rename from kioss/_plan.py rename to kioss/_pipe.py index e5d69f6..10a071f 100644 --- a/kioss/_plan.py +++ b/kioss/_pipe.py @@ -2,8 +2,6 @@ from typing import ( Any, Callable, - Collection, - Generic, Iterable, Iterator, List, @@ -22,12 +20,19 @@ R = TypeVar("R") ITERATOR_GENERATING_VISITOR: "Optional[_visitor.IteratorGeneratingVisitor]" = None +EXPLAINING_VISITOR_CLASS: "Optional[Type[_visitor.ExplainingVisitor]]" = None class APipe(Iterable[T], ABC): + upstream: "Optional[APipe]" def __iter__(self) -> Iterator[T]: if ITERATOR_GENERATING_VISITOR is None: raise ValueError("_plan.ITERATOR_GENERATING_VISITOR is None") return self._accept(ITERATOR_GENERATING_VISITOR) + + def __repr__(self) -> str: + if EXPLAINING_VISITOR_CLASS is None: + raise ValueError("_plan.EXPLAINER_VISITOR is None") + return self._accept(EXPLAINING_VISITOR_CLASS()) @abstractmethod def _accept(self, visitor: "_visitor.AVisitor") -> Any: @@ -247,6 +252,7 @@ def __init__(self, source: Callable[[], Iterator[T]]): Args: source (Callable[[], Iterator[T]]): A factory function called to obtain a fresh data source iterator for each iteration. """ + self.upstream = None if not callable(source): raise TypeError( f"source must be a callable returning an iterator, but the provided source is not a callable: got source '{source}' of type {type(source)}." @@ -256,53 +262,68 @@ def __init__(self, source: Callable[[], Iterator[T]]): def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitSourcePipe(self) + def __str__(self) -> str: + return f"Source(of type: {type(self.source)})" class FilterPipe(APipe[T]): def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.predicate = predicate def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitFilterPipe(self) + def __str__(self) -> str: + return f"Filter(using predicate function of type {type(self.predicate)})" + class MapPipe(APipe[R]): def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.func = func self.n_threads = n_threads def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitMapPipe(self) + def __str__(self) -> str: + return f"Map(function of type {type(self.func)}, using {self.n_threads} threads)" + class LogPipe(APipe[T]): def __init__(self, upstream: APipe[T], what: str = "elements"): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.what = what def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitLogPipe(self) + def __str__(self) -> str: + return f"Log('{self.what}')" class FlattenPipe(APipe[T]): def __init__(self, upstream: APipe[Iterator[T]], n_threads: int): - self.upstream = upstream + self.upstream: APipe[Iterator[T]] = upstream self.n_threads = n_threads def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitFlattenPipe(self) + def __str__(self) -> str: + return f"Flatten(using {self.n_threads} threads)" class BatchPipe(APipe[List[T]]): def __init__(self, upstream: APipe[T], size: int, period: float): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.size = size self.period = period def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitBatchPipe(self) + def __str__(self) -> str: + return f"Batch(elements by groups of {self.size}, or over a period of {self.period} seconds)" + class CatchPipe(APipe[T]): def __init__( self, @@ -310,30 +331,37 @@ def __init__( *classes: Type[Exception], when: Optional[Callable[[Exception], bool]] = None, ): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.classes = classes self.when = when def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitCatchPipe(self) + def __str__(self) -> str: + return f"Catch(exceptions istances of classes [{', '.join(map(lambda class_: class_.__name__, self.classes))}]{', with an additional `when` condition' if self.when is not None else ''})" class ChainPipe(APipe[T]): def __init__(self, upstream: APipe[T], others: List[APipe]): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.others = others def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitChainPipe(self) + def __str__(self) -> str: + return f"Chain(with other pipes: [{', '.join(map(str, map(id, self.others)))}])" # TODO itricate explains class SlowPipe(APipe[T]): def __init__(self, upstream: APipe[T], freq: float): - self.upstream = upstream + self.upstream: APipe[T] = upstream self.freq = freq def _accept(self, visitor: "_visitor.AVisitor") -> Any: return visitor.visitSlowPipe(self) + def __str__(self) -> str: + return f"Slow(at a maximum frequancy of {self.freq} element{'s' if self.freq > 1 else ''} per second)" + # a: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(ITERATOR_GENERATING_VISITOR()) # b: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(_visitor.IteratorGeneratingVisitor()) \ No newline at end of file diff --git a/kioss/_visitor.py b/kioss/_visitor.py index 705c6c9..49761e8 100644 --- a/kioss/_visitor.py +++ b/kioss/_visitor.py @@ -7,50 +7,99 @@ TypeVar, ) -from kioss import _exec, _concurrent_exec, _util, _plan +from kioss import _exec, _concurrent_exec, _pipe, _util T = TypeVar("T") class AVisitor(ABC): @abstractmethod - def visitSourcePipe(self, pipe: _plan.SourcePipe) -> Any: + def visitSourcePipe(self, pipe: _pipe.SourcePipe) -> Any: raise NotImplementedError() @abstractmethod - def visitMapPipe(self, pipe: _plan.MapPipe) -> Any: + def visitMapPipe(self, pipe: _pipe.MapPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitFlattenPipe(self, pipe: _plan.FlattenPipe) -> Any: + def visitFlattenPipe(self, pipe: _pipe.FlattenPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitChainPipe(self, pipe: _plan.ChainPipe) -> Any: + def visitChainPipe(self, pipe: _pipe.ChainPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitFilterPipe(self, pipe: _plan.FilterPipe) -> Any: + def visitFilterPipe(self, pipe: _pipe.FilterPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitBatchPipe(self, pipe: _plan.BatchPipe) -> Any: + def visitBatchPipe(self, pipe: _pipe.BatchPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitSlowPipe(self, pipe: _plan.SlowPipe) -> Any: + def visitSlowPipe(self, pipe: _pipe.SlowPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitCatchPipe(self, pipe: _plan.CatchPipe) -> Any: + def visitCatchPipe(self, pipe: _pipe.CatchPipe) -> Any: raise NotImplementedError() @abstractmethod - def visitLogPipe(self, pipe: _plan.LogPipe) -> Any: + def visitLogPipe(self, pipe: _pipe.LogPipe) -> Any: raise NotImplementedError() +class ExplainingVisitor(AVisitor): + def __init__(self, n_initial_left_margin_spaces: int = 0): + self.current_margin = n_initial_left_margin_spaces + self.margin_step = 2 + self.add_header = True + + def visitAnyPipe(self, pipe: _pipe.APipe) -> str: + if self.add_header: + header = "\033[1mPipe's plan\033[0m:\n" + self.add_header = False + else: + header = '' + name, descr = str(pipe).split('(') + colored_pipe_str = f"\033[91m{name}\033[0m({descr}" + additional_repr_lines = f"\033[92m+{'-'*self.current_margin}\033[0m {colored_pipe_str}\n" + self.current_margin += self.margin_step + if pipe.upstream is not None: + upstream_repr = pipe.upstream._accept(self) + else: + upstream_repr = '' + return header + additional_repr_lines + upstream_repr + + def visitSourcePipe(self, pipe: _pipe.SourcePipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitMapPipe(self, pipe: _pipe.MapPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitFlattenPipe(self, pipe: _pipe.FlattenPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitChainPipe(self, pipe: _pipe.ChainPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitFilterPipe(self, pipe: _pipe.FilterPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitBatchPipe(self, pipe: _pipe.BatchPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitSlowPipe(self, pipe: _pipe.SlowPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitCatchPipe(self, pipe: _pipe.CatchPipe) -> Any: + return self.visitAnyPipe(pipe) + + def visitLogPipe(self, pipe: _pipe.LogPipe) -> Any: + return self.visitAnyPipe(pipe) + class IteratorGeneratingVisitor(AVisitor): - def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]: + def visitSourcePipe(self, pipe: _pipe.SourcePipe[T]) -> Iterator[T]: iterator = pipe.source() try: # duck-type checks that the object returned by the source is an iterator @@ -61,7 +110,7 @@ def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]: ) from e return iterator - def visitMapPipe(self, pipe: _plan.MapPipe[T]) -> Iterator[T]: + def visitMapPipe(self, pipe: _pipe.MapPipe[T]) -> Iterator[T]: if pipe.n_threads == 1: return map(pipe.func, pipe.upstream._accept(self)) else: @@ -69,7 +118,7 @@ def visitMapPipe(self, pipe: _plan.MapPipe[T]) -> Iterator[T]: pipe.upstream._accept(self), pipe.func, n_workers=pipe.n_threads ) - def visitFlattenPipe(self, pipe: _plan.FlattenPipe[T]) -> Iterator[T]: + def visitFlattenPipe(self, pipe: _pipe.FlattenPipe[T]) -> Iterator[T]: if pipe.n_threads == 1: return _exec.FlatteningIteratorWrapper(pipe.upstream._accept(self)) else: @@ -77,24 +126,24 @@ def visitFlattenPipe(self, pipe: _plan.FlattenPipe[T]) -> Iterator[T]: pipe.upstream._accept(self), n_workers=pipe.n_threads ) - def visitChainPipe(self, pipe: _plan.ChainPipe[T]) -> Iterator[T]: + def visitChainPipe(self, pipe: _pipe.ChainPipe[T]) -> Iterator[T]: return itertools.chain(pipe.upstream._accept(self), *list(map(iter, pipe.others))) - def visitFilterPipe(self, pipe: _plan.FilterPipe[T]) -> Iterator[T]: + def visitFilterPipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]: return filter(pipe.predicate, pipe.upstream._accept(self)) - def visitBatchPipe(self, pipe: _plan.BatchPipe[T]) -> Iterator[List[T]]: + def visitBatchPipe(self, pipe: _pipe.BatchPipe[T]) -> Iterator[List[T]]: return _exec.BatchingIteratorWrapper( iter(pipe.upstream), pipe.size, pipe.period ) - def visitSlowPipe(self, pipe: _plan.SlowPipe[T]) -> Iterator[T]: + def visitSlowPipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]: return _exec.SlowingIteratorWrapper(pipe.upstream._accept(self), pipe.freq) - def visitCatchPipe(self, pipe: _plan.CatchPipe[T]) -> Iterator[T]: + def visitCatchPipe(self, pipe: _pipe.CatchPipe[T]) -> Iterator[T]: return _exec.CatchingIteratorWrapper( pipe.upstream._accept(self), *pipe.classes, when=pipe.when ) - def visitLogPipe(self, pipe: _plan.LogPipe[T]) -> Iterator[T]: + def visitLogPipe(self, pipe: _pipe.LogPipe[T]) -> Iterator[T]: return _exec.LoggingIteratorWrapper(pipe.upstream._accept(self), pipe.what) diff --git a/tests/test_legacy.py b/tests/test_legacy.py index edb6b8a..fddce6f 100644 --- a/tests/test_legacy.py +++ b/tests/test_legacy.py @@ -525,3 +525,23 @@ def test_generator_already_generating(self) -> None: ), Counter(list(range(N)) + list(range(N)) + list(range(N))), ) + + def test_str(self) -> None: + p = ( + Pipe(range(8).__iter__) + .filter(lambda _: True) + .batch(100) + .log('batches') + .map(iter) + .flatten(n_threads=4) + .slow(64) + .log('slowed elems') + .chain(Pipe([].__iter__), Pipe([].__iter__)) + .catch(ValueError, TypeError, when=lambda e: True) + ) + a = repr(p) + p.collect() + b = repr(p) + self.assertEqual(a, b) + self.assertGreater(len(a), 32) + print(a)