
update to dask 0.18.0 #66

Merged
merged 11 commits into from
Nov 1, 2018

Conversation

kain88-de
Member

@kain88-de kain88-de commented Sep 20, 2018

Fix #48 and #17

Changes made in this Pull Request:

PR Checklist

  • Tests?
  • Docs?
  • CHANGELOG updated?
  • Issue raised/referenced?

Member

@orbeckst orbeckst left a comment

Assuming tests pass eventually, this looks awesome to me.

EDIT: Also, update docs please! (I only realized after reviewing...)

@@ -95,7 +95,7 @@ def scheduler(request, client):
     if request.param == 'distributed':
         return client
     else:
-        return multiprocessing
+        return request.param
Member

This is cool: so now we will be able to expand to all schedulers that dask supports by just adding strings to the fixture parametrization.
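A sketch of what that expansion could look like (hypothetical code, not from this PR; the dispatch is pulled into a plain helper so the logic can also be exercised outside of pytest):

```python
import pytest


def resolve_scheduler(param, client):
    """Map a fixture parameter to what run() expects.

    Only 'distributed' is special: it needs the actual Client object.
    Every other dask scheduler is selected by its name string.
    """
    if param == 'distributed':
        return client
    return param


# Adding more scheduler names to params is all that is needed to test them.
@pytest.fixture(params=['distributed', 'multiprocessing',
                        'threads', 'single-threaded'])
def scheduler(request, client):
    return resolve_scheduler(request.param, client)
```

The fixture body stays a one-liner; only the `params` tuple grows.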

@orbeckst
Member

@kain88-de I might have approved a bit prematurely but in principle I think this is very good and I trust that you know best what else needs to be done.

@orbeckst
Member

The docs do not mention get anywhere because a while back we switched to scheduler for the user API.

@kain88-de kain88-de mentioned this pull request Sep 21, 2018
@kain88-de
Member Author

kain88-de commented Sep 21, 2018 via email

@kain88-de
Member Author

Dask introduced some more changes that require larger changes within pmda. It should make the code clearer in the end. @VOD555 can you look into this? The relevant documentation is linked below.

https://dask.pydata.org/en/latest/configuration.html

@kain88-de
Member Author

The main issue is that dask now uses a global value to look up the appropriate scheduler. For the tests we now have to remove the session scope from fixtures to properly set the global values. I haven't experimented with how to handle two clusters in the same session.

@orbeckst orbeckst mentioned this pull request Oct 17, 2018
@orbeckst orbeckst mentioned this pull request Oct 29, 2018
…ring

- modified tests so that they use default scheduler
- supplying n_jobs
- NOTE: test_leaflets() fails for n_jobs=2; this NEEDS TO BE FIXED in a
        separate PR; right now this is marked as XFAIL
@@ -39,24 +38,29 @@ def correct_values(self):
     def correct_values_single_frame(self):
         return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)]

     def test_leaflet(self, universe, correct_values):
+    # XFAIL for 2 jobs needs to be fixed!
+    @pytest.mark.parametrize('n_jobs', (1, pytest.mark.xfail(2)))
Member

@iparask the test_leaflets test failed for me with n_jobs=2

E   AssertionError:
E   Arrays are not almost equal to 7 decimals
E   error: leaflets should match test values
E   (shapes (1,), (6,) mismatch)
E    x: array([36634])
E    y: array([36507, 36761, 37523, 37650, 38031, 38285])

My expectation was that this should give the same answer, just run faster... Can you please look into this?

Member

See also #76

@orbeckst
Member

With these changes, all tests pass locally (with dask 0.20)

(pmda) yngvi:pmda oliver$ pytest -n 4 --disable-warnings --pep8 pmda
======================================================= test session starts ========================================================
platform darwin -- Python 3.6.5, pytest-3.6.3, py-1.5.4, pluggy-0.6.0
rootdir: ~/MDAnalysis/pmda, inifile: setup.cfg
plugins: xdist-1.22.2, pep8-1.0.6, forked-0.2, cov-2.5.1, hypothesis-3.66.1
gw0 [1000] / gw1 [1000] / gw2 [1000] / gw3 [1000]
scheduling tests via LoadScheduling
.........................................x.................................................................................. [ 12%]
............................................................................................................................ [ 24%]
............................................................................................................................ [ 37%]
............................................................................................................................. [ 49%]
............................................................................................................................ [ 62%]
............................................................................................................................. [ 74%]
............................................................................................................................ [ 87%]
............................................................................................................................ [ 99%]
......                                                                                                                       [100%]
Future exception was never retrieved
future: <Future finished exception=TimeoutError('Timeout',)>
tornado.util.TimeoutError: Timeout

======================================= 999 passed, 1 xfailed, 24 warnings in 44.79 seconds ========================================

@orbeckst
Member

@kain88-de in #66 (comment) you commented on a new global state in dask. How would that manifest itself as a problem for the tests?

My understanding of

@pytest.fixture(scope="session", params=(1, 2))
def client(tmpdir_factory, request):
    with tmpdir_factory.mktemp("dask_cluster").as_cwd():
        lc = distributed.LocalCluster(n_workers=request.param, processes=True)
        client = distributed.Client(lc)

        yield client

        client.close()
        lc.close()

is that we set up a single distributed cluster for all tests (actually, two clusters, one with 1 and one with 2 workers) and tests that use it get scheduled as workers become available.

Can you explain how the session scope is a problem for new dask?

- passes 'multiprocessing' as the scheduler instead of multiprocessing
  (which does not work with dask >= 0.20 anymore)
- actually passes whatever we define as parameter; only distributed
  is currently an exception
- removed superfluous import of distributed.multiprocessing
@codecov

codecov bot commented Oct 29, 2018

Codecov Report

Merging #66 into master will decrease coverage by 3.67%.
The diff coverage is 54.34%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master      #66      +/-   ##
==========================================
- Coverage   98.09%   94.41%   -3.68%     
==========================================
  Files           8        8              
  Lines         419      448      +29     
  Branches       58       61       +3     
==========================================
+ Hits          411      423      +12     
- Misses          4       18      +14     
- Partials        4        7       +3
Impacted Files Coverage Δ
pmda/leaflet.py 86.5% <34.78%> (-8.14%) ⬇️
pmda/parallel.py 95.12% <73.91%> (-4.88%) ⬇️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6aa6c61...dd88867. Read the comment docs.

@orbeckst
Member

@kain88-de @richardjgowers @VOD555 The tests for upgrade to dask 0.20 pass now; the coverage dropped for reasons that I do not understand.

I had reviewed and approved this PR before it was passing Travis but now that I edited it, I'd appreciate some additional eyes, please.

@orbeckst
Member

I get locally

 pytest --pep8 -n 4 --cov pmda

all passing

============== 1001 passed, 1 xfailed, 24 warnings in 96.34 seconds ==============

but different coverage changes:

---------- coverage: platform darwin, python 3.6.5-final-0 -----------
Name               Stmts   Miss Branch BrPart  Cover
----------------------------------------------------
pmda/__init__.py       5      0      0      0   100%
pmda/contacts.py      53      1     14      1    97%
pmda/custom.py        34      0      6      0   100%
pmda/leaflet.py      112     37     44      3    60%
pmda/parallel.py     108      0     34      0   100%
pmda/rdf.py           53      4      6      1    92%
pmda/rms.py           17      1      0      0    94%
pmda/util.py          37      0     14      0   100%
----------------------------------------------------
TOTAL                419     43    118      5    87%

Using

coverage html
open htmlcov/pmda_leaflet_py.html

shows that

  • njobs == -1 is not tested
  • the whole _find_connected_components() code is not run (?!?!?!), which is used in

    pmda/pmda/leaflet.py, lines 203 to 204 (at 6aa6c61):

        parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
                                              cutoff=cutoff)

    where parAtoms is a dask bag.

Perhaps coverage has a hard time seeing what's been covered when something is run under dask, under certain circumstances?

This test is needed to get coverage of leaflet back up but: TEST or CODE needs to be fixed.
@orbeckst orbeckst mentioned this pull request Oct 30, 2018
@orbeckst
Member

I wanted to add tests for leafletfinder with distributed (by using the new parametrized scheduler fixture) but as described in #76 this opened a whole can of worms so this has to wait.

@VOD555
Collaborator

VOD555 commented Oct 30, 2018

In my local tests, the things under _single_frame() in rdf.py and rms.py are not covered. That's weird...

@kain88-de
Member Author

I haven't looked at the code changes yet! But I did stop working on this because dask 0.18 changed the idiomatic way to select the scheduler (see the dask configuration docs). The new idiom is to set the scheduler in a global config value

dask.config.set(scheduler='threads')

or with a context manager

with dask.config.set(scheduler='threads'):
    x.compute()

The distributed scheduler overwrites these defaults now on creation

from dask.distributed import Client
client = Client(...)  # Connect to distributed cluster and override default
df.x.sum().compute()  # This now runs on the distributed system

The correct solution seems rather to be that we remove the scheduler and get keyword arguments completely. In the tests I guess we can work with something like

@pytest.fixture(params=['multiprocessing', ClientIP])
def scheduler(params):
    with dask.config.set(params):
        yield

I assume I have the API wrong but the general idea is to start a context manager in the fixture and yield to release it at the end. How well this works I don't know.
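For what it's worth, a syntactically complete version of that idea might look like the following (a sketch assuming `dask.config.set(scheduler=...)` works as shown in the dask docs; the fixture name and parameters are illustrative only):

```python
import pytest
import dask


@pytest.fixture(params=['multiprocessing', 'threads'])
def scheduler(request):
    # Enter dask's config context manager, hand control to the test,
    # and restore the previous global default on fixture teardown.
    with dask.config.set(scheduler=request.param):
        yield request.param


# The same pattern outside pytest: dask.config.set works as a context
# manager and restores the previous setting on exit.
with dask.config.set(scheduler='threads'):
    active = dask.config.get('scheduler')
```

The yield inside the `with` block is what ties the lifetime of the global setting to the lifetime of the fixture.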

@orbeckst
Member

From my reading, setting the scheduler on compute()

x.compute(scheduler='threads')

is still supported. I think as long as all our compute() calls also contain the scheduler, we should be ok, even though

client = Client(...)  # Connect to distributed cluster and override default

will set the global defaults.

Or do I misunderstand how this is working now?

The correct solution seems rather to be that we remove the scheduler and get keyword arguments completely.

I think you're right that this is the medium term correct solution so that using PMDA conforms to how people use Dask. In the short term (i.e., for this PR at least!) I'd like to move ahead with our current scheduler argument because it is still correct.

Or do you see a problem?
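For reference, the claim that a per-call scheduler wins over the global default can be checked with a small snippet (not from this PR; requires dask):

```python
import dask


@dask.delayed
def double(x):
    return 2 * x


# Even with a global default configured, the scheduler passed to
# compute() takes precedence for that one call.
with dask.config.set(scheduler='multiprocessing'):
    result = double(21).compute(scheduler='single-threaded')
```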

@orbeckst
Member

(Alternatively, if someone manages to get the new Dask paradigm working I am also happy... I just only have limited time for this right now.)

@kain88-de
Member Author

The problem is that code suddenly behaves unexpectedly. Take the following example

client = Client()  # yeah lets use dask.distributed.
pdma.contacts.Contacts.run()  # This uses multiprocessing! 

I would be surprised to see here that the distributed workers don't receive any jobs. Without knowledge of how dask used to work this is also hard to debug.

# job. Therefore we run this on the single threaded scheduler for
# debugging.
if scheduler is None and n_jobs == 1:
    scheduler = 'single-threaded'
Member Author

fixes #17

Member

nice

- someone should check with dask. It seems a bit brittle
- fix tests maybe
- update documentation

fixes #17
@orbeckst
Copy link
Member

I would be surprised to see here that the distributed workers don't receive any jobs.

... because PMDA defaults to 'multiprocessing'? Yes, I agree with you.

Thanks for working on it!

- fix #48
- updated boiler-plate code in ParallelAnalysisBase.run and copied and pasted into
  leaflet.LeafletFinder.run() (TODO: make this more DRY)
- dask.distributed added as dependency (it is recommended by dask for a single node anyway, and
  it avoids imports inside if statements... much cleaner code in PMDA)
- removed scheduler kwarg: use dask.config.set(scheduler=...)
- 'multiprocessing' and n_jobs=-1 are now only selected if nothing is set by dask;
  if one wants n_jobs=-1 to always grab all cores then you must set the multiprocessing
  scheduler
- default for n_jobs=1 (instead of -1), i.e., the single threaded scheduler
- updated tests
- removed unnecessary broken(?) test for "no deprecations" in parallel.ParallelAnalysisBase
- updated CHANGELOG
- install conda package of MDA on travis
- require MDA and MDATests >= 0.19.0
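The scheduler-selection rules in the bullet points above can be sketched as follows (a simplified, hypothetical rendering for illustration, not the actual pmda.parallel code):

```python
import multiprocessing


def choose_scheduler(n_jobs=1, dask_default=None):
    """Return a (scheduler_name, n_jobs) pair.

    dask_default stands in for whatever the user configured globally via
    dask.config.set (None means nothing was configured).
    """
    if dask_default is not None:
        # respect an explicit global dask configuration
        return dask_default, n_jobs
    if n_jobs == 1:
        # new default: single-threaded scheduler, easiest to debug
        return 'single-threaded', 1
    n = multiprocessing.cpu_count() if n_jobs == -1 else n_jobs
    # only fall back to multiprocessing when dask has nothing set
    return 'multiprocessing', n
```

In particular, `n_jobs=-1` grabs all cores only when no scheduler has been set through dask.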
@orbeckst
Member

@kain88-de are you sure you wanted to push 114a2b0? It looks as if it undoes some of the changes that I pushed and breaks the tests again. If it was intentional and you're working on it then just ignore me ;-).

@kain88-de
Member Author

I don't want to depend on distributed for such simple checks. The code now does a trial import when necessary. We can still check the distributed scheduler in our tests; it is not needed for pmda itself, though.

@orbeckst
Member

Fine with me, although I am pretty sure that anyone using dask will also install distributed or not mind having it installed, especially as http://docs.dask.org/en/latest/scheduling.html says

we currently recommend the distributed scheduler on a local machine

i.e., pretty much in all cases.

@orbeckst
Member

@kain88-de please check – I'd like to get this merged so that we can move forward, and I'd like to release 0.2.0 asap.

Member

@orbeckst orbeckst left a comment

Minor comments.

scheduler (Issue #48)
* removed the 'scheduler' keyword from the run() method; use
dask.config.set(scheduler=...) as recommended in the dask docs
* uses single-threaaded scheduler if n_jobs=1 (Issue #17)
Member

typo, needs fixing

dask.config.set(scheduler=...) as recommended in the dask docs
* uses single-threaaded scheduler if n_jobs=1 (Issue #17)
* n_jobs=1 is now the default for run() (used to be n_jobs=-1)
* dask.distributed is now a dependency
Member

setup.py has it as full dep; it could be moved into test dependencies if you really want to keep it optional. If you make it fully optional, please remove this line from CHANGELOG

@@ -8,7 +8,8 @@
 #
 # Released under the GNU Public Licence, v2 or any higher version

-from dask import distributed, multiprocessing
+from dask import distributed
Member

tests require distributed

@@ -17,7 +17,7 @@ are provided as keyword arguments:

 set up the parallel analysis

-.. method:: run(n_jobs=-1, scheduler=None)
+.. method:: run(n_jobs=-1)
Member

I think the default is now n_jobs=1, isn't it?

@@ -91,6 +86,10 @@ def test_no_frames(analysis, n_jobs):
     assert analysis.timing.universe == 0


+def test_scheduler(analysis, scheduler):
+    analysis.run()
Member

No assert here – either ERROR or pass?

@kain88-de
Member Author

I removed my changes again. Let's go with the easier version.

@kain88-de
Member Author

I don't know where the reduced coverage comes from right now.

@orbeckst
Member

orbeckst commented Nov 1, 2018

I'll merge it regardless and then we need to dig a bit more into how coverage works with the different schedulers.

Thanks for pushing forward here!!!


@orbeckst
Member

orbeckst commented Nov 1, 2018

Stupid GitHub web interface does not work on my slightly outdated mobile. Can you please do a squash merge? Thanks!

This will allow @VOD555 to continue.

@VOD555 VOD555 merged commit aaa478c into master Nov 1, 2018
@VOD555
Collaborator

VOD555 commented Nov 1, 2018

@orbeckst I've merged this PR.

@orbeckst
Member

orbeckst commented Nov 1, 2018

Thanks.

The drop in coverage is due to leaflet.py because we do not currently test all schedulers. This should be addressed as part of #76
