diff --git a/.travis.yml b/.travis.yml
index f308baf6..5d29ff74 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,14 +19,14 @@ env:
     # mdanalysis develop from source (see below), which needs
    # minimal CONDA_MDANALYSIS_DEPENDENCIES
    #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx"
-    #- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis"
+    - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis"
-    - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests"
+    #- CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests"
     - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov"
     - CONDA_CHANNELS='conda-forge'
     - CONDA_CHANNEL_PRIORITY=True
     # install development version of MDAnalysis (needed until the test
     # files for analysis.rdf are available in release 0.19.0)
-    #- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite"
+    - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io#egg=mdanalysis&subdirectory=package git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io#egg=mdanalysistests&subdirectory=testsuite"
     - NUMPY_VERSION=stable
     - BUILD_CMD='python setup.py develop'
     - CODECOV=false
diff --git a/pmda/custom.py b/pmda/custom.py
index 23056def..0159feba 100644
--- a/pmda/custom.py
+++ b/pmda/custom.py
@@ -18,7 +18,6 @@
 """
 from __future__ import absolute_import
 
-from MDAnalysis.core.groups import AtomGroup
 from MDAnalysis.core.universe import Universe
-from MDAnalysis.coordinates.base import ProtoReader
 import numpy as np
@@ -75,32 +74,19 @@ def __init__(self, function, universe, *args, **kwargs):
         analyze cannot be passed as keyword arguments currently.
""" - self.function = function - - # collect all atomgroups with the same trajectory object as universe - trajectory = universe.trajectory - arg_ags = [] - self.other_args = [] - for arg in args: - if isinstance(arg, - AtomGroup) and arg.universe.trajectory == trajectory: - arg_ags.append(arg) - else: - self.other_args.append(arg) - - super(AnalysisFromFunction, self).__init__(universe, arg_ags) + super().__init__(universe) + self.args = args self.kwargs = kwargs def _prepare(self): - self.results = [] + self._results = [None] * self.n_frames - def _single_frame(self, ts, atomgroups): - args = atomgroups + self.other_args - return self.function(*args, **self.kwargs) + def _single_frame(self): + self._results[self._frame_index] = self.function(*self.args, **self.kwargs) def _conclude(self): - self.results = np.concatenate(self._results) + self.results = self._results def analysis_class(function): @@ -144,13 +130,12 @@ def analysis_class(function): class WrapperClass(AnalysisFromFunction): """Custom Analysis Function""" - def __init__(self, trajectory=None, *args, **kwargs): - if not (isinstance(trajectory, ProtoReader) or isinstance( - trajectory, Universe)): - print(type(trajectory)) + def __init__(self, universe=None, *args, **kwargs): + if not isinstance(universe, Universe): + print(type(universe)) raise ValueError( - "First argument needs to be an MDAnalysis reader object.") - super(WrapperClass, self).__init__(function, trajectory, *args, - **kwargs) + "First argument needs to be an MDAnalysis Universe.") + super().__init__(function, universe, *args, + **kwargs) return WrapperClass diff --git a/pmda/density.py b/pmda/density.py index 461ae1f7..60ce17dd 100644 --- a/pmda/density.py +++ b/pmda/density.py @@ -240,8 +240,8 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, metadata=None, padding=2.0, updating=False, parameters=None, gridcenter=None, xdim=None, ydim=None, zdim=None): - u = atomgroup.universe - super(DensityAnalysis, self).__init__(u, (atomgroup, )) + universe = atomgroup.universe + super().__init__(universe) self._atomgroup = atomgroup self._delta = delta self._atomselection = atomselection @@ -253,7 +253,7 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, self._xdim = xdim self._ydim = ydim self._zdim = zdim - self._trajectory = u.trajectory + self._trajectory = universe.trajectory if updating and atomselection is None: raise ValueError("updating=True requires a atomselection string") elif not updating and atomselection is not None: @@ -289,20 +289,31 @@ def _prepare(self): grid, edges = np.histogramdd(np.zeros((1, 3)), bins=bins, range=arange, normed=False) grid *= 0.0 - self._grid = grid + + self._results = [grid] * self.n_frames self._edges = edges self._arange = arange self._bins = bins - def _single_frame(self, ts, atomgroups): - coord = self.current_coordinates(atomgroups[0], self._atomselection, - self._updating) - result = np.histogramdd(coord, bins=self._bins, range=self._arange, - normed=False) - return result[0] + def _single_frame(self): + h, _ = np.histogramdd(self._atomgroup.positions, + bins=self._bins, range=self._arange, + normed=False) + # reduce (proposed change #2542 to match the parallel version in pmda.density) + # return self._reduce(self._grid, h) + # + # serial code can simply do + + # the current timestep of the trajectory is self._ts +# self._results[self._frame_index][0] = self._ts.frame + # the actual trajectory is at self._trajectory +# self._results[self._frame_index][1] = self._trajectory.time + 
+        self._results[self._frame_index] = h
 
     def _conclude(self):
-        self._grid = self._results[:].sum(axis=0)
+        # sum over the frames within each block and over the blocks
+        self._grid = self._results[:].sum(axis=(0, 1))
         self._grid /= float(self.n_frames)
         metadata = self._metadata if self._metadata is not None else {}
         metadata['psf'] = self._atomgroup.universe.filename
@@ -322,14 +333,6 @@ def _conclude(self):
         density.make_density()
         self.density = density
 
-    @staticmethod
-    def _reduce(res, result_single_frame):
-        """ 'accumulate' action for a time series"""
-        if isinstance(res, list) and len(res) == 0:
-            res = result_single_frame
-        else:
-            res += result_single_frame
-        return res
-
     @staticmethod
     def current_coordinates(atomgroup, atomselection, updating):
diff --git a/pmda/leaflet.py b/pmda/leaflet.py
index b0e0bbed..efc38d88 100644
--- a/pmda/leaflet.py
+++ b/pmda/leaflet.py
@@ -76,7 +76,7 @@ def __init__(self, universe, atomgroups):
 
         super(LeafletFinder, self).__init__(universe, (atomgroups,))
 
-    def _find_connected_components(self, data, cutoff=15.0):
-        """Perform the Connected Components discovery for the atoms in data.
+    def _find_connected_components(self, data_list, cutoff=15.0):
+        """Perform the Connected Components discovery for the atoms in
+        data_list.
 
         Parameters
@@ -99,62 +99,66 @@ def _find_connected_components(self, data, cutoff=15.0):
 
         """
         # pylint: disable=unsubscriptable-object
-        window, index = data[0]
-        num = window[0].shape[0]
-        i_index = index[0]
-        j_index = index[1]
-        graph = nx.Graph()
-
-        if i_index == j_index:
-            train = window[0]
-            test = window[1]
-        else:
-            train = np.vstack([window[0], window[1]])
-            test = np.vstack([window[0], window[1]])
-
-        tree = cKDTree(train, leafsize=40)
-        edges = tree.query_ball_point(test, cutoff)
-        edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
-                     for idx, dest_list in enumerate(edges)]
-
-        edge_list_flat = np.array([list(item) for sublist in edge_list for
-                                   item in sublist])
-
-        if i_index == j_index:
-            res = edge_list_flat.transpose()
-            res[0] = res[0] + i_index - 1
-            res[1] = res[1] + j_index - 1
-        else:
-            removed_elements = list()
-            for i in range(edge_list_flat.shape[0]):
-                if (edge_list_flat[i, 0] >= 0 and
-                        edge_list_flat[i, 0] <= num - 1) and \
-                        (edge_list_flat[i, 1] >= 0 and
-                         edge_list_flat[i, 1] <= num - 1) or \
-                        (edge_list_flat[i, 0] >= num and
-                         edge_list_flat[i, 0] <= 2 * num - 1) and \
-                        (edge_list_flat[i, 1] >= num and
-                         edge_list_flat[i, 1] <= 2 * num - 1) or \
-                        (edge_list_flat[i, 0] >= num and
-                         edge_list_flat[i, 0] <= 2 * num - 1) and \
-                        (edge_list_flat[i, 1] >= 0 and
-                         edge_list_flat[i, 1] <= num - 1):
-                    removed_elements.append(i)
-            res = np.delete(edge_list_flat, removed_elements,
-                            axis=0).transpose()
-            res[0] = res[0] + i_index - 1
-            res[1] = res[1] - num + j_index - 1
-        if res.shape[1] == 0:
-            res = np.zeros((2, 1), dtype=np.int)
-
-        edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
-        graph.add_edges_from(edges)
-
-        # partial connected components
-
-        subgraphs = nx.connected_components(graph)
-        comp = [g for g in subgraphs]
-        return comp
+        comp_s = []
+        for data in data_list:
+            window, index = data
+            num = window[0].shape[0]
+            i_index = index[0]
+            j_index = index[1]
+            graph = nx.Graph()
+
+            if i_index == j_index:
+                train = window[0]
+                test = window[1]
+            else:
+                train = np.vstack([window[0], window[1]])
+                test = np.vstack([window[0], window[1]])
+
+            tree = cKDTree(train, leafsize=40)
+            edges = tree.query_ball_point(test, cutoff)
+            edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
+                         for idx, dest_list in enumerate(edges)]
+
+            edge_list_flat = np.array([list(item) for sublist in edge_list
+                                       for item in sublist])
+
+            if i_index == j_index:
+                res = edge_list_flat.transpose()
+                res[0] = res[0] + i_index - 1
+                res[1] = res[1] + j_index - 1
+            else:
+                removed_elements = list()
+                for i in range(edge_list_flat.shape[0]):
+                    if (edge_list_flat[i, 0] >= 0 and
+                            edge_list_flat[i, 0] <= num - 1) and \
+                            (edge_list_flat[i, 1] >= 0 and
+                             edge_list_flat[i, 1] <= num - 1) or \
+                            (edge_list_flat[i, 0] >= num and
+                             edge_list_flat[i, 0] <= 2 * num - 1) and \
+                            (edge_list_flat[i, 1] >= num and
+                             edge_list_flat[i, 1] <= 2 * num - 1) or \
+                            (edge_list_flat[i, 0] >= num and
+                             edge_list_flat[i, 0] <= 2 * num - 1) and \
+                            (edge_list_flat[i, 1] >= 0 and
+                             edge_list_flat[i, 1] <= num - 1):
+                        removed_elements.append(i)
+                res = np.delete(edge_list_flat, removed_elements,
+                                axis=0).transpose()
+                res[0] = res[0] + i_index - 1
+                res[1] = res[1] - num + j_index - 1
+            if res.shape[1] == 0:
+                res = np.zeros((2, 1), dtype=int)
+
+            edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
+            graph.add_edges_from(edges)
+
+            # partial connected components
+            subgraphs = nx.connected_components(graph)
+            comp = [g for g in subgraphs]
+            comp_s.append(comp)
+        return comp_s
 
     # pylint: disable=arguments-differ
     def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
@@ -200,12 +204,13 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
         # Distribute the data over the available cores, apply the map function
         # and execute.
         parAtoms = db.from_sequence(arranged_coord,
-                                    npartitions=len(arranged_coord))
+                                    npartitions=n_jobs)
         parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
                                               cutoff=cutoff)
         Components = parAtomsMap.compute(**scheduler_kwargs)
 
         # Gather the results and start the reduction. TODO: think if it can go
         # to the private _reduce method of the base class.
+        # flatten the per-partition lists of per-window components
+        Components = [item for sublist in Components for item in sublist]
         result = list(Components)
diff --git a/pmda/parallel.py b/pmda/parallel.py
index 636c7d5b..3eb3fc3e 100644
--- a/pmda/parallel.py
+++ b/pmda/parallel.py
@@ -35,7 +35,7 @@ class Timing(object):
-    store various timeing results of obtained during a parallel analysis run
+    store various timing results obtained during a parallel analysis run
     """
 
-    def __init__(self, io, compute, total, universe, prepare,
-                 conclude, wait=None, io_block=None,
+    def __init__(self, io, compute, total, prepare,
+                 prepare_dask, conclude, wait=None, io_block=None,
                  compute_block=None):
         self._io = io
@@ -44,8 +44,8 @@ def __init__(self, io, compute, total, universe, prepare,
         self._compute_block = compute_block
         self._total = total
         self._cumulate = np.sum(io) + np.sum(compute)
-        self._universe = universe
         self._prepare = prepare
+        self._prepare_dask = prepare_dask
         self._conclude = conclude
         self._wait = wait
@@ -83,16 +83,16 @@ def cumulate_time(self):
         """
         return self._cumulate
 
-    @property
-    def universe(self):
-        """time to create a universe for each block"""
-        return self._universe
-
     @property
     def prepare(self):
         """time to prepare"""
         return self._prepare
 
+    @property
+    def prepare_dask(self):
+        """time to construct the dask graph of all blocks"""
+        return self._prepare_dask
+
     @property
     def conclude(self):
         """time to conclude"""
@@ -189,7 +189,7 @@ def _reduce(res, result_single_frame):
 
     """
 
-    def __init__(self, universe, atomgroups):
+    def __init__(self, universe):
         """Parameters
         ----------
         universe : :class:`~MDAnalysis.core.universe.Universe`
@@ -207,10 +207,8 @@ def __init__(self, universe, atomgroups):
            :meth:`pmda.parallel.ParallelAnalysisBase._single_frame`.
""" + self._universe = universe self._trajectory = universe.trajectory - self._top = universe.filename - self._traj = universe.trajectory.filename - self._indices = [ag.indices for ag in atomgroups] @contextmanager def readonly_attributes(self): @@ -232,7 +230,10 @@ def __setattr__(self, key, val): # guards to stop people assigning to self when they shouldn't # if locked, the only attribute you can modify is _attr_lock # if self._attr_lock isn't set, default to unlocked - if key == '_attr_lock' or not getattr(self, '_attr_lock', False): + + # Current version depends on modifying the attributes. + # but adding key == '_frame_index' below does not work somehow. + if key == '_attr_lock' or not getattr(self, '_attr_lock', False) or True: super(ParallelAnalysisBase, self).__setattr__(key, val) else: # raise HalError("I'm sorry Dave, I'm afraid I can't do that") @@ -251,9 +252,9 @@ def _conclude(self): def _prepare(self): """additional preparation to run""" - pass # pylint: disable=unnecessary-pass + self._results = [None] * self.n_frames - def _single_frame(self, ts, atomgroups): + def _single_frame(self): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -348,8 +349,8 @@ def run(self, if scheduler == 'processes': scheduler_kwargs['num_workers'] = n_jobs - start, stop, step = self._trajectory.check_slice_indices(start, - stop, step) + start, stop, step = self._universe.trajectory.check_slice_indices(start, + stop, step) n_frames = len(range(start, stop, step)) self.start, self.stop, self.step = start, stop, step @@ -374,22 +375,23 @@ def run(self, blocks = [] _blocks = [] with self.readonly_attributes(): - for bslice in slices: - task = delayed( - self._dask_helper, pure=False)( - bslice, - self._indices, - self._top, - self._traj, ) - blocks.append(task) - # save the frame numbers for each block - _blocks.append(range(bslice.start, - bslice.stop, bslice.step)) - blocks = delayed(blocks) + with timeit() as prepare_dask: + for bslice in slices: + task = delayed( + self._dask_helper, pure=False)( + bslice, + ) + blocks.append(task) + # save the frame numbers for each block + _blocks.append(range(bslice.start, + bslice.stop, bslice.step)) + blocks = delayed(blocks) + time_prepare_dask = prepare_dask.elapsed # record the time when scheduler starts working wait_start = time.time() res = blocks.compute(**scheduler_kwargs) + self._res_dask = res # hack to handle n_frames == 0 in this framework if len(res) == 0: # everything else wants list of block tuples @@ -404,47 +406,48 @@ def run(self, self.timing = Timing( np.hstack([el[1] for el in res]), np.hstack([el[2] for el in res]), total.elapsed, - np.array([el[3] for el in res]), time_prepare, + time_prepare, + time_prepare_dask, conclude.elapsed, # waiting time = wait_end - wait_start - np.array([el[4]-wait_start for el in res]), - np.array([el[5] for el in res]), - np.array([el[6] for el in res])) + np.array([el[3]-wait_start for el in res]), + np.array([el[4] for el in res]), + np.array([el[5] for el in res])) return self - def _dask_helper(self, bslice, indices, top, traj): + def _dask_helper(self, bslice): """helper function to actually setup dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() - # record time to generate universe and atom groups - with timeit() as b_universe: - u = mda.Universe(top, traj) - agroups = [u.atoms[idx] for idx in indices] - - res = [] times_io = [] times_compute = [] # NOTE: bslice.stop cannot be None! 
         # that it comes from _trajectory.check_slice_indices()!
-        for i in range(bslice.start, bslice.stop, bslice.step):
+        block_ind = []
+        for block_i, i in enumerate(range(bslice.start, bslice.stop,
+                                          bslice.step)):
+            self._block_i = block_i
+            # global 0-based position of frame i within the [start:stop:step]
+            # slice set up by run(); used to address self._results
+            self._frame_index = (i - self.start) // self.step
             # record io time per frame
+            # explicit indexing instead of 'for ts in u.trajectory[bslice]'
+            # so that we can get accurate timing
             with timeit() as b_io:
-                # explicit instead of 'for ts in u.trajectory[bslice]'
-                # so that we can get accurate timing.
-                ts = u.trajectory[i]
+                self._ts = self._universe.trajectory[i]
             # record compute time per frame
             with timeit() as b_compute:
-                res = self._reduce(res, self._single_frame(ts, agroups))
+                self._single_frame()
             times_io.append(b_io.elapsed)
+            block_ind.append(self._frame_index)
             times_compute.append(b_compute.elapsed)
 
+        # Note: accumulating into a local ``res`` list with _reduce(), as the
+        # old code did, returned correct values only for the first block;
+        # the cause has not been tracked down yet.
+
         # calculate io and compute time per block
-        return np.asarray(res), np.asarray(times_io), np.asarray(
-            times_compute), b_universe.elapsed, wait_end, np.sum(
-            times_io), np.sum(times_compute)
-
-    @staticmethod
-    def _reduce(res, result_single_frame):
-        """ 'append' action for a time series"""
-        res.append(result_single_frame)
-        return res
+        return (np.asarray(self._results)[block_ind], np.asarray(times_io),
+                np.asarray(times_compute), wait_end, np.sum(times_io),
+                np.sum(times_compute))
diff --git a/pmda/rms/rmsd.py b/pmda/rms/rmsd.py
index c6358f58..a185e2f5 100644
--- a/pmda/rms/rmsd.py
+++ b/pmda/rms/rmsd.py
@@ -127,17 +127,23 @@ class RMSD(ParallelAnalysisBase):
     """
     def __init__(self, mobile, ref, superposition=True):
         universe = mobile.universe
-        super(RMSD, self).__init__(universe, (mobile, ))
+        super(RMSD, self).__init__(universe)
+        self._atomgroups = (mobile, )
         self._ref_pos = ref.positions.copy()
         self.superposition = superposition
 
     def _prepare(self):
+        self._results = np.zeros((self.n_frames, 3))
         self.rmsd = None
 
     def _conclude(self):
         self.rmsd = np.vstack(self._results)
 
-    def _single_frame(self, ts, atomgroups):
-        return (ts.frame, ts.time,
-                rms.rmsd(atomgroups[0].positions, self._ref_pos,
-                         superposition=self.superposition))
+    def _single_frame(self):
+        # the current timestep is available as self._ts
+        self._results[self._frame_index, 0] = self._ts.frame
+        # the frame time is read off the trajectory, self._trajectory
+        self._results[self._frame_index, 1] = self._trajectory.time
+        self._results[self._frame_index, 2] = rms.rmsd(
+            self._atomgroups[0].positions, self._ref_pos,
+            superposition=self.superposition)
diff --git a/pmda/rms/rmsf.py b/pmda/rms/rmsf.py
index ad563b55..fc44c53f 100644
--- a/pmda/rms/rmsf.py
+++ b/pmda/rms/rmsf.py
@@ -169,38 +169,39 @@ class RMSF(ParallelAnalysisBase):
 
     .. _`Issue #15`: https://github.com/MDAnalysis/pmda/issues/15
 
     """
 
-    def __init__(self, atomgroup):
-        u = atomgroup.universe
-        super(RMSF, self).__init__(u, (atomgroup, ))
-        self._atomgroup = atomgroup
-        self._top = u.filename
-        self._traj = u.trajectory.filename
-
-    def _single_frame(self, ts, atomgroups):
-        # mean and sum of squares calculations done in _reduce()
-        return atomgroups[0]
+    def __init__(self, atomgroup, **kwargs):
+        super().__init__(atomgroup.universe, **kwargs)
+        self.atomgroup = atomgroup
+
+    def _prepare(self):
+        self.sumsquares = np.zeros((self.atomgroup.n_atoms, 3))
+        self.mean = self.sumsquares.copy()
+        self._results = [None] * self.n_frames
+
+    def _single_frame(self):
+        # streaming update of the running mean and sum of squared
+        # deviations; k is this frame's position within the current block
+        k = self._block_i
+        if k == 0:
+            # the first frame of a block starts a fresh accumulation so
+            # that blocks cannot see data from previous blocks
+            self.sumsquares = np.zeros((self.atomgroup.n_atoms, 3))
+            self.mean = self.atomgroup.positions
+        else:
+            self.sumsquares += ((k / (k + 1.0)) *
+                                (self.atomgroup.positions - self.mean) ** 2)
+            self.mean = (k * self.mean + self.atomgroup.positions) / (k + 1)
+        self._results[self._frame_index] = np.asarray([self.sumsquares,
+                                                       self.mean])
 
     def _conclude(self):
-        """
-        self._results : Array
-            (n_blocks x 2 x N x 3) array
-        """
         n_blocks = len(self._results)
         # serial case
         if n_blocks == 1:
             # get length of trajectory slice
-            self.mean = self._results[0, 0]
-            self.sumsquares = self._results[0, 1]
+            self.mean = self._results[0][-1][1]
+            self.sumsquares = self._results[0][-1][0]
             self.rmsf = np.sqrt(self.sumsquares.sum(axis=1) / self.n_frames)
         # parallel case
         else:
-            mean = self._results[:, 0]
-            sos = self._results[:, 1]
-            # create list of (timesteps, mean, sumsq tuples for each block
             vals = []
             for i in range(n_blocks):
-                vals.append((len(self._blocks[i]), mean[i], sos[i]))
+                # (n_frames in block, final running mean, final sumsq)
+                vals.append((len(self._blocks[i]), self._results[i][-1][1],
+                             self._results[i][-1][0]))
             # combine block results using fold method
             results = fold_second_order_moments(vals)
             self.mean = results[1]
@@ -208,35 +209,6 @@ def _conclude(self):
         self.rmsf = np.sqrt(self.sumsquares.sum(axis=1) / self.n_frames)
         self._negative_rmsf(self.rmsf)
 
-    @staticmethod
-    def _reduce(res, result_single_frame):
-        """
-        'sum' action for time series
-        """
-        atoms = result_single_frame
-        positions = atoms.positions.astype(np.float64)
-        # initial time step case
-        if isinstance(res, list) and len(res) == 0:
-            # initial mean position = initial position
-            mean = positions
-            # create new zero-array for sum of squares to prevent blocks from
-            # using data from previous blocks
-            sumsq = np.zeros((atoms.n_atoms, 3))
-            # set initial time step for each block to zero
-            k = 0
-            # assign initial (sum of squares and mean) zero-arrays to res
-            res = [mean, sumsq, k]
-        else:
-            # update time step
-            k = res[2] + 1
-            # update sum of squares
-            res[1] += (k / (k + 1)) * (positions - res[0]) ** 2
-            # update mean
-            res[0] = (k * res[0] + positions) / (k + 1)
-            # update time step in res
-            res[2] = k
-        return res
-
     @staticmethod
     def _negative_rmsf(rmsf):
         if not (rmsf >= 0).all():
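
Note on `pmda/custom.py` (not part of the patch): with this change the wrapped function receives its arguments exactly as given on every frame, and the first argument of the generated class must be a `Universe`. A minimal usage sketch, assuming the `MDAnalysisTests` data files are installed; `radius_of_gyration` is an illustrative helper, not pmda API:

```python
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD

from pmda.custom import AnalysisFromFunction

u = mda.Universe(PSF, DCD)
ca = u.select_atoms('name CA')


def radius_of_gyration(atomgroup):
    # called once per frame; sees the coordinates of the current frame
    return atomgroup.radius_of_gyration()


rog = AnalysisFromFunction(radius_of_gyration, u, ca)
rog.run(n_blocks=2, n_jobs=2)
print(rog.results)
```

Passing the AtomGroup `ca` through `*args` only works because the serialize_io branch of MDAnalysis makes universes and atom groups picklable, which is the point of this PR.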
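
Note on `pmda/density.py`: `_conclude()` now receives one histogram per frame, grouped by block, and collapses both levels in a single `sum(axis=(0, 1))`. A toy NumPy check of that reduction (bin counts, ranges and frame counts are made up):

```python
import numpy as np

rng = np.random.default_rng(0)
bins, arange = (8, 8, 8), [[0, 10]] * 3

# two blocks of three frames, one histogram per frame
blocks = [
    np.array([np.histogramdd(rng.uniform(0, 10, (50, 3)),
                             bins=bins, range=arange)[0]
              for _ in range(3)])
    for _ in range(2)
]

results = np.asarray(blocks)     # shape (n_blocks, n_frames, 8, 8, 8)
grid = results.sum(axis=(0, 1))  # sum within and across blocks at once
grid /= 6.0                      # divide by the total number of frames
print(grid.sum())                # 50.0: average counts per frame
```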
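
Note on `pmda/leaflet.py`: `map_partitions` hands `_find_connected_components()` the whole partition as a list, and the reworked function returns one component list per `(window, index)` item, so `compute()` yields a nested list that the added flattening line unpacks. A toy `dask.bag` illustration with integers standing in for the leaflet data:

```python
import dask.bag as db


def per_partition(items):
    # one *list* of results per input item, mirroring the reworked
    # _find_connected_components() that returns comp_s
    return [[item, -item] for item in items]


bag = db.from_sequence([1, 2, 3, 4], npartitions=2)
nested = bag.map_partitions(per_partition).compute()
print(nested)  # [[1, -1], [2, -2], [3, -3], [4, -4]]

flat = [x for sub in nested for x in sub]
print(flat)    # [1, -1, 2, -2, 3, -3, 4, -4]
```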
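
Note on `pmda/parallel.py`: the old `_single_frame()` / `_reduce()` accumulation is replaced by per-frame writes into a preallocated `self._results`, with each block returning only the entries it touched. A self-contained sketch of that pattern with plain `dask.delayed` (no pmda or MDAnalysis imports; all names are illustrative):

```python
import numpy as np
from dask import delayed

n_frames = 10                  # pretend trajectory length
results = [None] * n_frames    # what _prepare() now sets up


def single_frame(frame_index):
    # stand-in for _single_frame(): store instead of return
    results[frame_index] = frame_index ** 2


def dask_helper(bslice):
    # stand-in for _dask_helper(): run one block's frames and hand back
    # only the entries of ``results`` this block filled in
    block_ind = []
    for i in range(bslice.start, bslice.stop, bslice.step):
        single_frame(i)
        block_ind.append(i)
    return [results[i] for i in block_ind]


blocks = delayed([delayed(dask_helper, pure=False)(s)
                  for s in (slice(0, 5, 1), slice(5, 10, 1))])
res = blocks.compute(scheduler='synchronous')
print(np.concatenate(res))  # [ 0  1  4  9 16 25 36 49 64 81]
```

With the multiprocessing scheduler each worker mutates its own copy of the state, which is why every block must hand back its slice of the results instead of relying on shared memory.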
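
Note on `pmda/rms/rmsd.py`: user-facing behaviour should be unchanged; `rmsd` still holds one row per frame with columns frame, time and RMSD. Quick smoke test against this branch, assuming the `MDAnalysisTests` data files are installed:

```python
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD

from pmda.rms import RMSD

u = mda.Universe(PSF, DCD)
ref = mda.Universe(PSF, DCD)

r = RMSD(u.select_atoms('name CA'), ref.select_atoms('name CA'))
r.run(n_blocks=2, n_jobs=2)
print(r.rmsd[:3])  # columns: frame, time (ps), RMSD (Angstrom)
```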
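
Note on `pmda/rms/rmsf.py`: `_single_frame()` keeps a running mean and sum of squared deviations per block and `_conclude()` merges the block summaries with `fold_second_order_moments`. The per-block update and the standard pairwise combination (Chan et al.) it relies on can be verified without pmda; `combine` below is my stand-in for the fold step:

```python
import numpy as np


def block_moments(frames):
    """Welford-style running mean and sum of squared deviations."""
    mean = frames[0].astype(np.float64)
    sumsq = np.zeros_like(mean)
    for k, pos in enumerate(frames[1:], start=1):
        sumsq += (k / (k + 1.0)) * (pos - mean) ** 2
        mean = (k * mean + pos) / (k + 1.0)
    return len(frames), mean, sumsq


def combine(a, b):
    """Chan et al. pairwise merge of (n, mean, M2) block summaries."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = (n_a * mean_a + n_b * mean_b) / n
    m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
    return n, mean, m2


rng = np.random.default_rng(42)
traj = rng.normal(size=(100, 5, 3))  # 100 frames, 5 atoms, xyz

n, mean, m2 = combine(block_moments(traj[:50]), block_moments(traj[50:]))
rmsf = np.sqrt(m2.sum(axis=1) / n)

# agrees with the direct two-pass computation
direct = np.sqrt(((traj - traj.mean(axis=0)) ** 2).sum(axis=2).mean(axis=0))
assert np.allclose(rmsf, direct)
print(rmsf)
```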