
Create joblib.rst #1171

Closed · wants to merge 1 commit into from

Conversation

RokoMijic

nested parallelism trap explained

nested parallelism trap
.. code-block:: python

    from joblib import register_parallel_backend
    from distributed.joblib import DistributedBackend

    register_parallel_backend('distributed', DistributedBackend, make_default=True)
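
For context, the backend registered above would then be used through joblib's parallel_backend context manager; a minimal sketch (the scheduler address and the sklearn estimator are illustrative, not part of this PR):

from joblib import parallel_backend
from sklearn.datasets import make_regression
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVR

X, y = make_regression()

# 'distributed' is the name registered above; the address is a placeholder
with parallel_backend('distributed', scheduler_host='127.0.0.1:8786'):
    scores = cross_val_score(SVR(kernel='linear'), X, y, cv=2, n_jobs=-1)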
Member

I'm confused here. We already call this on the user's machine when they import distributed.joblib.

I'm also not sure about your comment regarding running this on the workers. Are the workers also submitting jobs to the scheduler in your use case?

@RokoMijic (Author), Jun 15, 2017

So the use case is when you have an outer parallel loop which uses something like client.map(function, data), and then within your function there is a call to sklearn using dask.distributed:

import numpy as np
import sklearn as sk
import sklearn.model_selection
import sklearn.svm
import distributed.joblib
from sklearn.externals.joblib import parallel_backend  # or: from joblib import parallel_backend

# dask_addr and dask_client (used below) refer to the address of an
# already-running dask scheduler and a Client connected to it
x = np.reshape(np.array(range(200)), (100, 2))
y = np.random.normal(size=100)

def function(i):
    with parallel_backend('dask.distributed', scheduler_host=dask_addr):
        score = sk.model_selection.cross_val_score(
            sk.svm.SVR(kernel='linear'), x, y, cv=2, n_jobs=-1)
    return score

Here calling function(1) causes no problems.

However, if we now want to parallelize calls to function(i):

dask_client.gather(dask_client.map(function, range(5)))

we get KeyError: 'dask.distributed' in sklearn/externals/joblib/parallel.py in parallel_backend(). The fix is to put register_parallel_backend(...) inside function(i):

from distributed.joblib import DistributedBackend
from sklearn.externals.joblib import parallel  # or: from joblib import parallel

def function(i):
    # registering the backend inside the function means it also happens
    # in the worker process that runs this task
    parallel.register_parallel_backend('dask.distributed', DistributedBackend,
                                       make_default=True)
    with parallel_backend('dask.distributed', scheduler_host=dask_addr):
        score = sk.model_selection.cross_val_score(
            sk.svm.SVR(kernel='linear'), x, y, cv=2, n_jobs=-1)
    return score

dask_client.gather(dask_client.map(function, range(5)))

@RokoMijic (Author)

Does this make sense?

Member

Yes, that does make sense. It wasn't clear to me from the documentation written here, though. Also, in that case I would recommend just importing distributed.joblib, which does the operation you suggest but will track the version of dask a bit better.

As far as I understand, though, this isn't common-case behavior, so we would probably want to provide some context (and maybe give general documentation on that context) before diving into this point.

Maybe a general warning would be useful about how you have to import distributed.joblib regardless? And that this is even true if you choose to call joblib from inside a remote task, in which case you should probably import distributed.joblib within your function.
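
A minimal sketch of that within-the-function import (illustrative only; x, y, and dask_addr are the same objects as in the snippet further up):

def function(i):
    # importing distributed.joblib inside the function registers the
    # 'dask.distributed' backend in the worker process before it is used
    import distributed.joblib
    from sklearn.externals.joblib import parallel_backend  # or: from joblib import parallel_backend
    from sklearn import model_selection, svm
    with parallel_backend('dask.distributed', scheduler_host=dask_addr):
        return model_selection.cross_val_score(
            svm.SVR(kernel='linear'), x, y, cv=2, n_jobs=-1)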

@RokoMijic (Author)

Right, import distributed.joblib from within the function to be mapped also works.

Perhaps that should be added to the documentation instead of what I wrote?

> As far as I understand, though, this isn't common-case behavior, so we would probably want to provide some context (and maybe give general documentation on that context) before diving into this point.

So the reason I am using Dask.distributed is that the default multiprocessing backend for sklearn doesn't allow nested parallelism, which is a really serious restriction on sklearn.

What's the best way forward from here? I feel like the documentation would be improved a lot if it mentioned that you need to import distributed.joblib from within a function if you are trying to nest parallel calls like this. Should I rewrite with more context?

Member

Let's say that your machine had four cores. Client() would create a local cluster with four workers. Four of your original function calls would start and would themselves launch other tasks on which they would wait. However, none of these new tasks would be able to start, because all of the workers are busy with the original calls to function.

The solution here is for tasks that are mostly doing dask things (waiting on futures) to leave the thread pool and make way for more tasks to participate. If you have a chance, you might want to look over this documentation PR and in particular look for the term secede (which currently depends on #1201).
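
A minimal sketch of the secede idea (names such as inner_work are illustrative, and this assumes a distributed version that provides get_client and secede):

from distributed import Client, get_client, secede

def inner_work(j):
    return j ** 2

def function(i):
    # runs on a worker: submit subtasks, then step out of the worker's
    # thread pool so the subtasks can be scheduled instead of deadlocking
    client = get_client()
    futures = client.map(inner_work, [2 * i, 2 * i + 1])
    secede()
    return sum(client.gather(futures))

dask_client = Client()
print(dask_client.gather(dask_client.map(function, range(4))))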

@RokoMijic (Author), Jul 4, 2017

Right, but the situation I had was a 16-core machine where I was running 8 independent runs of 2-fold cross-validation in parallel, and as far as I can tell, since 8 * 2 = 16, there should be no waiting.

In any case, the problem that made me switch to the second version was not that tasks were waiting; it was that the first version was unstable (i.e. it continually caused Jupyter/IPython to crash).

@RokoMijic (Author)

And I can't do any waiting on futures in this code because none of the parts depend on each other: it's 8 independent runs, each of which contains 2 independent folds of cross-validation.

Member

Well, rather than try to document this failure I'm more inclined to find a way to make things work. If you can provide a reproducible example, I'm happy to take a look at it. I briefly tried getting your example above to work but couldn't figure out the imports (I'm not very familiar with sklearn). Something copy-pastable would be ideal if you have the time and interest.

Member

I believe this is a copy-pastable version of the failure from above?

from dask.distributed import Client
from distributed.joblib import DistributedBackend
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVR
from sklearn.datasets import make_regression

from joblib import parallel_backend, register_parallel_backend

X, y = make_regression()
dask_client = Client()


def function(i):
    register_parallel_backend('dask.distributed', DistributedBackend,
                              make_default=True)
    with parallel_backend('dask.distributed',
                          scheduler_host=dask_client.scheduler.address):
        score = cross_val_score(SVR(kernel='linear'), X, y, cv=2, n_jobs=-1)
    return score


dask_client.gather(dask_client.map(function, range(5)))

For me that raises a long traceback ending in:

/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle select.kqueue objects

Does that look familiar, @RokoMijic?

@jakirkham (Member)

Should this be closed in light of PR (#2298)?

@mrocklin closed this Nov 3, 2018