diff --git a/AUTHORS b/AUTHORS index d283ee33..8c75971a 100644 --- a/AUTHORS +++ b/AUTHORS @@ -22,4 +22,5 @@ Chronological list of authors 2018 - Shujie Fan - Richard J Gowers + - Ioannis Paraskevakos diff --git a/CHANGELOG b/CHANGELOG index 21572956..7611e8ad 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,7 +13,7 @@ The rules for this file: * release numbers follow "Semantic Versioning" http://semver.org ------------------------------------------------------------------------------ -xx/xx/18 VOD555, richardjgowers +xx/xx/18 VOD555, richardjgowers, iparask * 0.2.0 @@ -21,6 +21,7 @@ Enhancements * add add timing for _conclude and _prepare (Issue #49) * add parallel particle-particle RDF calculation module pmda.rdf (Issue #41) * add readonly_attributes context manager to ParallelAnalysisBase + * add parallel implementation of Leaflet Finder (Issue #47) 06/07/18 orbeckst diff --git a/docs/api.rst b/docs/api.rst index 7d2e3668..00522c40 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -42,3 +42,4 @@ also function as examples for how to implement your own functions with api/rms api/contacts api/rdf + api/leaflet diff --git a/docs/api/leaflet.rst b/docs/api/leaflet.rst new file mode 100644 index 00000000..11767989 --- /dev/null +++ b/docs/api/leaflet.rst @@ -0,0 +1,2 @@ +.. automodule:: pmda.leaflet + diff --git a/docs/conf.py b/docs/conf.py index d9fb6398..a918e6b7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -57,7 +57,7 @@ # General information about the project. project = u'PMDA' -author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein' +author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein, Ioannis Paraskevakos' copyright = u'2018, ' + author diff --git a/docs/references.rst b/docs/references.rst index 4655c4dd..9f9a2a1d 100644 --- a/docs/references.rst +++ b/docs/references.rst @@ -31,6 +31,15 @@ Huff, David Lippa, Dillon Niederhut, and M. Pacer, editors, Proceedings of the 16th Python in Science Conference, pages 64–72, Austin, - TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_ + TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_. -.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a +.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a + +.. [Paraskevakos2018] Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, + George Chantzialexiou, Thomas E. Cheatham, Oliver + Beckstein, Geoffrey C. Fox and Shantenu Jha. Task- + parallel Analysis of Molecular Dynamics Trajectories. In + 47th International Conference on Parallel Processing + (ICPP 2018). doi: `10.1145/3225058.3225128`_. + +.. _`10.1145/3225058.3225128` : https://doi.org/10.1145/3225058.3225128 \ No newline at end of file diff --git a/pmda/leaflet.py b/pmda/leaflet.py new file mode 100644 index 00000000..3f879ae5 --- /dev/null +++ b/pmda/leaflet.py @@ -0,0 +1,310 @@ +# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 +# +# PMDA +# Copyright (c) 2018 The MDAnalysis Development Team and contributors +# (see the file AUTHORS for the full list of names) +# +# Released under the GNU Public Licence, v2 or any higher version +""" +LeafletFInder Analysis tool --- :mod:`pmda.leaflet` +========================================================== + +This module contains parallel versions of analysis tasks in +:mod:`MDAnalysis.analysis.leaflet`. + +.. autoclass:: LeafletFinder + :members: + :undoc-members: + :inherited-members: + +""" +from __future__ import absolute_import, division + +import numpy as np +import dask.bag as db +import networkx as nx +from scipy.spatial import cKDTree + +import MDAnalysis as mda +from dask import distributed, multiprocessing +from joblib import cpu_count + +from .parallel import ParallelAnalysisBase, Timing +from .util import timeit + + +class LeafletFinder(ParallelAnalysisBase): + """Parallel Leaflet Finder analysis. + + Identify atoms in the same leaflet of a lipid bilayer. + This class implements and parallelizes the *LeafletFinder* algorithm + [Michaud-Agrawal2011]_. + + The parallelization is done based on [Paraskevakos2018]_. + + Attributes + ---------- + + Parameters + ---------- + Universe : :class:`~MDAnalysis.core.groups.Universe` + a :class:`MDAnalysis.core.groups.Universe` (the + `atomgroup` must belong to this Universe) + atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` + atomgroups that are iterated in parallel + + Note + ---- + At the moment, this class has far fewer features than the serial + version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`. + + This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest + Neighbor and Parallel-Connected Com- ponents (Tree-Search)") in + [Paraskevakos2018]_. + + Currently, periodic boundaries are not taken into account. + + The calculation is parallelized on a per-frame basis; + at the moment, no parallelization over trajectory blocks is performed. + + """ + + def __init__(self, universe, atomgroups): + self._atomgroup = atomgroups + self._results = list() + + super(LeafletFinder, self).__init__(universe, (atomgroups,)) + + def _find_connected_components(self, data, cutoff=15.0): + """Perform the Connected Components discovery for the atoms in data. + + Parameters + ---------- + data : Tuple of lists of Numpy arrays + This is a data and index tuple. The data are organized as + `([AtomPositions1,AtomPositions2], + [index1,index2])`. `index1` and `index2` are showing the + position of the `AtomPosition` in the adjacency matrix and + allows to correct the node number of the produced graph. + cutoff : float (optional) + head group-defining atoms within a distance of `cutoff` + Angstroms are deemed to be in the same leaflet [15.0] + + Returns + ------- + values : list. + A list of all the connected components of the graph that is + generated from `data` + + """ + window, index = data[0] + num = window[0].shape[0] + i_index = index[0] + j_index = index[1] + graph = nx.Graph() + + if i_index == j_index: + train = window[0] + test = window[1] + else: + train = np.vstack([window[0], window[1]]) + test = np.vstack([window[0], window[1]]) + + tree = cKDTree(train, leafsize=40) + edges = tree.query_ball_point(test, cutoff) + edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list)) + for idx, dest_list in enumerate(edges)] + + edge_list_flat = np.array([list(item) for sublist in edge_list for + item in sublist]) + + if i_index == j_index: + res = edge_list_flat.transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] + j_index - 1 + else: + removed_elements = list() + for i in range(edge_list_flat.shape[0]): + if (edge_list_flat[i, 0] >= 0 and + edge_list_flat[i, 0] <= num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= num and + edge_list_flat[i, 1] <= 2 * num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1): + removed_elements.append(i) + res = np.delete(edge_list_flat, removed_elements, + axis=0).transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] - num + j_index - 1 + if res.shape[1] == 0: + res = np.zeros((2, 1), dtype=np.int) + + edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])] + graph.add_edges_from(edges) + + # partial connected components + + subgraphs = nx.connected_components(graph) + comp = [g for g in subgraphs] + return comp + + # pylint: disable=arguments-differ + def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, + cutoff=15.0): + """Perform computation on a single trajectory frame. + + Must return computed values as a list. You can only **read** + from member variables stored in ``self``. Changing them during + a run will result in undefined behavior. `ts` and any of the + atomgroups can be changed (but changes will be overwritten + when the next time step is read). + + Parameters + ---------- + scheduler_kwargs : Dask Scheduler parameters. + cutoff : float (optional) + head group-defining atoms within a distance of `cutoff` + Angstroms are deemed to be in the same leaflet [15.0] + + Returns + ------- + values : anything + The output from the computation over a single frame must + be returned. The `value` will be added to a list for each + block and the list of blocks is stored as :attr:`_results` + before :meth:`_conclude` is run. In order to simplify + processing, the `values` should be "simple" shallow data + structures such as arrays or lists of numbers. + + """ + + # Get positions of the atoms in the atomgroup and find their number. + atoms = ts.positions[atomgroups.indices] + matrix_size = atoms.shape[0] + arranged_coord = list() + part_size = int(matrix_size / n_jobs) + # Partition the data based on a 2-dimensional partitioning + for i in range(1, matrix_size + 1, part_size): + for j in range(i, matrix_size + 1, part_size): + arranged_coord.append(([atoms[i - 1:i - 1 + part_size], + atoms[j - 1:j - 1 + part_size]], + [i, j])) + # Distribute the data over the available cores, apply the map function + # and execute. + parAtoms = db.from_sequence(arranged_coord, + npartitions=len(arranged_coord)) + parAtomsMap = parAtoms.map_partitions(self._find_connected_components, + cutoff=cutoff) + Components = parAtomsMap.compute(**scheduler_kwargs) + + # Gather the results and start the reduction. TODO: think if it can go + # to the private _reduce method of the based class. + result = list(Components) + + # Create the overall connected components of the graph + while len(result) != 0: + item1 = result[0] + result.pop(0) + ind = [] + for i, item2 in enumerate(Components): + if item1.intersection(item2): + item1 = item1.union(item2) + ind.append(i) + ind.reverse() + for j in ind: + Components.pop(j) + Components.append(item1) + + # Change output for and return. + indices = [np.sort(list(g)) for g in Components] + return indices + + # pylint: disable=arguments-differ + def run(self, + start=None, + stop=None, + step=None, + scheduler=None, + n_jobs=-1, + cutoff=15.0): + """Perform the calculation + + Parameters + ---------- + start : int, optional + start frame of analysis + stop : int, optional + stop frame of analysis + step : int, optional + number of frames to skip between each analysed frame + scheduler : dask scheduler, optional + Use dask scheduler, defaults to multiprocessing. This can be used + to spread work to a distributed scheduler + n_jobs : int, optional + number of tasks to start, if `-1` use number of logical cpu cores. + This argument will be ignored when the distributed scheduler is + used + + """ + if scheduler is None: + scheduler = multiprocessing + + if n_jobs == -1: + if scheduler == multiprocessing: + n_jobs = cpu_count() + elif isinstance(scheduler, distributed.Client): + n_jobs = len(scheduler.ncores()) + else: + raise ValueError( + "Couldn't guess ideal number of jobs from scheduler." + "Please provide `n_jobs` in call to method.") + + with timeit() as b_universe: + universe = mda.Universe(self._top, self._traj) + + scheduler_kwargs = {'get': scheduler.get} + if scheduler == multiprocessing: + scheduler_kwargs['num_workers'] = n_jobs + + start, stop, step = self._trajectory.check_slice_indices( + start, stop, step) + with timeit() as total: + with timeit() as prepare: + self._prepare() + + with self.readonly_attributes(): + timings = list() + times_io = [] + for frame in range(start, stop, step): + with timeit() as b_io: + ts = universe.trajectory[frame] + times_io.append(b_io.elapsed) + with timeit() as b_compute: + components = self. \ + _single_frame(ts=ts, + atomgroups=self._atomgroup, + scheduler_kwargs=scheduler_kwargs, + n_jobs=n_jobs, + cutoff=cutoff) + + timings.append(b_compute.elapsed) + leaflet1 = self._atomgroup[components[0]] + leaflet2 = self._atomgroup[components[1]] + self._results.append([leaflet1, leaflet2]) + with timeit() as conclude: + self._conclude() + self.timing = Timing(times_io, + np.hstack(timings), total.elapsed, + b_universe.elapsed, prepare.elapsed, + conclude.elapsed) + return self + + def _conclude(self): + self.results = self._results diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py new file mode 100644 index 00000000..cf16b57b --- /dev/null +++ b/pmda/test/test_leaflet.py @@ -0,0 +1,64 @@ +from __future__ import absolute_import, division, print_function + +import pytest +from numpy.testing import (assert_almost_equal, assert_array_equal, + assert_array_almost_equal) + +import MDAnalysis +from MDAnalysisTests.datafiles import Martini_membrane_gro +from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT +from dask import multiprocessing +from pmda import leaflet +import numpy as np + + +class TestLeafLet(object): + + @pytest.fixture() + def u_one_frame(self): + return MDAnalysis.Universe(Martini_membrane_gro) + + @pytest.fixture() + def universe(self): + return MDAnalysis.Universe(GRO_MEMPROT, XTC_MEMPROT) + + @pytest.fixture() + def correct_values(self): + return [np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634]), + np.array([36507, 36761, 38285, 39174]), + np.array([36634]), + np.array([36507, 36761, 37650, 38285, 39174, 39936]), + np.array([36634]), + np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), + np.array([36634]), + np.array([36507, 36761]), + np.array([36634])] + + @pytest.fixture() + def correct_values_single_frame(self): + return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] + + def test_leaflet(self, universe, correct_values): + lipid_heads = universe.select_atoms("name P and resname POPG") + universe.trajectory.rewind() + leaflets = leaflet.LeafletFinder(universe, lipid_heads) + leaflets.run(scheduler=multiprocessing, n_jobs=1) + results = [atoms.indices for atomgroup in leaflets.results + for atoms in atomgroup] + [assert_almost_equal(x, y, err_msg="error: leaflets should match " + + "test values") for x, y in + zip(results, correct_values)] + + def test_leaflet_single_frame(self, + u_one_frame, + correct_values_single_frame): + lipid_heads = u_one_frame.select_atoms("name PO4") + u_one_frame.trajectory.rewind() + leaflets = leaflet.LeafletFinder(u_one_frame, + lipid_heads).run(start=0, stop=1) + + assert_almost_equal([atoms.indices for atomgroup in leaflets.results + for atoms in atomgroup], + correct_values_single_frame, err_msg="error: " + + "leaflets should match test values") diff --git a/setup.py b/setup.py index 5a14ee9d..67ab3d22 100644 --- a/setup.py +++ b/setup.py @@ -51,7 +51,9 @@ 'MDAnalysis>=0.18', 'dask', 'six', - 'joblib', # cpu_count func currently + 'joblib', # cpu_count func currently + 'networkx', + 'scipy', ], tests_require=[ 'pytest',