"""
Pype class and supporting functions.
"""
import itertools
import time
from collections import defaultdict, deque
from contextlib import suppress
from copy import deepcopy
from types import MappingProxyType as MapProxy
from typing import Union, Dict, Optional, Callable, Any, List, Sequence, Hashable
from spype.callbacks import debug_callback
from spype.constants import CALLBACK_NAMES, HOW_ARGS
from spype.core import task
from spype.core import wrap
from spype.core.digraph import _WrapDiGraph
from spype.core.sbase import _SpypeBase
from spype.exceptions import UnresolvedDependency, TaskReturnedNone
from spype.utils import iterate, args_kwargs, de_args_kwargs
# Annotation used for the ``arg`` parameter of ``Pype.__init__``: a Task,
# a Wrap, a Pype, a str, or None.
pype_input_type = Optional[Union["task.Task", "wrap.Wrap", "Pype", str]]
class Pype(_SpypeBase):
    """
    Class to control the data flow between tasks.

    Parameters
    ----------
    arg
        A task, pype, or any hashable that has been registered as a pype.
        If it is a key of the pype registry, the registered pype is looked
        up and connected to this pype's input.
    name
        Optional hashable under which the new pype is registered. If not
        given, the ``name`` attribute of ``arg`` (when it has one) is used.
    """

    _registered_pypes = {}  # a store for optionally registering pypes
    name = None

    def __init__(self, arg: pype_input_type = None, name: Optional[Hashable] = None):
        """Create the flow/dependency graphs and optionally attach ``arg``."""
        # graphs for data flow and data dependency
        self.flow = _WrapDiGraph()
        self.flow.add_wrap(task.pype_input.wrap())
        self.dependencies = _WrapDiGraph()
        # validation state and counts
        self.validated = False
        # dict for outputs of last call
        self.outputs = {}
        self._partials = {}
        # add first task to pype
        if arg is not None:
            # unhashable args cannot be registry keys; keep them unchanged
            with suppress(TypeError):
                arg = self.__class__._registered_pypes.get(arg, arg)
            _connect_to_pype(self, arg, inplace=True)
        self.register(name or getattr(arg, "name", None))

    # --- task hookup

    def __or__(self, other):
        """Return a new pype with ``other`` attached to the last tasks."""
        return _connect_to_pype(self, other)

    def __ior__(self, other):
        """Attach ``other`` to the last tasks of this pype, in place."""
        return _connect_to_pype(self, other, inplace=True)

    def __and__(self, other):
        """Return a new pype with ``other`` attached to the pype input."""
        return _connect_to_pype(self, other, how="first")

    def __iand__(self, other):
        """Attach ``other`` to the pype input, in place."""
        return _connect_to_pype(self, other, how="first", inplace=True)

    def __lshift__(self, other):
        """Attach ``other`` after calling ``fan()`` on its first wrap(s)."""
        return _connect_to_pype(self, other, wrap_func=lambda x: x.fan())

    def __rshift__(self, other):
        """Attach ``other`` after calling ``agg(scope="object")`` on its
        first wrap(s)."""

        def task_func(x):
            return x.agg(scope="object")

        return _connect_to_pype(self, other, wrap_func=task_func)

    # --- call stuff

    def __call__(self, *args, **kwargs):
        """
        Run ``args``/``kwargs`` through the flow.

        Stores the per-task outputs of the run on ``self.outputs`` and
        returns the first entry of the run's ``output`` list.
        """
        # NOTE(review): nothing in this class sets ``validated`` to True, so
        # validation appears to run on every call - confirm this is intended.
        if not self.validated:  # validate pype if it is not already
            self.validate()
        _meta = self._create_run_dict()  # create main control structure
        # seed the queue with the input wrap and the call arguments
        que = deque()
        que.append((self.flow.get_input_wrap(), (args, kwargs)))
        self._run_queue(_meta, que)
        # set results
        self.outputs = _meta["outputs"]
        return _meta["output"][0]

    def _run_queue(self, _meta, que):
        """
        Run the queue until it is empty, then run pending aggregations.

        Parameters
        ----------
        _meta
            The control dict produced by ``_create_run_dict``.
        que
            A deque of ``(wrap, (args, kwargs))`` entries.
        """
        assert self.flow.get_input_wrap().task is task.pype_input
        # read-only mapping handed to every wrap call
        fixtures = MapProxy({"meta": _meta, "pype": self, **self._partials})
        while len(que):
            wrap_, (args, kwargs) = que.pop()
            wrap_: wrap.Wrap
            try:
                output = wrap_(*args, **kwargs, _pype_fixtures=fixtures)
            except UnresolvedDependency:  # task needs to be put back
                # NOTE(review): defer_count is incremented but not consulted
                # here, so nothing in this method bounds the re-queuing.
                _meta["defer_count"][wrap_] += 1  # up task deferment counter
                que.appendleft((wrap_, (args, kwargs)))
                continue
            except TaskReturnedNone:  # task returned None
                continue
            else:  # everything went fine
                _meta["outputs"][wrap_.task] = output
                for neighbor in self.flow.neighbors(wrap_):  # queue neighbors
                    neighbor._queue_up(output, _meta, que, sending_wrap=wrap_)
        # run tasks that waited for object scoped aggregations
        self._run_aggregations(_meta, que)
        # NOTE(review): ``output`` is only bound once a wrap call succeeds in
        # this invocation - confirm that is always the case.
        _meta["output"].append(de_args_kwargs(*output))

    def _run_aggregations(self, _meta, que):
        """
        Queue wraps waiting on object-scoped aggregates and rerun the queue.

        Each wrap in ``_meta["object_scope_map"]`` is handled at most once;
        it is queued only if its needed task collected any inputs.
        """
        for wrap_ in _meta["object_scope_map"]:
            # dont run again if object scope is finished
            if wrap_ in _meta["object_scope_finished"]:
                continue
            _meta["object_scope_finished"].add(wrap_)  # mark as complete
            needed_task = _meta["object_scope_map"][wrap_]
            inputs = _meta["object_aggregates"][needed_task]
            if len(inputs):
                que.append((wrap_, args_kwargs(inputs)))
        if len(que):  # if there is anything on the queue run it again
            self._run_queue(_meta, que)

    def _create_run_dict(self) -> Dict:
        """
        Create the main control structure for a single run of the pype.
        """
        out = dict(
            time=time.time(),  # start time of the call
            outputs={},  # store for most recent output of each task
            # a set of tasks that have completed aggregations on needed scope
            object_scope_finished=set(),
            # map of tasks that need outputs of other tasks for aggregating
            object_scope_map={},
            # map of tasks to their outputs for both applicable scopes
            object_aggregates=defaultdict(list),
            # a counter of how many times a task has been deferred
            defer_count=defaultdict(int),
            output=[],  # a list for storing results
            pype_inputs=None,  # store inputs for first pype
            print_flow=getattr(self, "print_flow", False),
        )
        return out

    # --- validation

    def validate(self):
        """
        Run checks on the pype to detect potential problems.

        Will raise an InvalidPype exception if compatibility issues are found,
        or a TypeError if any invalid callbacks are found.
        """
        # validate task compatibility
        self.flow.validate(extra_params=MapProxy(self._partials))
        # validate callbacks
        for wrap_ in self.flow.wraps:
            wrap_._validate_callbacks()

    # --- flow control attributes

    def iff(self, predicate: Callable[[Any], bool], inplace=False) -> "Pype":
        """
        Run data through the pype only if predicate evaluates to True.

        Parameters
        ----------
        predicate
            A callable that returns a boolean and takes the same inputs as
            the first task in the pype (excluding pype_input)
        inplace
            If True modify the pype in place, else return a pype with iff
            applied.

        Returns
        -------
        Pype
        """
        pype = self if inplace else self.copy()
        for wrap_ in pype._first_tasks:  # iterate first tasks and set predicate
            wrap_.iff(predicate)
        return pype

    # --- misc dunders

    def __getitem__(self, item):
        """Return a ``_TaskView`` for a Task; other items yield None."""
        if isinstance(item, task.Task):
            return _TaskView(item, self)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the partials; a Task not yet in
        the flow is first attached to the pype input."""
        if isinstance(value, task.Task):
            if value not in self.flow.tasks:
                _connect_to_pype(self, value, how="first", inplace=True)
        self._partials[key] = value

    def __len__(self):
        """ len should be equal to the number of nodes """
        return len(self.flow.wraps)

    def __str__(self):
        nodes = [x.task for x in self.flow.wraps]
        edges = [(x.task, y.task) for x, y in self.flow.edges]
        deps = [(x.task, y.task) for x, y in self.dependencies.edges]
        msg = (
            f"Pype instance\n\nNODES: \n\n{nodes} \n\n EDGES: \n\n"
            f"{edges} \n\nDEPENDENCIES:\n\n {deps}\n"
        )
        return msg

    def __repr__(self):
        return str(self)

    def __enter__(self):
        """Entering the context yields a copy, leaving this pype untouched."""
        return self.copy()

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __deepcopy__(self, memo):
        """Deep copy every attribute but share (do not copy) the tasks."""
        cls = self.__class__
        result = cls.__new__(cls)
        # ensure tasks are not copied: pre-seed memo so deepcopy reuses them
        for task_ in self.flow.tasks:
            memo[id(task_)] = task_
        for k, v in self.__dict__.items():
            setattr(result, k, deepcopy(v, memo))
        return result

    # --- visualization

    def plot(self, file_name: Optional[str] = None, view: bool = True):
        """
        Plot the graph

        Parameters
        ----------
        file_name
            The name of the graph viz file output.
        view
            If True display the graph network.

        Returns
        -------
        Instance of graphviz.Digraph
        """
        return self.flow.plot(file_name, view)

    # --- task stuff

    @property
    def _first_tasks(self) -> List["wrap.Wrap"]:
        """ return all nodes (wraps) attached to pype input"""
        input_wrap = self.flow.get_input_wrap()
        return list(self.flow.neighbors(input_wrap))

    @property
    def _last_tasks(self):
        """
        Return all the tasks that do not connect to other tasks.
        """
        connected = {x for x in self.flow.node_map if self.flow.node_map[x]}
        return set(self.flow.wraps) - connected

    def add_callback(
        self,
        callback: callable,
        callback_type: str,
        tasks: Optional[Sequence["task.Task"]] = None,
    ) -> "Pype":
        """
        Add a callback to all, or some, tasks in the pype. Return new Pype.

        Parameters
        ----------
        callback
            The callable to attach to the tasks in the pype
        callback_type
            The type of callback: supported types are:
                on_start, on_failure, on_success, and on_exception
        tasks
            A sequence of tasks to apply callback to, else apply to all tasks.

        Returns
        -------
        A copy of Pype
        """
        assert (
            callback_type in CALLBACK_NAMES
        ), f"unsupported callback type {callback_type}"
        pype = self.copy()
        # get a list of wraps to apply callbacks to
        if tasks is None:
            wraps = pype.flow.wraps
        else:
            # ``task_`` (not ``task``) so the ``task`` module is not shadowed
            wraps_ = [
                iterate(selected_wraps)
                for task_ in iterate(tasks)
                for selected_wraps in pype.flow.tasks[task_]
            ]
            wraps = itertools.chain.from_iterable(wraps_)
        # apply callbacks
        for wrap_ in wraps:
            setattr(wrap_, callback_type, callback)
        return pype

    def debug(
        self, tasks: Optional[Sequence["task.Task"]] = None, callback_type="on_start"
    ) -> "Pype":
        """
        Return a copy of Pype with debugging callbacks set.

        Optionally, a list of tasks to set debug on can be defined to limit
        the breakpoints to only include those tasks. The callback where the
        debugger is called is also configurable.

        Parameters
        ----------
        tasks
            If not None a task or sequence of tasks to debug. If None
            debug all tasks.
        callback_type
            The callback to set debug function. Controls where in the
            execution cycle debugging takes place

        Returns
        -------
        A copy of this pype.
        """
        return self.add_callback(debug_callback, callback_type, tasks=tasks)

    def register(self, name: Hashable) -> None:
        """
        Register a pype under name.

        Allows accessing the pype, or copies of it, later.

        Parameters
        ----------
        name
            Any Non-None hashable
        """
        if name is not None:
            self.__class__._registered_pypes[name] = self
            self.name = name
class _TaskView(_SpypeBase):
    """
    A class for selecting a task from a pype.

    Items connected through the view's operators are attached to the
    selected task rather than to the first or last tasks of the pype.

    Parameters
    ----------
    task_
        The task to select; the pype's flow must hold exactly one wrap for it.
    pype
        The pype the task belongs to.

    Raises
    ------
    TypeError
        If the pype does not contain exactly one instance of ``task_``.
    """

    def __init__(self, task_: "task.Task", pype: Pype):
        instances = pype.flow.tasks[task_]
        if len(instances) != 1:
            # use the ``task_`` argument here, not the ``task`` module
            msg = (
                f"pype has {len(instances)} instances of {task_}; "
                f"it must have exactly one to use this feature."
            )
            raise TypeError(msg)
        self.task = task_
        self.pype = pype
        self.wrap = instances[0]

    def __call__(self, *args, **kwargs):
        """Call the selected wrap directly with the given arguments."""
        return self.wrap(*args, **kwargs)

    # --- overloaded operators

    def __or__(self, other):
        """Return a new pype with ``other`` attached to the selected task."""
        return _connect_to_pype(self.pype, other, how=self.task)

    def __ior__(self, other):
        """Attach ``other`` to the selected task, modifying the pype in place."""
        return _connect_to_pype(self.pype, other, inplace=True, how=self.task)

    def __lshift__(self, other):
        """Attach ``other`` to the selected task after calling ``fan()`` on
        its first wrap(s)."""

        def task_func(x):
            return x.fan()

        return _connect_to_pype(self.pype, other, wrap_func=task_func, how=self.task)

    def __rshift__(self, other):
        """Attach ``other`` to the selected task after calling
        ``agg(scope="object")`` on its first wrap(s)."""

        def task_func(x):
            return x.agg(scope="object")

        return _connect_to_pype(self.pype, other, wrap_func=task_func, how=self.task)
# ------------------- pype functions
# Annotation alias for arguments that are either a Pype or a single Wrap
# (used by the helpers below that operate on "first wraps").
pype_or_wrap = Union[Pype, "wrap.Wrap"]
# Annotation alias that additionally allows a bare Task; it is not referenced
# elsewhere in the visible part of this file.
pype_wrap_or_task = Union[Pype, "wrap.Wrap", "task.Task"]
def _get_attach_wraps(pype, how):
    """
    Return a list of Wraps which should be connected based on the how arg.

    Parameters
    ----------
    pype
        The pype whose flow is searched for attach points.
    how
        A value in HOW_ARGS ("first" / "last" are handled here), a Task
        contained exactly once in the pype, or anything ``iterate`` expands
        into such items.

    Returns
    -------
    list
        The wraps of ``pype`` that new items should be attached to.
    """
    out = []  # list of wraps to be connected to other
    for arg in iterate(how):
        # Validate and dispatch on each expanded item rather than on the
        # un-expanded ``how`` so a sequence of attach points is honoured.
        # (assumes ``iterate`` yields a str as one item - TODO confirm)
        assert isinstance(arg, task.Task) or arg in HOW_ARGS
        if isinstance(arg, task.Task):
            # a task can only be an attach point if it occurs exactly once
            assert arg in pype.flow.tasks and len(pype.flow.tasks[arg]) == 1
            out.append(pype.flow.tasks[arg][0])
        elif arg == "last":
            out += pype._last_tasks
        elif arg == "first":
            out.append(pype.flow.tasks[task.pype_input][0])
    return out
def _pype_to_pype(pype1, attach_tasks, pype2):
    """
    Attach pype2 to pype1 at ``attach_tasks``; modifies pype1 in place.

    The flow of pype2 is merged into pype1's flow, then every first wrap of
    pype2 is connected to each wrap in ``attach_tasks``.

    Parameters
    ----------
    pype1
        The pype that receives pype2; its ``flow`` attribute is replaced.
    attach_tasks
        Wraps of pype1 that pype2's first wraps are connected to.
    pype2
        The pype being attached.
    """
    pype1.flow = pype1.flow | pype2.flow
    for wrap_ in pype2._first_tasks:
        # after the merge every first wrap of pype2 must be part of pype1
        assert wrap_ in pype1.flow.wraps
        _wrap_to_pype(pype1, attach_tasks, wrap_)
def _wrap_to_pype(pype1: Pype, attach_wraps, wrap):
    """
    Add an edge from every wrap in ``attach_wraps`` to ``wrap``.

    Modifies ``pype1.flow`` in place. No edge is added when ``wrap`` holds
    the pype input task, since pype_input must always be first.
    """
    for anchor in attach_wraps:
        if wrap.task is task.pype_input:
            continue  # never place anything ahead of the pype input
        pype1.flow.add_edge(anchor, wrap)
def _yield_first_wraps(obj: pype_or_wrap):
    """
    Yield ``obj`` itself when it is a Wrap, otherwise yield each entry of
    ``obj._first_tasks``.
    """
    if isinstance(obj, wrap.Wrap):
        yield obj
        return
    yield from obj._first_tasks
def _apply_wrap_func(obj: pype_or_wrap, func: Callable[["wrap.Wrap"], None]):
    """
    Call ``func`` on ``obj`` when it is a Wrap, or on each of its first
    wraps when it is a Pype.
    """
    assert isinstance(obj, (Pype, wrap.Wrap)), f"{obj} is not a wrap or pype"
    # a lone wrap is its own target; a pype contributes its first tasks
    targets = [obj] if isinstance(obj, wrap.Wrap) else obj._first_tasks
    for target in targets:
        func(target)
def _route_to_pype(route):
    """
    Recursively convert a route dict into a pype.

    A dict maps predicates (callables or bools) to task-like values; each
    value is converted recursively, given the predicate through ``iff``, and
    all branches are attached to the pype input. Anything that is not a dict
    must already expose ``iff`` and is returned unchanged.
    """
    if not isinstance(route, dict):
        assert hasattr(route, "iff")
        return route
    branches = []
    for predicate, task_like in route.items():
        # make sure inputs are compatible
        assert callable(predicate) or isinstance(predicate, bool)
        converted = _route_to_pype(task_like)
        branches.append(converted.iff(predicate))
    return task.pype_input | tuple(branches)
def _connect_to_pype(
    pype: Pype,
    other,
    how: Union[str, "task.Task"] = "last",
    inplace: bool = False,
    wrap_func: Optional[Callable] = None,
):
    """
    Add a task, wrap, pype, or route dict to the pype structure.

    Parameters
    ----------
    pype
        Pype to join
    other
        Pype, Task, Wrap, or dict (route) to connect to pype; anything
        ``iterate`` expands is processed item by item in reverse order.
    how
        How the connection should be done. Supported options are:
            "first" : connect other to input_task of pype
            "last" : connect other to last tasks in pype
            Task instance : connect other to a specific task in pype
    inplace
        If False deepcopy pype before modifying, else modify in place.
    wrap_func
        A function to call on the first wrap of other.

    Returns
    -------
    The connected Pype
    """
    target = pype if inplace else deepcopy(pype)
    # attach points are resolved once, before anything is hooked up
    anchors = _get_attach_wraps(target, how)
    for item in reversed(iterate(other)):
        # routes (dicts) become pypes first
        if isinstance(item, dict):
            item = _route_to_pype(item)
        # pypes are copied; everything else is wrapped
        if isinstance(item, Pype):
            item = deepcopy(item)
        else:
            item = _wrap_task(item)
        if wrap_func is not None:
            _apply_wrap_func(item, wrap_func)
        if isinstance(item, Pype):  # hook up a whole pype
            _pype_to_pype(target, anchors, item)
        elif isinstance(item, (task.Task, wrap.Wrap)):  # hook up a single wrap
            _wrap_to_pype(target, anchors, item)
    # ensure input task was handled correctly
    assert len(target.flow.tasks[task.pype_input]) == 1
    assert task.pype_input in target.flow.tasks
    assert target.flow.get_input_wrap().task is task.pype_input
    return target
def _wrap_task(obj):
    """
    Wrap a Task in a Wrap, or wrap every element of a list/tuple.

    A list or tuple yields a list of wrapped elements; a Wrap is returned
    unchanged. Anything else trips the assertion.
    """
    if isinstance(obj, (list, tuple)) and not isinstance(obj, task.Task):
        return [_wrap_task(element) for element in obj]
    wrapped = wrap.Wrap(obj) if isinstance(obj, task.Task) else obj
    assert isinstance(wrapped, wrap.Wrap), "non_wrap returned by _wrap_task"
    return wrapped