
Use dask.bag to parallelize computations rather than multiprocess #172

Merged (11 commits) on Apr 19, 2017

Conversation

spencerkclark (Collaborator)

@spencerahill @darothen thanks both for prompting some thinking here.

With some additional experimentation, I think I've found that dask.bag should suffice as an intermediate step toward addressing #169. By default, if one sets parallelize=True when submitting calculations, the code in this PR will use dask.distributed to create a LocalCluster and an associated Client, which will handle scheduling the calculations submitted through dask.bag.map. In addition, this PR enables the use of an external distributed Client (say, if one wanted to connect to a larger cluster set up through other means).
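
For reference, here's a minimal sketch of the pattern (compute_calc and its inputs are hypothetical stand-ins for aospy's actual functions, and this assumes the dask API of this era, where dask.set_options(get=client.get) routes work to the distributed scheduler, as in this PR's own code):

import dask
import dask.bag as db
from distributed import Client, LocalCluster

def compute_calc(calc):
    # Hypothetical stand-in for running a single aospy calculation
    return calc ** 2

calcs = range(8)
cluster = LocalCluster()
client = Client(cluster)
# Route dask.bag's compute() through the distributed scheduler
with dask.set_options(get=client.get):
    results = db.from_sequence(calcs).map(compute_calc).compute()
client.close()
cluster.close()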

This still needs a unit test for using an external Client (though I've tried it out locally, creating a multi-node cluster on analysis at GFDL); however, tests are implemented for using a LocalCluster (the default).

@spencerahill what are your thoughts here? Is this moving in the right direction?

logging.info('Connected to distributed client: {}'.format(client))
with dask.set_options(get=client.get):
    return db.from_sequence(calcs).map(f).compute()

Owner

You can avoid the repeated code (L280-283 and L285-287) by just encapsulating L280-283 into a helper function.

('exec_options'),
[dict(parallelize=True, write_to_tar=False),
dict(parallelize=True, write_to_tar=True)])
def test_submit_mult_calcs_external_client(calcsuite_init_specs_single_calc,
Owner

It would be good to get a test of a CalcSuite with multiple calculations.

_compute_or_skip_on_error(calc, compute_kwargs),
calcs)
f = lambda calc: _compute_or_skip_on_error(calc, compute_kwargs)
if client is None:
Owner

Technically, PEP 8 allows lambda only for inline functions passed directly as arguments, e.g. foo(bar=lambda x: x**2); binding a lambda to a name should be written as a def. So use def here.
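
i.e., roughly the following change (using the names from the diff above):

# Instead of binding a lambda to a name:
f = lambda calc: _compute_or_skip_on_error(calc, compute_kwargs)

# PEP 8 prefers an equivalent def statement:
def f(calc):
    return _compute_or_skip_on_error(calc, compute_kwargs)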

@spencerahill (Owner)

@spencerkclark thanks for this! Sorry for taking a few days. Looks good overall. A few things:

  • Do you have any sense of how to optimize performance via the LocalCluster and Client init arguments? For example, would it make sense for n_workers to be set to the number of Calcs?
  • Can you also remove multiprocess from setup.py?
  • Does this get us the progress visualization?! :)

@spencerahill (Owner)

spencerahill commented Apr 6, 2017

We'll also need a what's new entry (please start the v0.1.3 section) and add distributed to setup.py.

Edit: we also need to update all of our docs that relate to parallelized jobs and the use of multiprocess.

@spencerkclark (Collaborator, Author)

Do you have any sense of how to optimize performance via the LocalCluster and Client init arguments? For example, would it make sense for n_workers to be set to the number of Calcs?

I don't have a great sense here; in a quick experiment where I submitted 140 calculations (a medium-size number) and set that as the number of workers on the LocalCluster, I found that it took a while for the cluster to start up, and I ran into an issue with "Too many open files" (though I think that may now be remedied in xarray with the autoclose=True argument to open_mfdataset; I haven't tried it yet). But considering that there is no upper bound on the number of calculations a user might submit at once, this kind of setting seems like it could be dangerous.

That being said, the default number of workers used when setting up a LocalCluster is the number of cores available on the machine; if one submits fewer calculations than that number, it would be inefficient to start more workers than would be used. So maybe it's worth setting n_workers = min(n_cores, n_calcs)?
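
A minimal sketch of that heuristic (the helper name matches the _n_workers function in this PR's diff; the comment is illustrative):

import multiprocessing

def _n_workers(calcs):
    # Cap the worker count at the number of calculations: starting more
    # workers than there are calculations wastes cluster-startup time,
    # while a very large count risks "Too many open files".
    return min(multiprocessing.cpu_count(), len(calcs))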

Does this get us the progress visualization?! :)

Yes! Though without parallelized array operations, the progress visualization isn't as interesting as it could be :)

@darothen

darothen commented Apr 7, 2017

@spencerkclark -

Yes! Though without parallelized array operations, the progress visualization isn't as interesting as it could be :)

But you can use the old ProgressBar, which is really, really useful if you're running a batch job from the command line or a notebook, because it gives you instant feedback on how many jobs are left to complete.

@spencerkclark (Collaborator, Author)

But you can use the old ProgressBar, which is really, really useful if you're running a batch job from the command line or a notebook, because it gives you instant feedback on how many jobs are left to complete.

@darothen this is really convenient; thanks for the heads up!

@spencerkclark (Collaborator, Author)

@spencerahill I went ahead with setting n_workers = min(n_cores, n_calcs), added tests involving more than one calculation, and added a note to what's new. When you have a chance, please have another look at this PR. Do you have any thoughts on what's causing the failing tests in python 3.4? Is it a version issue?

return db.from_sequence(calcs).map(func).compute()


def _n_workers(calcs):
Owner

Nit: let's go with the slightly more expressive _num_workers

return pool.map(lambda calc:
                _compute_or_skip_on_error(calc, compute_kwargs),
                calcs)
def func(calc):
Owner

Let's add either a docstring or a comment with a sentence that explains why we need this wrapper.
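
e.g., something along these lines (the rationale stated in this sketch is illustrative, not confirmed by this PR):

def func(calc):
    """Wrap _compute_or_skip_on_error for use with dask.bag.map.

    bag.map applies a function of a single argument, so compute_kwargs
    is captured here by closure rather than passed explicitly.
    """
    return _compute_or_skip_on_error(calc, compute_kwargs)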

aospy/calc.py (outdated)
if not os.path.isdir(self.dir_tar_out):
# When submitted in parallel and the directory does not exist yet
# multiple processes may try to create a new directory; this leads
# an OSError for all processes that tried to make the
Owner

"leads to an"

@spencerahill (Owner)

any thoughts on what's causing the failing tests in python 3.4?

Yes, it looks like collections_to_dsk was added to dask after the v0.13 release: dask/dask#1927, https://github.com/dask/dask/releases. So please set dask >= 0.14 in setup.py and see if that solves it.
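
For instance, a sketch of the relevant setup.py fragment (other metadata elided; this is illustrative rather than the PR's actual diff):

from setuptools import setup

setup(
    name='aospy',  # remaining metadata omitted from this sketch
    install_requires=[
        'dask >= 0.14',  # collections_to_dsk is absent from v0.13
        'distributed',
    ],
)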

Please also add a section to the what's new called 'Dependencies' noting this and, separately, that parallelized calculations no longer require the optional multiprocess library.

@spencerahill (Owner)

this is really convenient; thanks for the heads up!

I would like to make this available to our users in a transparent way. Does it make sense to directly use either the ProgressBar or the other visualization within aospy?

Even if not, I think we should at least add an example in the docs of how to use it in conjunction with submit_mult_calcs (maybe even with a snapshot or GIF of the visualization).

@spencerahill (Owner)

@spencerkclark all for now. Thanks for the work on this!

@spencerkclark (Collaborator, Author)

I would like to make this available to our users in a transparent way. Does it make sense to directly use either the ProgressBar or the other visualization within aospy?

I agree this would be nice.

The browser-based status page, while it starts automatically with a dask scheduler (i.e. it's already available with this PR) and is really cool, might be overkill for casual use. When using a remote machine, the best way to view it is through port forwarding, which requires a little configuration of one's SSH setup; for most remote machines I've used this tends to be pretty straightforward (I typically set things up so it's configured automatically when I log in), but it's an additional step for new users. Therefore I think an in-terminal solution would be more convenient (and if one wanted the browser-based status page, it would still be available).

That said, to use the in-terminal ProgressBar I think we would need to re-think how we currently do logging. Since we push everything to standard out (the same place the ProgressBar is printed and updated), the ProgressBar would constantly be interrupted by log messages. An option would be to push the log to a file, though it is somewhat nice to see any errors that come up in real time.
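
For instance, a minimal sketch of pushing the log to a file (the filename is illustrative):

import logging

# Send log output to a file so the in-terminal ProgressBar is not
# interrupted by log messages printed to standard out
logging.basicConfig(filename='aospy.log', level=logging.INFO)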

@spencerahill what are your thoughts here?

@spencerahill (Owner)

Finally got around to actually trying out the browser status page. So cool!

Is it possible to access the status page if the user doesn't pass in their own Client (i.e. if client=None)? Particularly since we use Client and LocalCluster as context managers. I tried this with a short batch, but by the time I pasted the address into my browser, the processes had completed and so the Client had gone away. (Of course, as you note, if the submission is that short, the status page isn't that helpful.)

What about adding an option e.g. open_status_page that automatically opens the status page in their browser (e.g. using the builtin webbrowser)?

When using a remote machine, the best way to view it is through port forwarding, which requires a little configuration of one's SSH setup; for most remote machines I've used this tends to be pretty straightforward (I typically set things up so it's configured automatically when I log in), but it's an additional step for new users.

This is the type of thing I had in mind for adding to the docs. I'm envisioning a sub-section in Using aospy and/or Examples, "Parallelized calculations", explaining some of these common sticking points. Maybe even including an example SSH setup that would be a good starting point for users.

I think we would need to re-think how we currently do logging. Since we push everything to standard out (the same place the ProgressBar is printed and updated), the ProgressBar would constantly be interrupted by log messages.

Good point. Is it possible to generate the progress bar first, and then have the log messages print beneath it? That's not ideal either, because unless you scroll up, the terminal will follow the log messages and the bar will quickly get bumped off the screen.

Also, I couldn't get it to work, but the ProgressBar is supposed to open as a widget if run from within a Notebook. If we could get this to work, maybe we could offer that as an example?

@spencerahill (Owner)

@spencerkclark I'm realizing this discussion of docs and the status bar has strayed beyond the scope of this PR; I'll open a separate issue to track.

The one remaining task I see is: Please also add a section to the what's new called 'Dependencies' noting the dask version change and, separately, that parallelized calculations no longer require the optional multiprocess library. I'm good to merge once that's in.

@spencerkclark (Collaborator, Author)

spencerkclark commented Apr 19, 2017

Is it possible to access the status page if the user doesn't pass in their own Client (i.e. if client=None)? Particularly since we use Client and LocalCluster as context managers. I tried this with a short batch, but by the time I pasted the address into my browser, the processes had completed and so the Client had gone away. (Of course, as you note, if the submission is that short, the status page isn't that helpful.)

Thanks for giving this PR a try -- yes, this has been my experience as well for client=None; I admit it's not really ideal for short sets of calculations. With a long enough set of calculations (n_calcs=56) I have been successful in viewing it in this context. If one uses an external client (one that is started before aospy and continues running after), it is possible to view the progress even for short calculations.

What about adding an option e.g. open_status_page that automatically opens the status page in their browser (e.g. using the builtin webbrowser)?

This is an interesting option (thanks for pointing me to the webbrowser package). I'm a little hesitant (but not necessarily opposed), because this would open a browser on the remote machine, which one would need X11 or VNC to view, both of which can be slow. One other thing to consider is that it would also require a little extra logic to make sure it directed the user to the proper URL (one does not always need to use port 8787 for the diagnostics server, so for an external client the URL wouldn't always be http://localhost:8787).
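
For what it's worth, a hypothetical sketch of such an option (the function name follows the suggestion above; the default port assumes a local scheduler on the standard diagnostics port):

import webbrowser

def open_status_page(port=8787):
    # Open the scheduler's diagnostics page; for an external client the
    # port would need to be derived from the client rather than assumed.
    webbrowser.open('http://localhost:{}/status'.format(port))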

This is the type of thing I had in mind for adding to the docs. I'm envisioning a sub-section in Using aospy and/or Examples, "Parallelized calculations", explaining some of these common sticking points. Maybe even including an example SSH setup that would be a good starting point for users.

I agree this could be helpful to users (possibly not just for using aospy).

Also, I couldn't get it to work, but the ProgressBar is supposed to open as a widget if run from within a Notebook. If we could get this to work, maybe we could offer that as an example?

There may be something on the aospy side that is the issue; in my initial experiments with the ProgressBar I used a dummy application, and the in-terminal ProgressBar worked well:

import time

import dask.bag as db
from dask.diagnostics import ProgressBar

def dummy(x):
    # Simulate a slow calculation
    time.sleep(2)
    return x * x

# ProgressBar displays an in-terminal progress bar while compute() runs
with ProgressBar():
    bag = db.from_sequence(range(32)).map(dummy)
    print(bag.compute())

When I tried using it in aospy, however, I too ran into trouble (the in-terminal version wouldn't show up either).

I'm realizing this discussion of docs and the status bar has strayed beyond the scope of this PR; I'll open a separate issue to track.

Thanks, yes, I think there are still a lot of things to think about! I agree, let's push further discussion to the issue tracker.

@spencerkclark changed the title from "WIP Use dask.bag to parallelize computations rather than multiprocess" to "Use dask.bag to parallelize computations rather than multiprocess" on Apr 19, 2017
@spencerahill (Owner)

@spencerkclark all good points that you raise. Let's continue the conversation in #173.

And thanks for all your work on this PR! And @darothen thanks for your extremely useful input...we'd be thrilled for more from you on literally any aspect of the project.
