new pleafletfinder (#69)
* fix #47 
* NOTE: 
  * follow-up in #72 (add PBC and other improvements)
  * This is the version of the algorithm published in the ICPP 2018 paper
* Setting up parallel leaflet finder
* Changed run method
* Adding errors in LF and adding test
* Adding SciPy and networkx to setup. Replaced BallTree with cKDTree
* Single frame test passes, PEP8 both class and test.
* Removing code comments based on PEP8 messages
* trajectory level testing and pep8 changes
* Changing where the paper is mentioned
* Updated Changelog
* Continuing to resolve PR review comments
* Reduced number of tasks
* Removing redundant import
* PEP8 corrections
* Further PEP8 corrections
* Debug, references, increased coverage and PEP8.
* Docs added for Leaflet Finder
* Adding missing file
* Trying to fix linter's messages
* Adding my name on AUTHORS, changelog and conf.py
Ioannis Paraskevakos authored and orbeckst committed Oct 17, 2018
1 parent 3d78fc7 commit a718250
Showing 9 changed files with 395 additions and 5 deletions.
1 change: 1 addition & 0 deletions AUTHORS
@@ -22,4 +22,5 @@ Chronological list of authors
2018
- Shujie Fan
- Richard J Gowers
- Ioannis Paraskevakos

3 changes: 2 additions & 1 deletion CHANGELOG
@@ -13,14 +13,15 @@ The rules for this file:
* release numbers follow "Semantic Versioning" http://semver.org

------------------------------------------------------------------------------
xx/xx/18 VOD555, richardjgowers
xx/xx/18 VOD555, richardjgowers, iparask

* 0.2.0

Enhancements
 * add timing for _conclude and _prepare (Issue #49)
* add parallel particle-particle RDF calculation module pmda.rdf (Issue #41)
* add readonly_attributes context manager to ParallelAnalysisBase
* add parallel implementation of Leaflet Finder (Issue #47)


06/07/18 orbeckst
1 change: 1 addition & 0 deletions docs/api.rst
@@ -42,3 +42,4 @@ also function as examples for how to implement your own functions with
api/rms
api/contacts
api/rdf
api/leaflet
2 changes: 2 additions & 0 deletions docs/api/leaflet.rst
@@ -0,0 +1,2 @@
.. automodule:: pmda.leaflet

2 changes: 1 addition & 1 deletion docs/conf.py
@@ -57,7 +57,7 @@

# General information about the project.
project = u'PMDA'
author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein'
author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein, Ioannis Paraskevakos'
copyright = u'2018, ' + author


13 changes: 11 additions & 2 deletions docs/references.rst
@@ -31,6 +31,15 @@
Huff, David Lippa, Dillon Niederhut, and M. Pacer,
editors, Proceedings of the 16th Python in Science
Conference, pages 64–72, Austin,
TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_
TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_.
.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a

.. [Paraskevakos2018] Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan,
George Chantzialexiou, Thomas E. Cheatham, Oliver
Beckstein, Geoffrey C. Fox and Shantenu Jha. Task-
parallel Analysis of Molecular Dynamics Trajectories. In
47th International Conference on Parallel Processing
(ICPP 2018). doi: `10.1145/3225058.3225128`_.
.. _`10.1145/3225058.3225128`: https://doi.org/10.1145/3225058.3225128
310 changes: 310 additions & 0 deletions pmda/leaflet.py
@@ -0,0 +1,310 @@
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2018 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
"""
LeafletFinder Analysis tool --- :mod:`pmda.leaflet`
==========================================================
This module contains parallel versions of analysis tasks in
:mod:`MDAnalysis.analysis.leaflet`.
.. autoclass:: LeafletFinder
:members:
:undoc-members:
:inherited-members:
"""
from __future__ import absolute_import, division

import numpy as np
import dask.bag as db
import networkx as nx
from scipy.spatial import cKDTree

import MDAnalysis as mda
from dask import distributed, multiprocessing
from joblib import cpu_count

from .parallel import ParallelAnalysisBase, Timing
from .util import timeit


class LeafletFinder(ParallelAnalysisBase):
"""Parallel Leaflet Finder analysis.
Identify atoms in the same leaflet of a lipid bilayer.
This class implements and parallelizes the *LeafletFinder* algorithm
[Michaud-Agrawal2011]_.
The parallelization is done based on [Paraskevakos2018]_.
Attributes
----------
Parameters
----------
Universe : :class:`~MDAnalysis.core.groups.Universe`
a :class:`MDAnalysis.core.groups.Universe` (the
`atomgroup` must belong to this Universe)
atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
atomgroups that are iterated in parallel
Note
----
At the moment, this class has far fewer features than the serial
version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`.
    This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest
    Neighbor and Parallel-Connected Components (Tree-Search)") in
    [Paraskevakos2018]_.
Currently, periodic boundaries are not taken into account.
The calculation is parallelized on a per-frame basis;
at the moment, no parallelization over trajectory blocks is performed.
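    Example
    -------
    A minimal usage sketch (the file names and the head-group selection
    below are hypothetical)::

        import MDAnalysis as mda
        from pmda.leaflet import LeafletFinder

        u = mda.Universe("membrane.gro", "membrane.xtc")
        headgroups = u.select_atoms("name P*")
        LF = LeafletFinder(u, headgroups)
        LF.run(n_jobs=-1, cutoff=15.0)
        leaflet0, leaflet1 = LF.results[0]  # AtomGroups of the first frame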
"""

def __init__(self, universe, atomgroups):
self._atomgroup = atomgroups
self._results = list()

super(LeafletFinder, self).__init__(universe, (atomgroups,))

def _find_connected_components(self, data, cutoff=15.0):
"""Perform the Connected Components discovery for the atoms in data.
Parameters
----------
data : Tuple of lists of Numpy arrays
This is a data and index tuple. The data are organized as
`([AtomPositions1<NumpyArray>,AtomPositions2<NumpyArray>],
            [index1,index2])`. `index1` and `index2` give the position of
            each `AtomPosition` block in the adjacency matrix and allow
            correcting the node numbers of the produced graph.
cutoff : float (optional)
head group-defining atoms within a distance of `cutoff`
Angstroms are deemed to be in the same leaflet [15.0]
Returns
-------
values : list.
A list of all the connected components of the graph that is
generated from `data`
"""
window, index = data[0]
num = window[0].shape[0]
i_index = index[0]
j_index = index[1]
graph = nx.Graph()

if i_index == j_index:
train = window[0]
test = window[1]
else:
train = np.vstack([window[0], window[1]])
test = np.vstack([window[0], window[1]])

tree = cKDTree(train, leafsize=40)
edges = tree.query_ball_point(test, cutoff)
edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
for idx, dest_list in enumerate(edges)]

edge_list_flat = np.array([list(item) for sublist in edge_list for
item in sublist])

if i_index == j_index:
res = edge_list_flat.transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] + j_index - 1
else:
removed_elements = list()
for i in range(edge_list_flat.shape[0]):
if (edge_list_flat[i, 0] >= 0 and
edge_list_flat[i, 0] <= num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= num and
edge_list_flat[i, 1] <= 2 * num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1):
removed_elements.append(i)
res = np.delete(edge_list_flat, removed_elements,
axis=0).transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] - num + j_index - 1
if res.shape[1] == 0:
res = np.zeros((2, 1), dtype=np.int)

edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
graph.add_edges_from(edges)

# partial connected components

subgraphs = nx.connected_components(graph)
comp = [g for g in subgraphs]
return comp

# pylint: disable=arguments-differ
def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
cutoff=15.0):
"""Perform computation on a single trajectory frame.
Must return computed values as a list. You can only **read**
from member variables stored in ``self``. Changing them during
a run will result in undefined behavior. `ts` and any of the
atomgroups can be changed (but changes will be overwritten
when the next time step is read).
Parameters
----------
        scheduler_kwargs : dict
            Dask scheduler parameters.
cutoff : float (optional)
head group-defining atoms within a distance of `cutoff`
Angstroms are deemed to be in the same leaflet [15.0]
Returns
-------
values : anything
The output from the computation over a single frame must
be returned. The `value` will be added to a list for each
block and the list of blocks is stored as :attr:`_results`
before :meth:`_conclude` is run. In order to simplify
processing, the `values` should be "simple" shallow data
structures such as arrays or lists of numbers.
"""

# Get positions of the atoms in the atomgroup and find their number.
atoms = ts.positions[atomgroups.indices]
matrix_size = atoms.shape[0]
arranged_coord = list()
part_size = int(matrix_size / n_jobs)
# Partition the data based on a 2-dimensional partitioning
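        # For example (a sketch): with matrix_size == 4 and n_jobs == 2,
        # part_size == 2 and the loop below yields the (i, j) tile offsets
        # (1, 1), (1, 3) and (3, 3), i.e. the upper triangle of 2x2 blocks
        # of the distance matrix.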
for i in range(1, matrix_size + 1, part_size):
for j in range(i, matrix_size + 1, part_size):
arranged_coord.append(([atoms[i - 1:i - 1 + part_size],
atoms[j - 1:j - 1 + part_size]],
[i, j]))
# Distribute the data over the available cores, apply the map function
# and execute.
parAtoms = db.from_sequence(arranged_coord,
npartitions=len(arranged_coord))
parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
cutoff=cutoff)
Components = parAtomsMap.compute(**scheduler_kwargs)

        # Gather the results and start the reduction. TODO: think if it can go
        # to the private _reduce method of the base class.
result = list(Components)

# Create the overall connected components of the graph
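        # Sketch of the reduction: repeatedly take one partial component and
        # absorb every other component that shares at least one atom index;
        # once nothing overlaps any more, the surviving components are the
        # leaflets.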
while len(result) != 0:
item1 = result[0]
result.pop(0)
ind = []
for i, item2 in enumerate(Components):
if item1.intersection(item2):
item1 = item1.union(item2)
ind.append(i)
ind.reverse()
for j in ind:
Components.pop(j)
Components.append(item1)

        # Change the output format and return.
indices = [np.sort(list(g)) for g in Components]
return indices

# pylint: disable=arguments-differ
def run(self,
start=None,
stop=None,
step=None,
scheduler=None,
n_jobs=-1,
cutoff=15.0):
"""Perform the calculation
Parameters
----------
start : int, optional
start frame of analysis
stop : int, optional
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
scheduler : dask scheduler, optional
Use dask scheduler, defaults to multiprocessing. This can be used
to spread work to a distributed scheduler
n_jobs : int, optional
number of tasks to start, if `-1` use number of logical cpu cores.
This argument will be ignored when the distributed scheduler is
used
"""
if scheduler is None:
scheduler = multiprocessing

if n_jobs == -1:
if scheduler == multiprocessing:
n_jobs = cpu_count()
elif isinstance(scheduler, distributed.Client):
n_jobs = len(scheduler.ncores())
else:
raise ValueError(
"Couldn't guess ideal number of jobs from scheduler."
"Please provide `n_jobs` in call to method.")

with timeit() as b_universe:
universe = mda.Universe(self._top, self._traj)

scheduler_kwargs = {'get': scheduler.get}
if scheduler == multiprocessing:
scheduler_kwargs['num_workers'] = n_jobs

start, stop, step = self._trajectory.check_slice_indices(
start, stop, step)
with timeit() as total:
with timeit() as prepare:
self._prepare()

with self.readonly_attributes():
timings = list()
times_io = []
for frame in range(start, stop, step):
with timeit() as b_io:
ts = universe.trajectory[frame]
times_io.append(b_io.elapsed)
with timeit() as b_compute:
components = self. \
_single_frame(ts=ts,
atomgroups=self._atomgroup,
scheduler_kwargs=scheduler_kwargs,
n_jobs=n_jobs,
cutoff=cutoff)

timings.append(b_compute.elapsed)
leaflet1 = self._atomgroup[components[0]]
leaflet2 = self._atomgroup[components[1]]
self._results.append([leaflet1, leaflet2])
with timeit() as conclude:
self._conclude()
self.timing = Timing(times_io,
np.hstack(timings), total.elapsed,
b_universe.elapsed, prepare.elapsed,
conclude.elapsed)
return self

def _conclude(self):
self.results = self._results