diff --git a/ndspflow/workflows/param.py b/ndspflow/workflows/param.py index 14a4ec4..7263b05 100644 --- a/ndspflow/workflows/param.py +++ b/ndspflow/workflows/param.py @@ -41,10 +41,17 @@ def run_subflows(wf, iterable, attr, axis=None, n_jobs=-1, progress=None): Nested results with shape (n_inputs, (n_grid_common,) (n_grid_unique,) n_params). """ - # If using an axis - if axis is not None and iterable is None: - iterable = np.swapaxes(deepcopy(wf.y_array).T, 0, axis) - del wf.y_array + # If y_array exists, pop to prevent full copies + # from being passed to each mp pool. + x_array = None + if iterable is None: + if axis is not None: + # If using an axis + iterable = np.swapaxes(deepcopy(wf.y_array).T, 0, axis) + else: + iterable = wf.y_array if wf.y_array.ndim > 1 else wf.y_array.reshape(1, -1) + wf.y_array = None + x_array = wf.x_array # Split shared and forked nodes nodes_common, nodes_unique = parse_nodes(wf.nodes) @@ -96,23 +103,33 @@ def run_subflows(wf, iterable, attr, axis=None, n_jobs=-1, progress=None): wf.grid_keys_common = keys_common wf.grid_keys_unique = keys_unique - if (wf.nodes[0][0] == 'transform' and + if attr != 'y_array' and (wf.nodes[0][0] == 'transform' and all([res.shape == results[0][0].shape for res in results[0][1:]])): # Transforms will produce stackable arrays results = np.stack(results) return results - shape_common = np.squeeze(np.array(grid_common)).shape - shape_unique = [len(np.unique(i)) for i in - np.vectorize(id)(np.squeeze(np.array(grid_unique))).T] + try: + shape_common = np.squeeze(np.array(grid_common)).shape + shape_unique = [len(np.unique(i)) for i in + np.vectorize(id)(np.squeeze(np.array(grid_unique))).T] - if np.all(np.array(shape_unique) == shape_unique[0]): - # Using forks - results = np.array(results) - else: - # Using Param objects - results = np.array(results, dtype='object') - results = results.reshape(len(iterable), *list(shape_common) + shape_unique) + if np.all(np.array(shape_unique) == shape_unique[0]): + # Using forks + results = np.array(results) + else: + # Using Param objects + results = np.array(results, dtype='object') + results = results.reshape(len(iterable), *list(shape_common) + shape_unique) + except ValueError: + # Inhomogeneous shape, give up and return a jagged array + results = np.array(results, dtype="object") + + # Reattach y_array + if isinstance(iterable, np.ndarray): + wf.y_array = iterable + if x_array is not None: + wf.x_array = x_array return results @@ -122,9 +139,6 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_ from .workflow import WorkFlow - if isinstance(index, np.ndarray): - wf.y_array = index - wfs = [] for nodes_common in nodes_common_grid: @@ -144,7 +158,7 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_ if wf_pre.x_array is not None: xs = wf_pre.x_array else: - ys = wf.y_array + ys = index xs = wf.x_array # Post fork workflow @@ -172,7 +186,8 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_ wf_param.y_array = ys wf_param.x_array = xs wf_param.nodes = nodes - wf_param.run(n_jobs=1) + wf_param._try_parameterize = False + wf_param.run(n_jobs=1, attrs=wf.attrs) if wf_param.results is None and wf_param.x_array is None: # Workflow ended on a transform node. @@ -189,11 +204,7 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_ wfs_sim.append(wfs_fit) wfs_sim = wfs_sim[0] if len(wfs_sim) == 1 else wfs_sim - - if isinstance(wfs_sim, list) and isinstance(wfs_sim[0], list): - wfs.append(list(map(list, zip(*wfs_sim)))) - else: - wfs.append(wfs_sim) + wfs.append(wfs_sim) wfs = wfs[0] if len(wfs) == 1 else wfs @@ -241,7 +252,7 @@ def parse_nodes(nodes): has_fork = False forks = {} in_fork = None - + i -= 1 for node in nodes[i:]: if not has_fork: @@ -265,6 +276,7 @@ def parse_nodes(nodes): nodes_fork = [] elif node[0] != 'fit': nodes_fork.append(node) + elif node[0] == 'fit': add_fork = in_fork is not None and in_fork in list(forks.keys()) if add_fork and len(nodes) == 0: @@ -425,10 +437,11 @@ def check_is_parameterized(args, kwargs): if isinstance(arg, Param): is_parameterized = True break - for v in kwargs.values(): - if isinstance(v, Param): - is_parameterized = True - break + + for v in kwargs.values(): + if isinstance(v, Param): + is_parameterized = True + break return is_parameterized diff --git a/ndspflow/workflows/sim.py b/ndspflow/workflows/sim.py index 253673f..1f454b1 100644 --- a/ndspflow/workflows/sim.py +++ b/ndspflow/workflows/sim.py @@ -53,7 +53,7 @@ def simulate(self, func, *args, operator='add', **kwargs): args, kwargs = parse_args(list(args), kwargs, self) is_parameterized = check_is_parameterized(args, kwargs) - + self.is_parameterized = True self.nodes.append(['simulate', func, args, {'operator': operator}, kwargs, is_parameterized]) diff --git a/ndspflow/workflows/transform.py b/ndspflow/workflows/transform.py index ea09039..c56a9e1 100644 --- a/ndspflow/workflows/transform.py +++ b/ndspflow/workflows/transform.py @@ -52,7 +52,7 @@ def transform(self, func, *args, axis=None, mode=None, **kwargs): """ is_parameterized = check_is_parameterized(args, kwargs) - + self.is_parameterized = True self.nodes.append(['transform', func, args, {'axis': axis}, kwargs, is_parameterized]) diff --git a/ndspflow/workflows/workflow.py b/ndspflow/workflows/workflow.py index daffac4..c2307db 100644 --- a/ndspflow/workflows/workflow.py +++ b/ndspflow/workflows/workflow.py @@ -100,6 +100,7 @@ def __init__(self, y_array=None, x_array=None, **kwargs): self.grid_unique = None self.grid_keys_common = None self.grid_keys_unique = None + self.is_parameterized = False def __call__(self, y_array, x_array=None): """Call class to update array inputs.""" @@ -108,8 +109,8 @@ def __call__(self, y_array, x_array=None): self.x_array = x_array - def run(self, axis=None, attrs=None, parameterize=False, flatten=False, - optimize=False, n_jobs=-1, progress=None): + def run(self, axis=None, attrs=None, flatten=False, + n_jobs=-1, progress=None): """Run workflow. Parameters @@ -119,10 +120,6 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False, Identical to numpy axis arguments. attrs : list of str, optional, default: None Model attributes to return. - parameterize : bool, optional, default: False - Attempt to parameterize the workflow if True. - optimize : bool, optional, default: False - Optimize parameters of the flatten : bool, optional, default: False Flattens all models and attributes into a 1d array, per y_array. n_jobs : int, optional, default: -1 @@ -130,12 +127,17 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False, progress : {None, tqdm.notebook.tqdm, tqdm.tqdm} Progress bar. """ - if parameterize: + + self.attrs = attrs + + # Infer if the workflow is parameterized + if self.is_parameterized: from .param import run_subflows iterable = None + # Iterate over seeds or subjects for attr in ['seeds', 'subjects']: iterable = getattr(self, attr) @@ -143,9 +145,21 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False, if iterable is not None: break + if iterable is None and self.y_array is not None: + iterable = None + attr = 'y_array' + elif iterable is None: + raise ValueError("Either .y_array, .read_bids, or .simulate must be defined.") + + # Sub-flow will be ran and will jump back the to this method, + # this if-block should be skipped or infinite recursion will happen + self.is_parameterized = False + self.results = run_subflows(self, iterable, attr, axis=axis, n_jobs=n_jobs, progress=progress) + self.is_parameterized = True + return # Handle merges @@ -174,8 +188,6 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False, self.y_array_stash = [None] * len(self.fork_inds) self.x_array_stash = [None] * len(self.fork_inds) - self.attrs = attrs - # Infer input array type origshape = None if self.y_array is not None and axis is not None: @@ -265,7 +277,10 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False, return # Flatten should return an even array (unless models produce sparse results) - if flatten: + if isinstance(_results, (float, int)): + self.results = _results + return + elif flatten: self.results = np.array(_results) return @@ -432,7 +447,7 @@ def fit_transform(self, model, y_attrs=None, x_attrs=None, axis=None, queue=Fals # Fit self.fit(model, *fit_args, axis=axis, **fit_kwargs) - self.run(axis, y_attrs, False, True, n_jobs, progress) + self.run(axis, y_attrs, True, n_jobs, progress) # Transform self.y_array = self.results