Create joblib.rst #1171
Conversation
nested parallelism trap
.. code-block:: python

   from distributed.joblib import DistributedBackend
   register_parallel_backend('distributed', DistributedBackend, make_default=True)
I'm confused here. We already call `register_parallel_backend` on the user's machine when they import `distributed.joblib`.

I'm also not sure about your comment regarding running this on the workers. Are the workers also submitting jobs to the scheduler in your use case?
So the use case is when you have an outer parallel loop which uses something like `client.map(function, data)`, and then within your function there is a call to sklearn using dask.distributed:

```python
import distributed.joblib

x = np.reshape(np.array(range(200)), (100, 2))
y = np.random.normal(size=100)

def function(i):
    with parallel_backend('dask.distributed', scheduler_host=dask_addr):
        score = sk.model_selection.cross_val_score(sk.svm.SVR(kernel='linear'), x, y, cv=2, n_jobs=-1)
    return score
```
Here calling `function(1)` causes no problems. However, if we now want to parallelize calls to `function(i)`:

```python
dask_client.gather(dask_client.map(function, range(5)))
```

we get `KeyError: 'dask.distributed'` in `sklearn/externals/joblib/parallel.py` in `parallel_backend()`. The fix is to put `register_parallel_backend(...)` inside `function(i)`.
```python
def function(i):
    parallel.register_parallel_backend('dask.distributed', DistributedBackend, make_default=True)
    with parallel_backend('dask.distributed', scheduler_host=dask_addr):
        score = sk.model_selection.cross_val_score(sk.svm.SVR(kernel='linear'), x, y, cv=2, n_jobs=-1)
    return score

dask_client.gather(dask_client.map(function, range(5)))
```
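Why registering inside the function helps: joblib resolves backend names against a per-process registry at call time, so a name registered in the client process is unknown in a freshly started worker process. A minimal stand-in sketch (the names mimic joblib's API, but this is not joblib's real implementation) shows the mechanics:

```python
# Hypothetical stand-in for joblib's per-process backend registry --
# NOT the real joblib internals, just the lookup pattern.
BACKENDS = {'threading': object, 'multiprocessing': object}  # built-in defaults

def register_parallel_backend(name, factory):
    BACKENDS[name] = factory  # mutates only this process's registry

def parallel_backend(name):
    return BACKENDS[name]  # an unregistered name raises KeyError

try:
    parallel_backend('dask.distributed')   # not registered in this process
except KeyError as err:
    print('KeyError:', err)                # same failure mode as reported above

register_parallel_backend('dask.distributed', object)
print(parallel_backend('dask.distributed') is object)  # True once registered
```

Because each worker process has its own copy of this dict, the registration has to happen in code that actually runs on the worker, such as the body of the mapped function.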
Does this make sense?
Yes, that does make sense. It wasn't clear to me from the documentation that is written here, though. Also, in that case I would recommend just importing `distributed.joblib`, which does the operation you suggest but will track the version of dask a bit better.

As far as I understand, though, this isn't common-case behavior, so we would probably want to provide some context (and maybe give general documentation on that context) before diving into this point.

Maybe a general warning would be useful about how you have to import `distributed.joblib` regardless? And that this is even true if you choose to call joblib from inside a remote task, in which case you should probably import `distributed.joblib` within your function.
Right, `import distributed.joblib` from within the function to be mapped also works. Perhaps that should be added to the documentation instead of what I wrote?

> As far as I understand though this isn't common case behavior, so we would probably want to provide some context (and maybe give general documentation on that context) before diving into this point.

So the reason I am using dask.distributed is that the default multiprocessing backend for sklearn doesn't allow nested parallelism, which is a really serious restriction on sklearn.

What's the best way forward from here? I feel like the documentation would be improved a lot if it mentioned that you need to import `distributed.joblib` from within a function if you are trying to nest parallel calls like this. Should I rewrite with more context?
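The "no nested parallelism" restriction mentioned above can be seen with the standard library alone: process pools run their workers as daemon processes, and Python forbids a daemonic process from starting children of its own. A minimal sketch (plain `multiprocessing`, standing in for the joblib backend; not sklearn/joblib code itself):

```python
import multiprocessing as mp

def child():
    pass  # never actually runs

def worker(q):
    # A daemonic process (like a multiprocessing-backend worker)
    # attempting to start its own child process fails immediately.
    p = mp.Process(target=child)
    try:
        p.start()
        q.put('child started')
    except AssertionError as e:
        q.put(str(e))

def demo():
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,), daemon=True)
    p.start()
    msg = q.get()  # the error message raised inside the daemon process
    p.join()
    return msg

if __name__ == '__main__':
    print(demo())  # daemonic processes are not allowed to have children
```

This is why an inner `n_jobs=-1` cannot fan out again when the outer level already used the multiprocessing backend, and why a distributed scheduler is attractive for nesting.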
Let's say that your machine had four cores. `Client()` would create a local cluster with four workers. Four of your original `function` calls would start and would themselves launch other tasks on which they would wait. However, none of these new tasks would be able to start, because all of the workers are busy with the original calls to `function`.

The solution here is for tasks that are mostly doing dask things (waiting on futures) to leave the thread pool and make way for more tasks to participate. If you have a chance you might want to look over this documentation PR and in particular look for the term `secede` (which currently depends on #1201).
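The starvation described above can be reproduced with the standard library alone. This is a sketch using `concurrent.futures` rather than dask itself: when every worker in a pool is blocked waiting on tasks submitted to that same pool, the inner tasks can never start (the timeout here only exists so the sketch terminates instead of deadlocking forever):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as PoolTimeout

pool = ThreadPoolExecutor(max_workers=1)  # one worker, like one busy core

def inner(x):
    return 2 * x

def outer(x):
    # The outer task submits inner work to the *same* pool and blocks on it,
    # just like the original calls to function described above.
    try:
        return pool.submit(inner, x).result(timeout=1)
    except PoolTimeout:
        return None  # inner never started: the only worker is running outer

result = pool.submit(outer, 1).result()
print(result)  # None -- the worker sat waiting on a task that could not start
```

With four workers the same thing happens as soon as four outer calls occupy them all; `secede` lets such waiting tasks step aside so the inner work can run.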
Right, but the situation I had was a 16-core machine where I was running 8 independent runs of 2-fold cross-validation in parallel, and as far as I can tell, since 8*2 = 16, there should be no waiting.

In any case, the problem that made me switch to the second version was not that tasks were waiting; it was that the first version was unstable (i.e. it continually caused Jupyter/IPython to crash).
And I can't do any waiting on futures in this code because none of the parts are dependent on each other: it's 8 independent runs, each of which contains 2 independent folds of cross-validation.
Well, rather than try to document this failure I'm more inclined to find a way to make things work. If you can provide a reproducible example I'm happy to take a look at it. I briefly tried getting your example above to work but couldn't figure out the imports (I'm not very familiar with sklearn). Something copy-pastable would be ideal if you have the time and interest.
I believe this is a copy-pastable version of the failure from above?

```python
from dask.distributed import Client
from distributed.joblib import DistributedBackend
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVR
from sklearn.datasets import make_regression
from joblib import parallel_backend, register_parallel_backend

X, y = make_regression()
dask_client = Client()

def function(i):
    register_parallel_backend('dask.distributed', DistributedBackend,
                              make_default=True)
    with parallel_backend('dask.distributed',
                          scheduler_host=dask_client.scheduler.address):
        score = cross_val_score(SVR(kernel='linear'), X, y, cv=2, n_jobs=-1)
    return score

dask_client.gather(dask_client.map(function, range(5)))
```
For me that raises a long traceback ending in
```
/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494         reduce = getattr(obj, "__reduce_ex__", None)
    495         if reduce is not None:
--> 496             rv = reduce(self.proto)
    497         else:
    498             reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle select.kqueue objects
```
Does that look familiar @RokoMijic ?
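That pickling error fits a general pattern: `function` closes over `dask_client` (via `dask_client.scheduler.address`), so serializing the function for the workers drags along the client's live sockets and event loop, which have no byte representation. A stdlib sketch of the same failure mode, with an ordinary socket standing in for the client's `select.kqueue` handle (an analogy, not the dask code path itself):

```python
import pickle
import socket

sock = socket.socket()  # a live OS resource, like a client's event loop

try:
    # Objects wrapping OS handles refuse to pickle; shipping a function
    # that closes over one fails the same way.
    pickle.dumps(sock)
except TypeError as e:
    print('TypeError:', e)
finally:
    sock.close()
```

One common workaround in this situation is to avoid capturing the client in the task, e.g. by passing the scheduler address in as a plain string argument.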
Should this be closed in light of PR #2298?