Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Parameterization #34

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 43 additions & 30 deletions ndspflow/workflows/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ def run_subflows(wf, iterable, attr, axis=None, n_jobs=-1, progress=None):
Nested results with shape (n_inputs, (n_grid_common,) (n_grid_unique,) n_params).
"""

# If using an axis
if axis is not None and iterable is None:
iterable = np.swapaxes(deepcopy(wf.y_array).T, 0, axis)
del wf.y_array
# If y_array exists, pop to prevent full copies
# from being passed to each mp pool.
x_array = None
if iterable is None:
if axis is not None:
# If using an axis
iterable = np.swapaxes(deepcopy(wf.y_array).T, 0, axis)
else:
iterable = wf.y_array if wf.y_array.ndim > 1 else wf.y_array.reshape(1, -1)
wf.y_array = None
x_array = wf.x_array

# Split shared and forked nodes
nodes_common, nodes_unique = parse_nodes(wf.nodes)
Expand Down Expand Up @@ -96,23 +103,33 @@ def run_subflows(wf, iterable, attr, axis=None, n_jobs=-1, progress=None):
wf.grid_keys_common = keys_common
wf.grid_keys_unique = keys_unique

if (wf.nodes[0][0] == 'transform' and
if attr != 'y_array' and (wf.nodes[0][0] == 'transform' and
all([res.shape == results[0][0].shape for res in results[0][1:]])):
# Transforms will produce stackable arrays
results = np.stack(results)
return results

shape_common = np.squeeze(np.array(grid_common)).shape
shape_unique = [len(np.unique(i)) for i in
np.vectorize(id)(np.squeeze(np.array(grid_unique))).T]
try:
shape_common = np.squeeze(np.array(grid_common)).shape
shape_unique = [len(np.unique(i)) for i in
np.vectorize(id)(np.squeeze(np.array(grid_unique))).T]

if np.all(np.array(shape_unique) == shape_unique[0]):
# Using forks
results = np.array(results)
else:
# Using Param objects
results = np.array(results, dtype='object')
results = results.reshape(len(iterable), *list(shape_common) + shape_unique)
if np.all(np.array(shape_unique) == shape_unique[0]):
# Using forks
results = np.array(results)
else:
# Using Param objects
results = np.array(results, dtype='object')
results = results.reshape(len(iterable), *list(shape_common) + shape_unique)
except ValueError:
# Inhomogeneous shape, give up and return a jagged array
results = np.array(results, dtype="object")

# Reattach y_array
if isinstance(iterable, np.ndarray):
wf.y_array = iterable
if x_array is not None:
wf.x_array = x_array

return results

Expand All @@ -122,9 +139,6 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_

from .workflow import WorkFlow

if isinstance(index, np.ndarray):
wf.y_array = index

wfs = []

for nodes_common in nodes_common_grid:
Expand All @@ -144,7 +158,7 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_
if wf_pre.x_array is not None:
xs = wf_pre.x_array
else:
ys = wf.y_array
ys = index
xs = wf.x_array

# Post fork workflow
Expand Down Expand Up @@ -172,7 +186,8 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_
wf_param.y_array = ys
wf_param.x_array = xs
wf_param.nodes = nodes
wf_param.run(n_jobs=1)
wf_param._try_parameterize = False
wf_param.run(n_jobs=1, attrs=wf.attrs)

if wf_param.results is None and wf_param.x_array is None:
# Workflow ended on a transform node.
Expand All @@ -189,11 +204,7 @@ def _run_sub_wf(index, wf=None, attr=None, nodes_common_grid=None, nodes_unique_
wfs_sim.append(wfs_fit)

wfs_sim = wfs_sim[0] if len(wfs_sim) == 1 else wfs_sim

if isinstance(wfs_sim, list) and isinstance(wfs_sim[0], list):
wfs.append(list(map(list, zip(*wfs_sim))))
else:
wfs.append(wfs_sim)
wfs.append(wfs_sim)

wfs = wfs[0] if len(wfs) == 1 else wfs

Expand Down Expand Up @@ -241,7 +252,7 @@ def parse_nodes(nodes):
has_fork = False
forks = {}
in_fork = None

i -= 1
for node in nodes[i:]:

if not has_fork:
Expand All @@ -265,6 +276,7 @@ def parse_nodes(nodes):
nodes_fork = []
elif node[0] != 'fit':
nodes_fork.append(node)

elif node[0] == 'fit':
add_fork = in_fork is not None and in_fork in list(forks.keys())
if add_fork and len(nodes) == 0:
Expand Down Expand Up @@ -425,10 +437,11 @@ def check_is_parameterized(args, kwargs):
if isinstance(arg, Param):
is_parameterized = True
break
for v in kwargs.values():
if isinstance(v, Param):
is_parameterized = True
break

for v in kwargs.values():
if isinstance(v, Param):
is_parameterized = True
break

return is_parameterized

Expand Down
2 changes: 1 addition & 1 deletion ndspflow/workflows/sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def simulate(self, func, *args, operator='add', **kwargs):
args, kwargs = parse_args(list(args), kwargs, self)

is_parameterized = check_is_parameterized(args, kwargs)

self.is_parameterized = True
self.nodes.append(['simulate', func, args,
{'operator': operator}, kwargs, is_parameterized])

Expand Down
2 changes: 1 addition & 1 deletion ndspflow/workflows/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def transform(self, func, *args, axis=None, mode=None, **kwargs):
"""

is_parameterized = check_is_parameterized(args, kwargs)

self.is_parameterized = True
self.nodes.append(['transform', func, args,
{'axis': axis}, kwargs, is_parameterized])

Expand Down
37 changes: 26 additions & 11 deletions ndspflow/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(self, y_array=None, x_array=None, **kwargs):
self.grid_unique = None
self.grid_keys_common = None
self.grid_keys_unique = None
self.is_parameterized = False

def __call__(self, y_array, x_array=None):
"""Call class to update array inputs."""
Expand All @@ -108,8 +109,8 @@ def __call__(self, y_array, x_array=None):
self.x_array = x_array


def run(self, axis=None, attrs=None, parameterize=False, flatten=False,
optimize=False, n_jobs=-1, progress=None):
def run(self, axis=None, attrs=None, flatten=False,
n_jobs=-1, progress=None):
"""Run workflow.

Parameters
Expand All @@ -119,33 +120,46 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False,
Identical to numpy axis arguments.
attrs : list of str, optional, default: None
Model attributes to return.
parameterize : bool, optional, default: False
Attempt to parameterize the workflow if True.
optimize : bool, optional, default: False
Optimize parameters of the
flatten : bool, optional, default: False
Flattens all models and attributes into a 1d array, per y_array.
n_jobs : int, optional, default: -1
Number of jobs to run in parallel.
progress : {None, tqdm.notebook.tqdm, tqdm.tqdm}
Progress bar.
"""
if parameterize:

self.attrs = attrs

# Infer if the workflow is parameterized
if self.is_parameterized:

from .param import run_subflows

iterable = None

# Iterate over seeds or subjects
for attr in ['seeds', 'subjects']:

iterable = getattr(self, attr)

if iterable is not None:
break

if iterable is None and self.y_array is not None:
iterable = None
attr = 'y_array'
elif iterable is None:
raise ValueError("Either .y_array, .read_bids, or .simulate must be defined.")

# The sub-flow will be run and will jump back to this method;
# this if-block must be skipped or infinite recursion will happen
self.is_parameterized = False

self.results = run_subflows(self, iterable, attr, axis=axis,
n_jobs=n_jobs, progress=progress)

self.is_parameterized = True

return

# Handle merges
Expand Down Expand Up @@ -174,8 +188,6 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False,
self.y_array_stash = [None] * len(self.fork_inds)
self.x_array_stash = [None] * len(self.fork_inds)

self.attrs = attrs

# Infer input array type
origshape = None
if self.y_array is not None and axis is not None:
Expand Down Expand Up @@ -265,7 +277,10 @@ def run(self, axis=None, attrs=None, parameterize=False, flatten=False,
return

# Flatten should return an even array (unless models produce sparse results)
if flatten:
if isinstance(_results, (float, int)):
self.results = _results
return
elif flatten:
self.results = np.array(_results)
return

Expand Down Expand Up @@ -432,7 +447,7 @@ def fit_transform(self, model, y_attrs=None, x_attrs=None, axis=None, queue=Fals

# Fit
self.fit(model, *fit_args, axis=axis, **fit_kwargs)
self.run(axis, y_attrs, False, True, n_jobs, progress)
self.run(axis, y_attrs, True, n_jobs, progress)

# Transform
self.y_array = self.results
Expand Down