Skip to content

Commit

Permalink
ExplainingVisitor: set margin_step = 2
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 4, 2023
1 parent 83b00f1 commit 4d92efb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 49 deletions.
34 changes: 0 additions & 34 deletions kioss/_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ def __init__(self, source: Callable[[], Iterator[T]]):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_source_pipe(self)

def __str__(self) -> str:
return f"Source(of type: {type(self.source)})"


class FilterPipe(APipe[T]):
def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]):
Expand All @@ -283,9 +280,6 @@ def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_filter_pipe(self)

def __str__(self) -> str:
return f"Filter(using predicate function of type {type(self.predicate)})"


class MapPipe(APipe[R]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
Expand All @@ -296,9 +290,6 @@ def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_map_pipe(self)

def __str__(self) -> str:
return f"Map(function of type {type(self.func)}, using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})"


class DoPipe(APipe[T]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
Expand All @@ -309,9 +300,6 @@ def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_do_pipe(self)

def __str__(self) -> str:
return f"Do(side effects by applying a function of type {type(self.func)}, using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})"


class LogPipe(APipe[T]):
def __init__(self, upstream: APipe[T], what: str = "elements"):
Expand All @@ -321,9 +309,6 @@ def __init__(self, upstream: APipe[T], what: str = "elements"):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_log_pipe(self)

def __str__(self) -> str:
return f"Log('{self.what}')"


class FlattenPipe(APipe[T]):
def __init__(self, upstream: APipe[Iterator[T]], n_threads: int):
Expand All @@ -333,11 +318,6 @@ def __init__(self, upstream: APipe[Iterator[T]], n_threads: int):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_flatten_pipe(self)

def __str__(self) -> str:
return (
f"Flatten(using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})"
)


class BatchPipe(APipe[List[T]]):
def __init__(self, upstream: APipe[T], size: int, period: float):
Expand All @@ -348,9 +328,6 @@ def __init__(self, upstream: APipe[T], size: int, period: float):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_batch_pipe(self)

def __str__(self) -> str:
return f"Batch(elements by groups of {self.size} element{'s' if self.size > 1 else ''}, or over a period of {self.period} second{'s' if self.period > 1 else ''})"


class CatchPipe(APipe[T]):
def __init__(
Expand All @@ -366,10 +343,6 @@ def __init__(
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_catch_pipe(self)

def __str__(self) -> str:
return f"Catch(exception instances of class in [{', '.join(map(lambda class_: class_.__name__, self.classes))}]{', with an additional `when` condition' if self.when is not None else ''})"


class ChainPipe(APipe[T]):
def __init__(self, upstream: APipe[T], others: List[APipe]):
self.upstream: APipe[T] = upstream
Expand All @@ -378,9 +351,6 @@ def __init__(self, upstream: APipe[T], others: List[APipe]):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_chain_pipe(self)

def __str__(self) -> str:
return f"Chain({len(self.others)+1} pipes)" # TODO itricate explains


class SlowPipe(APipe[T]):
def __init__(self, upstream: APipe[T], freq: float):
Expand All @@ -390,9 +360,5 @@ def __init__(self, upstream: APipe[T], freq: float):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_slow_pipe(self)

def __str__(self) -> str:
return f"Slow(at a maximum frequancy of {self.freq} element{'s' if self.freq > 1 else ''} per second)"


# a: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(ITERATOR_PRODUCING_VISITOR())
# b: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(_visitor.IteratorGeneratingVisitor())
51 changes: 36 additions & 15 deletions kioss/_visit/_explanation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ def __init__(self, initial_margin: int = 0, add_header: bool = True):
self.margin_step = 2
self.add_header = add_header

def additional_explain_lines(self, pipe: _pipe.APipe) -> str:
name, descr = str(pipe).split("(")
def additional_explain_lines(self, name: str, descr: str) -> str:
return f"{' '*self.current_margin}{_util.colorize_in_grey('└' + '─'*(self.margin_step - 1))}{_util.colorize_in_red(name)}({descr}\n"

def visit_any_pipe(self, pipe: _pipe.APipe) -> str:
def visit_any_pipe(self, pipe: _pipe.APipe, name: str, descr: str) -> str:
if self.add_header:
header = _util.bold(ExplainingVisitor.HEADER) + "\n"
self.add_header = False
else:
header = ""
additional_explain_lines = self.additional_explain_lines(pipe)
additional_explain_lines = self.additional_explain_lines(name, descr)
self.current_margin += self.margin_step
if pipe.upstream is not None:
upstream_repr = pipe.upstream._accept(self)
Expand All @@ -31,33 +30,55 @@ def visit_any_pipe(self, pipe: _pipe.APipe) -> str:
return f"{header}{additional_explain_lines}{upstream_repr}"

def visit_chain_pipe(self, pipe: _pipe.ChainPipe) -> Any:
additional_explain_lines = self.additional_explain_lines(pipe)
name = "Chain"
descr = f"{len(pipe.others)+1} pipes"
additional_explain_lines = self.additional_explain_lines(name, descr)
self.current_margin += self.margin_step
return f"{additional_explain_lines}{''.join(map(lambda pipe: pipe._accept(ExplainingVisitor(self.current_margin, add_header=False)), pipe.others))}{self.visit_any_pipe(pipe.upstream)}"
upstream_repr = pipe.upstream._accept(self)
chained_pipes_repr = ''.join(map(lambda pipe: pipe._accept(ExplainingVisitor(self.current_margin, add_header=False)), pipe.others))
return f"{additional_explain_lines}{chained_pipes_repr}{upstream_repr}"

def visit_source_pipe(self, pipe: _pipe.SourcePipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Source"
descr = f"of type: {type(pipe.source)}"
return self.visit_any_pipe(pipe, name, descr)

def visit_map_pipe(self, pipe: _pipe.MapPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Map"
descr = f"function of type {type(pipe.func)}, using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_do_pipe(self, pipe: _pipe.DoPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Do"
descr = f"side effects by applying a function of type {type(pipe.func)}, using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_flatten_pipe(self, pipe: _pipe.FlattenPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Flatten"
descr = f"using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_filter_pipe(self, pipe: _pipe.FilterPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Filter"
descr = f"using predicate function of type {type(pipe.predicate)}"
return self.visit_any_pipe(pipe, name, descr)

def visit_batch_pipe(self, pipe: _pipe.BatchPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Batch"
descr = f"elements by groups of {pipe.size} element{'s' if pipe.size > 1 else ''}, or over a period of {pipe.period} second{'s' if pipe.period > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_slow_pipe(self, pipe: _pipe.SlowPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Slow"
descr = f"at a maximum frequancy of {pipe.freq} element{'s' if pipe.freq > 1 else ''} per second"
return self.visit_any_pipe(pipe, name, descr)

def visit_catch_pipe(self, pipe: _pipe.CatchPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Catch"
descr = f"exception instances of class in [{', '.join(map(lambda class_: class_.__name__, pipe.classes))}]{', with an additional `when` condition' if pipe.when is not None else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_log_pipe(self, pipe: _pipe.LogPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Log"
descr = f"the evolution of the ieration over {pipe.what}"
return self.visit_any_pipe(pipe, name, descr)

0 comments on commit 4d92efb

Please sign in to comment.