respect new dask.config global selection of scheduler
- fix #48
- updated boilerplate code in ParallelAnalysisBase.run and copied and pasted into
  leaflet.LeafletFinder.run() (TODO: make this more DRY)
- dask.distributed added as dependency (it is recommended by dask for a single node anyway, and
  it avoids imports inside if statements... much cleaner code in PMDA)
- removed scheduler kwarg: use dask.config.set(scheduler=...) (see the usage sketch below)
- 'multiprocessing' and n_jobs=-1 are now only selected if nothing is set by dask;
  if you want n_jobs=-1 to always grab all cores, you must set the multiprocessing
  scheduler explicitly
- default is now n_jobs=1 (instead of -1), i.e., the single-threaded scheduler
- updated tests
- removed unnecessary broken(?) test for "no deprecations" in parallel.ParallelAnalysisBase
- updated CHANGELOG
orbeckst committed Oct 31, 2018
1 parent 6f54546 commit 5444720
Showing 8 changed files with 38 additions and 49 deletions.
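To illustrate the new calling convention described above, here is a minimal sketch (not part of this commit) of how an analysis is now run: the scheduler is chosen through dask's configuration instead of a scheduler keyword to run(). It assumes pmda.rms.RMSD and the small MDAnalysisTests data files used elsewhere in the test suite.

    import dask
    import MDAnalysis as mda
    from MDAnalysisTests.datafiles import PSF, DCD
    from pmda import rms

    u = mda.Universe(PSF, DCD)
    ref = mda.Universe(PSF, DCD)

    # the scheduler is now picked globally (or per context) via dask.config,
    # not via a scheduler= keyword to run(), which this commit removes
    with dask.config.set(scheduler='multiprocessing'):
        rmsd = rms.RMSD(u.select_atoms('name CA'), ref.select_atoms('name CA'))
        rmsd.run(n_jobs=2)   # n_jobs now defaults to 1 (single-threaded)

If no scheduler is configured at all, run() falls back to multiprocessing (or the single-threaded scheduler when n_jobs=1), as implemented in the pmda/parallel.py hunk below.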
2 changes: 1 addition & 1 deletion .travis.yml
@@ -20,7 +20,7 @@ env:
# minimal CONDA_MDANALYSIS_DEPENDENCIES
#- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx"
- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis"
- CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask joblib pytest-pep8 mock codecov"
- CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov"
- CONDA_CHANNELS='conda-forge'
- CONDA_CHANNEL_PRIORITY=True
# install development version of MDAnalysis (needed until the test
11 changes: 10 additions & 1 deletion CHANGELOG
@@ -13,7 +13,7 @@ The rules for this file:
* release numbers follow "Semantic Versioning" http://semver.org

------------------------------------------------------------------------------
xx/xx/18 VOD555, richardjgowers, iparask, orbeckst
11/xx/18 VOD555, richardjgowers, iparask, orbeckst, kain88-de

* 0.2.0

@@ -27,6 +27,15 @@ Fixes
* always distribute frames over blocks so that no empty blocks are
created ("balanced blocks", Issue #71)

Changes
* requires dask >= 0.18.0 and respects/requires the global setting of the dask
  scheduler (Issue #48)
* removed the 'scheduler' keyword from the run() method; use
dask.config.set(scheduler=...) as recommended in the dask docs
* uses single-threaded scheduler if n_jobs=1 (Issue #17)
* n_jobs=1 is now the default for run() (used to be n_jobs=-1)
* dask.distributed is now a dependency


06/07/18 orbeckst

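Since dask.distributed is now a hard dependency, a client started in the calling process is picked up automatically: run() falls back to dask.distributed.worker.get_client() when no scheduler has been configured (see the pmda/parallel.py hunk further down). A minimal single-node sketch, assuming a local cluster started via distributed.Client:

    import distributed   # dask.distributed, now required by PMDA

    # a local cluster/client on this node; run() discovers it through
    # dask.distributed.worker.get_client() when dask.config sets no scheduler
    client = distributed.Client(n_workers=2, threads_per_worker=1)

    # run() then guesses n_blocks as len(client.ncores()), i.e. one block
    # per worker, unless n_blocks is given explicitly
    print(len(client.ncores()))   # -> 2 for the cluster above

    client.close()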
4 changes: 3 additions & 1 deletion conftest.py
@@ -25,7 +25,9 @@ def client(tmpdir_factory, request):
lc.close()


@pytest.fixture(scope='session', params=('distributed', 'multiprocessing', 'single-threaded'))
@pytest.fixture(scope='session', params=('distributed',
'multiprocessing',
'single-threaded'))
def scheduler(request, client):
if request.param == 'distributed':
arg = client
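The parameterized fixture above hands each test either a distributed client or a scheduler name. A sketch (hypothetical, not in this diff) of how a consuming test might activate it, assuming the analysis fixture from test_parallel.py and that dask.config.set accepts a distributed Client as the scheduler value:

    import dask

    def test_run_with_any_scheduler(scheduler, analysis):
        # activates one of: a distributed client, 'multiprocessing',
        # or 'single-threaded', as supplied by the session-scoped fixture
        with dask.config.set(scheduler=scheduler):
            analysis.run(n_jobs=2)
        # one partial result per block, so at least one block must exist
        assert len(analysis._results) >= 1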
25 changes: 7 additions & 18 deletions pmda/leaflet.py
@@ -249,19 +249,15 @@ def run(self,
used
"""
# are we using a distributed scheduler or should we use multiprocessing?
# are we using a distributed scheduler or should we use
# multiprocessing?
scheduler = dask.config.get('scheduler', None)
if scheduler is None and client is None:
scheduler = 'multiprocessing'
elif scheduler is None:
if scheduler is None:
# maybe we can grab a global worker
try:
from dask import distributed
scheduler = distributed.worker.get_client()
scheduler = dask.distributed.worker.get_client()
except ValueError:
pass
except ImportError:
pass

if n_jobs == -1:
n_jobs = cpu_count()
@@ -272,16 +268,9 @@
if scheduler is None and n_jobs == 1:
scheduler = 'single-threaded'

if n_blocks is None:
if scheduler == 'multiprocessing':
n_blocks = n_jobs
elif isinstance(scheduler, distributed.Client):
n_blocks = len(scheduler.ncores())
else:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1"
"Please provide `n_blocks` in call to method.")
# fall back to multiprocessing, we tried everything
if scheduler is None:
scheduler = 'multiprocessing'

scheduler_kwargs = {'scheduler': scheduler}
if scheduler == 'multiprocessing':
22 changes: 12 additions & 10 deletions pmda/parallel.py
@@ -23,6 +23,7 @@
import MDAnalysis as mda
from dask.delayed import delayed
import dask
import dask.distributed
from joblib import cpu_count
import numpy as np

@@ -288,19 +289,15 @@ def run(self,
to n_jobs or number of available workers in scheduler.
"""
# are we using a distributed scheduler or should we use multiprocessing?
# are we using a distributed scheduler or should we use
# multiprocessing?
scheduler = dask.config.get('scheduler', None)
if scheduler is None and client is None:
scheduler = 'multiprocessing'
elif scheduler is None:
if scheduler is None:
# maybe we can grab a global worker
try:
from dask import distributed
scheduler = distributed.worker.get_client()
scheduler = dask.distributed.worker.get_client()
except ValueError:
pass
except ImportError:
pass

if n_jobs == -1:
n_jobs = cpu_count()
@@ -311,15 +308,20 @@
if scheduler is None and n_jobs == 1:
scheduler = 'single-threaded'

# fall back to multiprocessing, we tried everything
if scheduler is None:
scheduler = 'multiprocessing'

if n_blocks is None:
if scheduler == 'multiprocessing':
n_blocks = n_jobs
elif isinstance(scheduler, distributed.Client):
elif isinstance(scheduler, dask.distributed.Client):
n_blocks = len(scheduler.ncores())
else:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. Set n_blocks=1"
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")

scheduler_kwargs = {'scheduler': scheduler}
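The scheduler-resolution code in this hunk is duplicated in pmda/leaflet.py above, which the commit message flags as a DRY TODO. A sketch of what a shared helper could look like (hypothetical, not part of this commit), consolidating the steps from both run() methods:

    import dask
    import dask.distributed
    from joblib import cpu_count


    def _resolve_scheduler(n_jobs):
        """Hypothetical helper mirroring the duplicated run() logic."""
        # 1. respect the globally configured dask scheduler
        scheduler = dask.config.get('scheduler', None)
        if scheduler is None:
            # 2. maybe we can grab a global distributed client
            try:
                scheduler = dask.distributed.worker.get_client()
            except ValueError:
                pass
        if n_jobs == -1:
            n_jobs = cpu_count()
        # 3. use the single-threaded scheduler for serial runs
        if scheduler is None and n_jobs == 1:
            scheduler = 'single-threaded'
        # 4. fall back to multiprocessing, we tried everything
        if scheduler is None:
            scheduler = 'multiprocessing'
        return scheduler, n_jobs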
15 changes: 0 additions & 15 deletions pmda/test/test_custom.py
@@ -12,7 +12,6 @@
import numpy as np
import MDAnalysis as mda
from MDAnalysisTests.datafiles import PSF, DCD
from MDAnalysisTests.util import no_deprecated_call
import pytest
from numpy.testing import assert_equal

@@ -82,17 +81,3 @@ def test_analysis_class():
with pytest.raises(ValueError):
ana_class(2)


def test_analysis_class_decorator():
# Issue #1511
# analysis_class should not raise
# a DeprecationWarning
u = mda.Universe(PSF, DCD)

def distance(a, b):
return np.linalg.norm((a.centroid() - b.centroid()))

Distances = custom.analysis_class(distance)

with no_deprecated_call():
Distances(u, u.atoms[:10], u.atoms[10:20]).run()
5 changes: 3 additions & 2 deletions pmda/test/test_parallel.py
@@ -14,7 +14,7 @@
from MDAnalysisTests.datafiles import DCD, PSF
import joblib

from dask import distributed
import dask

from pmda import parallel

@@ -105,7 +105,8 @@ def test_nblocks(analysis, n_blocks):


def test_guess_nblocks(analysis):
analysis.run(n_jobs=-1)
with dask.config.set(scheduler='multiprocessing'):
analysis.run(n_jobs=-1)
assert len(analysis._results) == joblib.cpu_count()


3 changes: 2 additions & 1 deletion setup.py
@@ -50,10 +50,11 @@
install_requires=[
'MDAnalysis>=0.18',
'dask>=0.18',
'distributed',
'six',
'joblib', # cpu_count func currently
'networkx',
'scipy',
'scipy',
],
tests_require=[
'pytest',
