Skip to content

Commit

Permalink
ExplainingVisitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 4, 2023
1 parent d87eeae commit dff0ea7
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 43 deletions.
7 changes: 4 additions & 3 deletions kioss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from kioss._plan import APipe, SourcePipe as Pipe
from kioss._pipe import APipe, SourcePipe as Pipe
from kioss._util import LOGGER
from kioss import _plan, _visitor
_plan.ITERATOR_GENERATING_VISITOR = _visitor.IteratorGeneratingVisitor()
from kioss import _pipe, _visitor
_pipe.ITERATOR_GENERATING_VISITOR = _visitor.IteratorGeneratingVisitor()
_pipe.EXPLAINING_VISITOR_CLASS = _visitor.ExplainingVisitor
22 changes: 11 additions & 11 deletions kioss/_iterator_generating_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
TypeVar,
)

from kioss import _exec, _concurrent_exec, _util, _plan, _visitor
from kioss import _exec, _concurrent_exec, _pipe, _util, _visitor

V = TypeVar("V")
T = TypeVar("T")
U = TypeVar("U")

class IteratorGeneratingVisitor(_visitor.AVisitor):

def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]:
def visitSourcePipe(self, pipe: _pipe.SourcePipe[T]) -> Iterator[T]:
iterator = pipe.source()
try:
# duck-type checks that the object returned by the source is an iterator
Expand All @@ -26,41 +26,41 @@ def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]:
) from e
return iterator

def visitMapPipe(self, pipe: _plan.MapPipe[T]) -> Iterator[T]:
def visitMapPipe(self, pipe: _pipe.MapPipe[T]) -> Iterator[T]:
if pipe.n_threads == 1:
return map(pipe.func, iter(pipe.upstream))
else:
return _concurrent_exec.ThreadedMappingIteratorWrapper(
iter(pipe.upstream), pipe.func, n_workers=pipe.n_threads
)

def visitFlattenPipe(self, pipe: _plan.FlattenPipe[T]) -> Iterator[T]:
upstream_pipe: _plan.APipe[Iterator[T]] = pipe.upstream
def visitFlattenPipe(self, pipe: _pipe.FlattenPipe[T]) -> Iterator[T]:
upstream_pipe: _pipe.APipe[Iterator[T]] = pipe.upstream
if pipe.n_threads == 1:
return _exec.FlatteningIteratorWrapper(iter(upstream_pipe))
else:
return _concurrent_exec.ThreadedFlatteningIteratorWrapper(
iter(pipe.upstream), n_workers=pipe.n_threads
)

def visitChainPipe(self, pipe: _plan.ChainPipe[T]) -> Iterator[T]:
def visitChainPipe(self, pipe: _pipe.ChainPipe[T]) -> Iterator[T]:
return itertools.chain(iter(pipe.upstream), *list(map(iter, pipe.others)))

def visitFilterPipe(self, pipe: _plan.FilterPipe[T]) -> Iterator[T]:
def visitFilterPipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]:
return filter(pipe.predicate, iter(pipe.upstream))

def visitBatchPipe(self, pipe: _plan.BatchPipe[U]) -> Iterator[List[U]]:
def visitBatchPipe(self, pipe: _pipe.BatchPipe[U]) -> Iterator[List[U]]:
return _exec.BatchingIteratorWrapper(
iter(pipe.upstream), pipe.size, pipe.period
)

def visitSlowPipe(self, pipe: _plan.SlowPipe[T]) -> Iterator[T]:
def visitSlowPipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]:
return _exec.SlowingIteratorWrapper(iter(pipe.upstream), pipe.freq)

def visitCatchPipe(self, pipe: _plan.CatchPipe[T]) -> Iterator[T]:
def visitCatchPipe(self, pipe: _pipe.CatchPipe[T]) -> Iterator[T]:
return _exec.CatchingIteratorWrapper(
iter(pipe.upstream), *pipe.classes, when=pipe.when
)

def visitLogPipe(self, pipe: _plan.LogPipe[T]) -> Iterator[T]:
def visitLogPipe(self, pipe: _pipe.LogPipe[T]) -> Iterator[T]:
return _exec.LoggingIteratorWrapper(iter(pipe.upstream), pipe.what)
48 changes: 38 additions & 10 deletions kioss/_plan.py → kioss/_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from typing import (
Any,
Callable,
Collection,
Generic,
Iterable,
Iterator,
List,
Expand All @@ -22,12 +20,19 @@
R = TypeVar("R")

ITERATOR_GENERATING_VISITOR: "Optional[_visitor.IteratorGeneratingVisitor]" = None
EXPLAINING_VISITOR_CLASS: "Optional[Type[_visitor.ExplainingVisitor]]" = None

class APipe(Iterable[T], ABC):
upstream: "Optional[APipe]"
def __iter__(self) -> Iterator[T]:
if ITERATOR_GENERATING_VISITOR is None:
raise ValueError("_plan.ITERATOR_GENERATING_VISITOR is None")
return self._accept(ITERATOR_GENERATING_VISITOR)

def __repr__(self) -> str:
if EXPLAINING_VISITOR_CLASS is None:
raise ValueError("_plan.EXPLAINER_VISITOR is None")
return self._accept(EXPLAINING_VISITOR_CLASS())

@abstractmethod
def _accept(self, visitor: "_visitor.AVisitor") -> Any:
Expand Down Expand Up @@ -247,6 +252,7 @@ def __init__(self, source: Callable[[], Iterator[T]]):
Args:
source (Callable[[], Iterator[T]]): A factory function called to obtain a fresh data source iterator for each iteration.
"""
self.upstream = None
if not callable(source):
raise TypeError(
f"source must be a callable returning an iterator, but the provided source is not a callable: got source '{source}' of type {type(source)}."
Expand All @@ -256,84 +262,106 @@ def __init__(self, source: Callable[[], Iterator[T]]):
def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitSourcePipe(self)

def __str__(self) -> str:
return f"Source(of type: {type(self.source)})"


class FilterPipe(APipe[T]):
def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.predicate = predicate

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitFilterPipe(self)

def __str__(self) -> str:
return f"Filter(using predicate function of type {type(self.predicate)})"


class MapPipe(APipe[R]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.func = func
self.n_threads = n_threads

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitMapPipe(self)

def __str__(self) -> str:
return f"Map(function of type {type(self.func)}, using {self.n_threads} threads)"

class LogPipe(APipe[T]):
def __init__(self, upstream: APipe[T], what: str = "elements"):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.what = what

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitLogPipe(self)

def __str__(self) -> str:
return f"Log('{self.what}')"

class FlattenPipe(APipe[T]):
def __init__(self, upstream: APipe[Iterator[T]], n_threads: int):
self.upstream = upstream
self.upstream: APipe[Iterator[T]] = upstream
self.n_threads = n_threads

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitFlattenPipe(self)

def __str__(self) -> str:
return f"Flatten(using {self.n_threads} threads)"

class BatchPipe(APipe[List[T]]):
def __init__(self, upstream: APipe[T], size: int, period: float):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.size = size
self.period = period

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitBatchPipe(self)

def __str__(self) -> str:
return f"Batch(elements by groups of {self.size}, or over a period of {self.period} seconds)"

class CatchPipe(APipe[T]):
def __init__(
self,
upstream: APipe[T],
*classes: Type[Exception],
when: Optional[Callable[[Exception], bool]] = None,
):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.classes = classes
self.when = when

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitCatchPipe(self)

def __str__(self) -> str:
return f"Catch(exceptions istances of classes [{', '.join(map(lambda class_: class_.__name__, self.classes))}]{', with an additional `when` condition' if self.when is not None else ''})"

class ChainPipe(APipe[T]):
def __init__(self, upstream: APipe[T], others: List[APipe]):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.others = others

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitChainPipe(self)

def __str__(self) -> str:
return f"Chain(with other pipes: [{', '.join(map(str, map(id, self.others)))}])" # TODO itricate explains

class SlowPipe(APipe[T]):
def __init__(self, upstream: APipe[T], freq: float):
self.upstream = upstream
self.upstream: APipe[T] = upstream
self.freq = freq

def _accept(self, visitor: "_visitor.AVisitor") -> Any:
return visitor.visitSlowPipe(self)

def __str__(self) -> str:
return f"Slow(at a maximum frequancy of {self.freq} element{'s' if self.freq > 1 else ''} per second)"

# a: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(ITERATOR_GENERATING_VISITOR())
# b: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(_visitor.IteratorGeneratingVisitor())
87 changes: 68 additions & 19 deletions kioss/_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,99 @@
TypeVar,
)

from kioss import _exec, _concurrent_exec, _util, _plan
from kioss import _exec, _concurrent_exec, _pipe, _util

T = TypeVar("T")

class AVisitor(ABC):
@abstractmethod
def visitSourcePipe(self, pipe: _plan.SourcePipe) -> Any:
def visitSourcePipe(self, pipe: _pipe.SourcePipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitMapPipe(self, pipe: _plan.MapPipe) -> Any:
def visitMapPipe(self, pipe: _pipe.MapPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitFlattenPipe(self, pipe: _plan.FlattenPipe) -> Any:
def visitFlattenPipe(self, pipe: _pipe.FlattenPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitChainPipe(self, pipe: _plan.ChainPipe) -> Any:
def visitChainPipe(self, pipe: _pipe.ChainPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitFilterPipe(self, pipe: _plan.FilterPipe) -> Any:
def visitFilterPipe(self, pipe: _pipe.FilterPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitBatchPipe(self, pipe: _plan.BatchPipe) -> Any:
def visitBatchPipe(self, pipe: _pipe.BatchPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitSlowPipe(self, pipe: _plan.SlowPipe) -> Any:
def visitSlowPipe(self, pipe: _pipe.SlowPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitCatchPipe(self, pipe: _plan.CatchPipe) -> Any:
def visitCatchPipe(self, pipe: _pipe.CatchPipe) -> Any:
raise NotImplementedError()

@abstractmethod
def visitLogPipe(self, pipe: _plan.LogPipe) -> Any:
def visitLogPipe(self, pipe: _pipe.LogPipe) -> Any:
raise NotImplementedError()

class ExplainingVisitor(AVisitor):
def __init__(self, n_initial_left_margin_spaces: int = 0):
self.current_margin = n_initial_left_margin_spaces
self.margin_step = 2
self.add_header = True

def visitAnyPipe(self, pipe: _pipe.APipe) -> str:
if self.add_header:
header = "\033[1mPipe's plan\033[0m:\n"
self.add_header = False
else:
header = ''
name, descr = str(pipe).split('(')
colored_pipe_str = f"\033[91m{name}\033[0m({descr}"
additional_repr_lines = f"\033[92m+{'-'*self.current_margin}\033[0m {colored_pipe_str}\n"
self.current_margin += self.margin_step
if pipe.upstream is not None:
upstream_repr = pipe.upstream._accept(self)
else:
upstream_repr = ''
return header + additional_repr_lines + upstream_repr

def visitSourcePipe(self, pipe: _pipe.SourcePipe) -> Any:
return self.visitAnyPipe(pipe)

def visitMapPipe(self, pipe: _pipe.MapPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitFlattenPipe(self, pipe: _pipe.FlattenPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitChainPipe(self, pipe: _pipe.ChainPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitFilterPipe(self, pipe: _pipe.FilterPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitBatchPipe(self, pipe: _pipe.BatchPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitSlowPipe(self, pipe: _pipe.SlowPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitCatchPipe(self, pipe: _pipe.CatchPipe) -> Any:
return self.visitAnyPipe(pipe)

def visitLogPipe(self, pipe: _pipe.LogPipe) -> Any:
return self.visitAnyPipe(pipe)

class IteratorGeneratingVisitor(AVisitor):

def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]:
def visitSourcePipe(self, pipe: _pipe.SourcePipe[T]) -> Iterator[T]:
iterator = pipe.source()
try:
# duck-type checks that the object returned by the source is an iterator
Expand All @@ -61,40 +110,40 @@ def visitSourcePipe(self, pipe: _plan.SourcePipe[T]) -> Iterator[T]:
) from e
return iterator

def visitMapPipe(self, pipe: _plan.MapPipe[T]) -> Iterator[T]:
def visitMapPipe(self, pipe: _pipe.MapPipe[T]) -> Iterator[T]:
if pipe.n_threads == 1:
return map(pipe.func, pipe.upstream._accept(self))
else:
return _concurrent_exec.ThreadedMappingIteratorWrapper(
pipe.upstream._accept(self), pipe.func, n_workers=pipe.n_threads
)

def visitFlattenPipe(self, pipe: _plan.FlattenPipe[T]) -> Iterator[T]:
def visitFlattenPipe(self, pipe: _pipe.FlattenPipe[T]) -> Iterator[T]:
if pipe.n_threads == 1:
return _exec.FlatteningIteratorWrapper(pipe.upstream._accept(self))
else:
return _concurrent_exec.ThreadedFlatteningIteratorWrapper(
pipe.upstream._accept(self), n_workers=pipe.n_threads
)

def visitChainPipe(self, pipe: _plan.ChainPipe[T]) -> Iterator[T]:
def visitChainPipe(self, pipe: _pipe.ChainPipe[T]) -> Iterator[T]:
return itertools.chain(pipe.upstream._accept(self), *list(map(iter, pipe.others)))

def visitFilterPipe(self, pipe: _plan.FilterPipe[T]) -> Iterator[T]:
def visitFilterPipe(self, pipe: _pipe.FilterPipe[T]) -> Iterator[T]:
return filter(pipe.predicate, pipe.upstream._accept(self))

def visitBatchPipe(self, pipe: _plan.BatchPipe[T]) -> Iterator[List[T]]:
def visitBatchPipe(self, pipe: _pipe.BatchPipe[T]) -> Iterator[List[T]]:
return _exec.BatchingIteratorWrapper(
iter(pipe.upstream), pipe.size, pipe.period
)

def visitSlowPipe(self, pipe: _plan.SlowPipe[T]) -> Iterator[T]:
def visitSlowPipe(self, pipe: _pipe.SlowPipe[T]) -> Iterator[T]:
return _exec.SlowingIteratorWrapper(pipe.upstream._accept(self), pipe.freq)

def visitCatchPipe(self, pipe: _plan.CatchPipe[T]) -> Iterator[T]:
def visitCatchPipe(self, pipe: _pipe.CatchPipe[T]) -> Iterator[T]:
return _exec.CatchingIteratorWrapper(
pipe.upstream._accept(self), *pipe.classes, when=pipe.when
)

def visitLogPipe(self, pipe: _plan.LogPipe[T]) -> Iterator[T]:
def visitLogPipe(self, pipe: _pipe.LogPipe[T]) -> Iterator[T]:
return _exec.LoggingIteratorWrapper(pipe.upstream._accept(self), pipe.what)
Loading

0 comments on commit dff0ea7

Please sign in to comment.