Use dask.bag to parallelize computations rather than multiprocess #172
Conversation
aospy/automate.py
Outdated
```python
logging.info('Connected to distributed client: {}'.format(client))
with dask.set_options(get=client.get):
    return db.from_sequence(calcs).map(f).compute()
```
You can avoid the repeated code (L280-283 and L285-287) by just encapsulating L280-283 into a helper function.
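If helpful, a sketch of that refactor using only the lines visible in this diff (the helper name is hypothetical, not the actual aospy internal):

```python
import logging

import dask
import dask.bag as db


def _submit_calcs_on_client(calcs, client, func):
    # Hypothetical helper encapsulating the repeated block above.
    logging.info('Connected to distributed client: {}'.format(client))
    with dask.set_options(get=client.get):
        return db.from_sequence(calcs).map(func).compute()
```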
```python
    ('exec_options'),
    [dict(parallelize=True, write_to_tar=False),
     dict(parallelize=True, write_to_tar=True)])
def test_submit_mult_calcs_external_client(calcsuite_init_specs_single_calc,
```
It would be good to get a test of a CalcSuite with multiple calculations.
aospy/automate.py
Outdated
```python
    _compute_or_skip_on_error(calc, compute_kwargs),
    calcs)
f = lambda calc: _compute_or_skip_on_error(calc, compute_kwargs)
if client is None:
```
Technically, PEP8 says to use lambda only for inline functions passed as arguments, e.g. `foo(bar=lambda x: x**2)`. So use `def` here.
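A sketch of the suggested change, using the names visible in the diff above:

```python
# Before (lambda form):
# f = lambda calc: _compute_or_skip_on_error(calc, compute_kwargs)

# After (def form, per PEP8):
def f(calc):
    return _compute_or_skip_on_error(calc, compute_kwargs)
```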
@spencerkclark thanks for this! Sorry for taking a few days. Looks good overall. A few things:
We'll also need a what's new entry (please start the v0.1.3 section) and to add `distributed` to setup.py. Edit: we also need to update all of our docs that relate to parallelized jobs/use of multiprocess.
I don't have a great sense here; in a quick experiment where I submitted 140 calculations (a medium-size number) and set that as the number of workers on the LocalCluster, I found that it took a while for the cluster to start up, and I ran into an issue with "Too many open files" (though I think that may now be remediable in xarray with the …).

That being said, the default number of workers when setting up a LocalCluster is the number of cores available on the machine; if one submits fewer calculations than that, it would be inefficient to start more workers than would be used. So maybe it's worth setting …
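A minimal sketch of the idea being discussed, capping the worker count at the number of calculations (the exact heuristic aospy settled on isn't shown in this thread, so this is an assumption):

```python
import multiprocessing

from distributed import LocalCluster

calcs = list(range(140))  # stand-in for the submitted Calc objects


def _n_workers(calcs):
    # Hypothetical heuristic: no more workers than calculations,
    # and no more than the machine has cores.
    return min(len(calcs), multiprocessing.cpu_count())


cluster = LocalCluster(n_workers=_n_workers(calcs))
```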
Yes! Though without parallelized array operations, the progress visualization isn't as interesting as it could be :)
But you can use the old ProgressBar, which is really, really useful if you're running a batch job from the command line or a notebook, because it gives you instant feedback on how many jobs are left to complete.
@darothen this is really convenient; thanks for the heads up!
@spencerahill I went ahead with setting …
aospy/automate.py
Outdated
```python
    return db.from_sequence(calcs).map(func).compute()


def _n_workers(calcs):
```
Nit: let's go with the slightly more expressive _num_workers
```python
return pool.map(lambda calc:
                _compute_or_skip_on_error(calc, compute_kwargs),
                calcs)
def func(calc):
```
Let's add either a docstring or a comment with a sentence that explains why we need this wrapper.
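One possibility (the stated rationale is an assumption about why the wrapper exists; the actual reason belongs to the author):

```python
def func(calc):
    """Single-argument wrapper around _compute_or_skip_on_error.

    dask.bag.map applies a one-argument function to each element of
    the bag, so compute_kwargs has to be bound here via a closure.
    """
    return _compute_or_skip_on_error(calc, compute_kwargs)
```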
aospy/calc.py
Outdated
```python
if not os.path.isdir(self.dir_tar_out):
    # When submitted in parallel and the directory does not exist yet
    # multiple processes may try to create a new directory; this leads
    # an OSError for all processes that tried to make the
```
"leads to an"
Yes, it looks like … Please also add a section to the what's new called 'Dependencies' noting this and, separately, that parallelized calculations no longer require the optional multiprocess library.
I would like to make this available to our users in a transparent way. Does it make sense to directly use either the ProgressBar or the other visualization within aospy? Even if not, I think we should at least add an example in the docs of how to use it in conjunction with …
@spencerkclark all for now. Thanks for the work on this!
I agree this would be nice. The browser-based status page, while it starts automatically with a dask scheduler (i.e. it's already available with this PR) and is really cool, might be overkill for casual use. When using a remote machine, the best way to view it is through port forwarding, which requires a little configuration of one's SSH setup; for most remote machines I've used this tends to be pretty straightforward (and I typically set things up so it's configured automatically when I log in), but it's an additional step for new users. Therefore I think an in-terminal solution would be more convenient, and if one wanted the browser-based status page, it would still be available.

That said, to use the in-terminal ProgressBar I think we would need to rethink how we currently do logging. Since we push everything to standard out (the same place the ProgressBar is printed and updated), the ProgressBar would constantly be interrupted by log messages. An option would be to push the log to a file, though it is somewhat nice to see any errors that come up in real time. @spencerahill what are your thoughts here?
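A minimal sketch of the push-the-log-to-a-file option (the filename and level here are assumptions):

```python
import logging

# Send log records to a file so they don't interleave with the
# in-terminal ProgressBar, which redraws itself on stdout.
logging.basicConfig(filename='aospy.log', level=logging.INFO)
```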
Finally got around to actually trying out the browser status page. So cool! Is it possible to access the status page if the user doesn't pass in their own Client (i.e. if client is None)? What about adding an option, e.g. one that automatically opens the status page via the webbrowser package?
This is the type of thing I had in mind for adding to the docs. I'm envisioning a sub-section in Using aospy and/or Examples, "Parallelized calculations", explaining some of these common sticking points. Maybe even including an example SSH setup that would be a good starting point for users.
Good point. Is it possible to generate the progress bar first, and then have the log messages print beneath it? That's not ideal either, because unless you scroll up, the terminal will follow the log messages and the bar will quickly get bumped off the screen. Also, I couldn't get it to work, but the ProgressBar is supposed to open as a widget if run from within a Notebook. If we could get this to work, maybe we could offer that as an example?
@spencerkclark I'm realizing this discussion of docs and the status bar has strayed beyond the scope of this PR; I'll open a separate issue to track it. The one remaining task I see is: please also add a section to the what's new called 'Dependencies' noting the dask version change and, separately, that parallelized calculations no longer require the optional multiprocess library. I'm good to merge once that's in.
Thanks for giving this PR a try -- yes, this has been my experience as well for …
This is an interesting option (thanks for pointing me to the webbrowser package). I'm a little hesitant (but not necessarily opposed), because this would open a browser on the remote machine, which one would need to use X11 or VNC to view, both of which can be slow. One other thing to think about is that it would also require a little extra logic to make sure it directed the user to the proper URL (one does not always need to use port 8787 for the diagnostics server, so for an external client the URL wouldn't always be …).
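For reference, a minimal sketch of opening the status page with the webbrowser package (the default diagnostics port 8787 is assumed here; as noted above, an external client may serve it elsewhere):

```python
import webbrowser

# Open the dask scheduler's diagnostic status page in a browser.
webbrowser.open('http://localhost:8787/status')
```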
I agree this could be helpful to users (possibly not just for using aospy).
There may be something on the …

```python
import time

import dask.bag as db
from dask.diagnostics import ProgressBar


def dummy(x):
    time.sleep(2)
    return x * x


with ProgressBar():
    bag = db.from_sequence([n for n in range(32)]).map(dummy)
    print(bag.compute())
```

When I tried using it in …
Thanks, yes, I think there are still a lot of things to think about! I agree, let's push further discussion to the issue tracker.
@spencerkclark all good points that you raise. Let's continue the conversation in #173. And thanks for all your work on this PR! And @darothen thanks for your extremely useful input; we'd be thrilled for more from you on literally any aspect of the project.
@spencerahill @darothen thanks both for prompting some thinking here.
With some additional experimentation, I think I've found that `dask.bag` should suffice for an intermediate step toward addressing #169. By default, if one sets `parallelize=True` when submitting calculations, the code in this PR will use `dask.distributed` to create a `LocalCluster` and an associated `Client`, which will handle scheduling calculations submitted through `dask.bag.map`. In addition, this PR enables the use of an external distributed Client (say, if one wanted to connect to a larger cluster set up through other means).

This still needs a unit test for using an external Client (though I've tried it out locally, creating a multi-node cluster on analysis at GFDL); however, tests are implemented for using a LocalCluster (the default).
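A minimal sketch of the scheme described above (the function and variable names are illustrative, not the actual aospy internals):

```python
import dask
import dask.bag as db
from distributed import Client, LocalCluster


def compute_calc(calc):
    # Stand-in for aospy's per-calculation compute step.
    return calc ** 2


calcs = list(range(10))  # stand-in for a list of Calc objects

cluster = LocalCluster()
client = Client(cluster)
# Mirror the diff above: route the bag's computation through the
# distributed client (dask.set_options is the era-appropriate API;
# newer dask versions use dask.config.set / the scheduler= keyword).
with dask.set_options(get=client.get):
    results = db.from_sequence(calcs).map(compute_calc).compute()
```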
@spencerahill what are your thoughts here? Is this moving in the right direction?