Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 3, 2023
1 parent 2013f9c commit 20ad0d1
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 65 deletions.
98 changes: 33 additions & 65 deletions kioss/_plan.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import itertools
from abc import ABC, abstractmethod
from typing import (
Any,
Expand All @@ -12,19 +11,26 @@
TypeVar,
)

from kioss import _exec, _concurrent_exec, _util
from kioss import _util, _visitor

T = TypeVar("T")
R = TypeVar("R")
V = TypeVar("V")


class APipe(Iterable[T], ABC):
_ITERATOR_GENERATING_VISITOR_CLASS: Type[_visitor.APipeVisitor[Iterator[T]]] = _visitor.IteratorGeneratingPipeVisitor

def __init__(self, upstream: "Optional[APipe[T]]" = None):
self.upstream = upstream
self.visitor = APipe._ITERATOR_GENERATING_VISITOR_CLASS()

def __iter__(self) -> Iterator[T]:
    """Return an iterator over this pipe's elements.

    Concrete (not abstract): subclasses only implement `_accept`, and the
    iterator is obtained by having this pipe accept `self.visitor`.

    Returns:
        Iterator[T]: whatever `self._accept(self.visitor)` returns.
    """
    return self._accept(self.visitor)

@abstractmethod
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
    """Accept `visitor` and return the value it produces for this pipe.

    Subclasses override this to call the `visit*` method of `visitor`
    that corresponds to their own type.

    Raises:
        NotImplementedError: if the base implementation is invoked.
    """
    # `NotImplemented` is a non-callable constant; the exception type to
    # raise from an unimplemented method is `NotImplementedError`.
    raise NotImplementedError()

def __add__(self, other: "APipe[T]") -> "APipe[T]":
return self.chain(other)
Expand Down Expand Up @@ -56,10 +62,7 @@ def map(
"""
APipe.sanitize_n_threads(n_threads)
func = _util.map_exception(func, source=StopIteration, target=RuntimeError)
if n_threads == 1:
return MapPipe[R](self, func)
else:
return ThreadedMapPipe[R](self, func, n_threads)
return MapPipe[R](self, func, n_threads)

def do(
self,
Expand Down Expand Up @@ -89,10 +92,7 @@ def flatten(
n_threads (int): The number of threads for concurrent execution (default is 1, meaning only the main thread is used).
"""
APipe.sanitize_n_threads(n_threads)
if n_threads == 1:
return FlattenPipe[R](self)
else:
return ThreadedFlattenPipe[R](self, n_threads)
return FlattenPipe[R](self, n_threads)

def chain(self, *others: "APipe[T]") -> "APipe[T]":
"""
Expand Down Expand Up @@ -253,75 +253,46 @@ def __init__(self, source: Callable[[], Iterator[T]]):
f"source must be a callable returning an iterator, but the provided source is not a callable: got source '{source}' of type {type(source)}."
)
self.source = source

def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitSourcePipe(self)

def __iter__(self) -> Iterator[T]:
iterator = self.source()
try:
# duck-type checks that the object returned by the source is an iterator
_util.duck_check_type_is_iterator(iterator)
except TypeError as e:
raise TypeError(
f"source must be a callable returning an iterator (implements __iter__ and __next__ methods), but the object resulting from a call to source() was not an iterator: got '{iterator}' of type {type(iterator)}."
) from e
return iterator


class FilterPipe(APipe[T]):
def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]):
super().__init__(upstream)
self.predicate = predicate

def __iter__(self) -> Iterator[T]:
return filter(self.predicate, iter(self.upstream))
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitFilterPipe(self)


class MapPipe(APipe[R]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R]):
super().__init__(upstream)
self.func = func

def __iter__(self) -> Iterator[R]:
return map(self.func, iter(self.upstream))


class ThreadedMapPipe(APipe[R]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
super().__init__(upstream)
self.func = func
self.n_threads = n_threads

def __iter__(self) -> Iterator[R]:
return _concurrent_exec.ThreadedMappingIteratorWrapper(
iter(self.upstream), self.func, n_workers=self.n_threads
)

def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitMapPipe(self)

class LogPipe(APipe[T]):
def __init__(self, upstream: APipe[T], what: str = "elements"):
super().__init__(upstream)
self.what = what

def __iter__(self) -> Iterator[T]:
return _exec.LoggingIteratorWrapper(iter(self.upstream), self.what)
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitLogPipe(self)


class FlattenPipe(APipe[T]):
def __init__(self, upstream: APipe[T]):
super().__init__(upstream)

def __iter__(self) -> Iterator[T]:
return _exec.FlatteningIteratorWrapper(iter(self.upstream))


class ThreadedFlattenPipe(APipe[R]):
def __init__(self, upstream: APipe[T], n_threads: int):
super().__init__(upstream)
self.n_threads = n_threads

def __iter__(self) -> Iterator[R]:
return _concurrent_exec.ThreadedFlatteningIteratorWrapper(
iter(self.upstream), n_workers=self.n_threads
)
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitFlattenPipe(self)


class BatchPipe(APipe[T]):
Expand All @@ -330,11 +301,8 @@ def __init__(self, upstream: APipe[T], size: int, period: float):
self.size = size
self.period = period

def __iter__(self) -> Iterator[T]:
return _exec.BatchingIteratorWrapper(
iter(self.upstream), self.size, self.period
)

def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
    """Accept `visitor` by dispatching to its `visitBatchPipe` method."""
    # A batch node must be routed to the batch handler, which reads
    # `size` and `period`; the flatten handler reads `n_threads` instead.
    return visitor.visitBatchPipe(self)

class CatchPipe(APipe[T]):
def __init__(
Expand All @@ -347,25 +315,25 @@ def __init__(
self.classes = classes
self.when = when

def __iter__(self) -> Iterator[T]:
return _exec.CatchingIteratorWrapper(
iter(self.upstream), self.classes, self.when
)
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitCatchPipe(self)


class ChainPipe(APipe[T]):
def __init__(self, upstream: APipe[T], others: List[APipe]):
super().__init__(upstream)
self.others = others

def __iter__(self) -> Iterator[T]:
return itertools.chain(iter(self.upstream), *list(map(iter, self.others)))
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitChainPipe(self)


class SlowPipe(APipe[T]):
def __init__(self, upstream: APipe[T], freq: float):
super().__init__(upstream)
self.freq = freq

def __iter__(self) -> Iterator[T]:
return _exec.SlowingIteratorWrapper(iter(self.upstream), self.freq)
def _accept(self, visitor: _visitor.APipeVisitor[V]) -> V:
return visitor.visitSlowPipe(self)

# NOTE(review): appears to be a leftover work-in-progress smoke check that
# builds a pipeline at module level — confirm whether it should be removed
# or moved into a test.
a = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(APipe._ITERATOR_GENERATING_VISITOR_CLASS())
102 changes: 102 additions & 0 deletions kioss/_visitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import itertools
from abc import ABC, abstractmethod
from typing import (
Any,
Generic,
Iterator,
TypeVar,
)

from kioss import _exec, _concurrent_exec, _util, _plan

V = TypeVar("V")
T = TypeVar("T")


class APipeVisitor(Generic[V], ABC):
    """Abstract visitor with one `visit*` method per pipe type of `_plan`.

    `V` is the type of the value produced by visiting a pipe.

    The pipe annotations are written as strings: `_plan` imports this module
    and this module imports `_plan`, so evaluating `_plan.<Class>` while the
    methods are being defined could hit a partially initialised module.
    """

    @abstractmethod
    def visitSourcePipe(self, pipe: "_plan.SourcePipe") -> V:
        """Visit a `SourcePipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitMapPipe(self, pipe: "_plan.MapPipe") -> V:
        """Visit a `MapPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitFlattenPipe(self, pipe: "_plan.FlattenPipe") -> V:
        """Visit a `FlattenPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitChainPipe(self, pipe: "_plan.ChainPipe") -> V:
        """Visit a `ChainPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitFilterPipe(self, pipe: "_plan.FilterPipe") -> V:
        """Visit a `FilterPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitBatchPipe(self, pipe: "_plan.BatchPipe") -> V:
        """Visit a `BatchPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitSlowPipe(self, pipe: "_plan.SlowPipe") -> V:
        """Visit a `SlowPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitCatchPipe(self, pipe: "_plan.CatchPipe") -> V:
        """Visit a `CatchPipe`."""
        raise NotImplementedError()

    @abstractmethod
    def visitLogPipe(self, pipe: "_plan.LogPipe") -> V:
        """Visit a `LogPipe`."""
        raise NotImplementedError()

class IteratorGeneratingPipeVisitor(APipeVisitor[Iterator[T]]):
    """Visitor that builds the iterator corresponding to a pipe.

    Apart from the source, each `visit*` method obtains the upstream iterator
    with `pipe.upstream._accept(self)` and wraps it with the iterator
    implementing that pipe.

    The pipe annotations are written as strings: `_plan` imports this module
    and this module imports `_plan`, so evaluating `_plan.<Class>` while the
    methods are being defined could hit a partially initialised module.
    """

    def visitSourcePipe(self, pipe: "_plan.SourcePipe[T]") -> Iterator[T]:
        """Call `pipe.source()` and return the object it yields.

        Raises:
            TypeError: if the duck-type iterator check on that object fails.
        """
        iterator = pipe.source()
        try:
            # duck-type checks that the object returned by the source is an iterator
            _util.duck_check_type_is_iterator(iterator)
        except TypeError as e:
            raise TypeError(
                f"source must be a callable returning an iterator (implements __iter__ and __next__ methods), but the object resulting from a call to source() was not an iterator: got '{iterator}' of type {type(iterator)}."
            ) from e
        return iterator

    def visitMapPipe(self, pipe: "_plan.MapPipe[T]") -> Iterator[T]:
        """Apply `pipe.func` to the upstream elements.

        Uses the builtin `map` when `pipe.n_threads == 1`, otherwise the
        threaded mapping wrapper with `pipe.n_threads` workers.
        """
        if pipe.n_threads == 1:
            return map(pipe.func, pipe.upstream._accept(self))
        return _concurrent_exec.ThreadedMappingIteratorWrapper(
            pipe.upstream._accept(self), pipe.func, n_workers=pipe.n_threads
        )

    def visitFlattenPipe(self, pipe: "_plan.FlattenPipe[T]") -> Iterator[T]:
        """Wrap the upstream iterator with a flattening iterator.

        Uses the single-threaded wrapper when `pipe.n_threads == 1`, otherwise
        the threaded one with `pipe.n_threads` workers.
        """
        if pipe.n_threads == 1:
            return _exec.FlatteningIteratorWrapper(pipe.upstream._accept(self))
        return _concurrent_exec.ThreadedFlatteningIteratorWrapper(
            pipe.upstream._accept(self), n_workers=pipe.n_threads
        )

    def visitChainPipe(self, pipe: "_plan.ChainPipe[T]") -> Iterator[T]:
        """Chain the upstream iterator with the iterators of `pipe.others`."""
        upstream_iterator = pipe.upstream._accept(self)
        # Each chained pipe has to accept *this* visitor: calling the unbound
        # `APipe._accept` with only the pipe would omit the visitor argument.
        # The list is built eagerly, so all iterators exist before chaining.
        other_iterators = [other._accept(self) for other in pipe.others]
        return itertools.chain(upstream_iterator, *other_iterators)

    def visitFilterPipe(self, pipe: "_plan.FilterPipe[T]") -> Iterator[T]:
        """Filter the upstream elements with `pipe.predicate`."""
        return filter(pipe.predicate, pipe.upstream._accept(self))

    def visitBatchPipe(self, pipe: "_plan.BatchPipe[T]") -> Iterator[T]:
        """Wrap the upstream iterator with a batching iterator built from `pipe.size` and `pipe.period`."""
        return _exec.BatchingIteratorWrapper(
            pipe.upstream._accept(self), pipe.size, pipe.period
        )

    def visitSlowPipe(self, pipe: "_plan.SlowPipe[T]") -> Iterator[T]:
        """Wrap the upstream iterator with a slowing iterator built from `pipe.freq`."""
        return _exec.SlowingIteratorWrapper(pipe.upstream._accept(self), pipe.freq)

    def visitCatchPipe(self, pipe: "_plan.CatchPipe[T]") -> Iterator[T]:
        """Wrap the upstream iterator with a catching iterator built from `pipe.classes` and `pipe.when`."""
        return _exec.CatchingIteratorWrapper(
            pipe.upstream._accept(self), pipe.classes, pipe.when
        )

    def visitLogPipe(self, pipe: "_plan.LogPipe[T]") -> Iterator[T]:
        """Wrap the upstream iterator with a logging iterator labelled `pipe.what`."""
        return _exec.LoggingIteratorWrapper(pipe.upstream._accept(self), pipe.what)

0 comments on commit 20ad0d1

Please sign in to comment.