Gather CV Results as Completed #433

Merged May 20, 2019 (88 commits).

Commits
ce3e404
in progress updates
Dec 1, 2018
6b3688a
clean up a bit more
Dec 2, 2018
9a8585d
note on futures possibly returning out of order
Dec 2, 2018
6e4123e
ensure scores from futures line up with parameters
Dec 2, 2018
a506d58
update comment about score order
Dec 2, 2018
1fa69e3
revert candidate_param change given we are not using it anymore
Dec 2, 2018
578c5f8
loop over keys instead of items
Dec 2, 2018
12892fa
take first value using iteration instead of index
Dec 2, 2018
2c1d005
optional import for distributed
Dec 2, 2018
c9d3a75
add distributed scheduler to param test
Dec 2, 2018
e81f9e5
put client first and close it so it doesn't leak into other tests. is…
Dec 2, 2018
000059d
remove duplicate import
Dec 2, 2018
6c83b91
remove test as it already existed
Dec 2, 2018
ee34b12
actually revert tests
Dec 2, 2018
2d0627c
ensure dask distributed exists before we check for the client
Dec 2, 2018
fa77f19
in progress tests for distributed as_completed cv
Dec 3, 2018
a0ad0d6
trying to figure out a way to test this
Dec 3, 2018
184e728
split up graphs
Dec 5, 2018
915cae3
Merge branch 'master' into gather_results_as_completed
Dec 5, 2018
406f6be
Change module import and dask detection.
Dec 5, 2018
a587fa2
Formatting to match master.
Dec 5, 2018
66f245e
more formatting.
Dec 5, 2018
cfdc7a8
fix sklearn tests and comment out as completed tests for now.
Dec 5, 2018
0871f35
move dask graph and n_splits to the proper place.
Dec 5, 2018
61ea9dc
move dask_graph and n_splits next to results given they are all modif…
Dec 5, 2018
111d77a
add back try/catch for distributed model
Dec 5, 2018
1dea132
scores map dict comprehension with batches.
Dec 5, 2018
d6d15ca
only remove keys that have been processed / exist. This also
Dec 8, 2018
df86170
fix linting
Dec 8, 2018
577d987
remove as completed test until we find a better way.
Dec 8, 2018
0d8dbc6
more flake8 fixes.
Dec 8, 2018
cc70fe0
more flake8 fixes.
Dec 8, 2018
7967c5c
remove uncessary format changes.
Dec 8, 2018
8105ea8
black formatting
Dec 8, 2018
fc84640
compute results locally, in progress refit graph
Dec 9, 2018
bb54522
fix a few broken tests. 2 more left
Dec 9, 2018
6d5e242
fix multimetric tests, flake8 and black formatting
Dec 9, 2018
c55513f
simplifly build_refit_graph args
Dec 9, 2018
0ba7619
properly separate refit and search
Dec 10, 2018
28490ab
flake8/black formatting. comment out as_completed tests for now
Dec 10, 2018
751b824
single colon slicing
Dec 10, 2018
39a2343
only check multimetric once
Dec 10, 2018
673d316
As completed test
Dec 14, 2018
7df0d20
Parameterize min complete and black formatting
Dec 14, 2018
bfa6dff
Merge https://github.com/dask/dask-ml into gather_results_as_completed
Dec 20, 2018
db8e153
remove dask distributed check
Feb 13, 2019
71db4b6
Merge branch 'master' of https://github.com/dask/dask-ml into gather_…
Feb 13, 2019
f619766
remove pytest check for has_distributed
Feb 13, 2019
6de0af0
context manager for lock
Feb 13, 2019
fa0f65e
add logging to retry and use with_results
Feb 14, 2019
946ba5b
seems like as completed tests work now
Feb 14, 2019
b00b558
min complete to 7
Feb 14, 2019
19cf8da
remove type hint
Feb 14, 2019
6171a97
try latest tornado?
Feb 14, 2019
f6e0160
try tornado 6.0.0b1 in pip
Feb 14, 2019
d6441e7
revert back to latest tornado
Feb 14, 2019
922d27c
add classification mixin
Feb 15, 2019
a72388e
looks like test estimators need kwargs to be serialized
Feb 15, 2019
a609ce5
try exit(1)
Feb 15, 2019
a0fdbd8
try localcluster
Feb 15, 2019
7c450a9
catch exception if worker killed during .result()
Feb 15, 2019
b2d2cf9
see if this retry works on linux
Feb 15, 2019
ca5b07e
remove commented exit(1)
Feb 15, 2019
cde1f5c
black formatting and isort
Feb 15, 2019
63ef4c3
print exception with retry
Feb 15, 2019
7a455ce
flake8 fix
Feb 15, 2019
8dc53ab
more flake8
Feb 15, 2019
6c4150e
match imports with master given loop fails flake8
Feb 15, 2019
250e08c
match master with has distributed
Feb 15, 2019
b962af2
clean up AsCompletedEstimator
Feb 15, 2019
f206bc5
use base estimator for AsCompletedEstimator
Feb 15, 2019
6a18f30
looks like we need to use mock classifier
Feb 15, 2019
fa9fb17
init w/ foo param
Feb 15, 2019
bfdea5e
2.7 super init
Feb 15, 2019
8ef5338
need to use AsCompletedEstimator in super
Feb 15, 2019
869e8b9
black reformatting
Feb 15, 2019
9c034ab
move liter eval out of lock
Feb 15, 2019
e482ed9
don't need fit and transform for AsCompleted Test
Feb 15, 2019
9008744
use built in reschedule for as_completed
Feb 16, 2019
7623003
retry in batches
Feb 16, 2019
9da5b8f
black reformatting
Feb 16, 2019
e86bc1f
remove dead code
May 4, 2019
3db7c28
Merge branch 'master' of https://github.com/dask/dask-ml into gather_…
May 4, 2019
687a83b
Remove optional distributed check as it is a dependency now
May 4, 2019
1eb113c
Remove has_distributed check from tests
May 7, 2019
163f3bd
Merge branch 'master' of github.com:dask/dask-ml into gather_results_…
May 20, 2019
d5b95e4
Add compat for TPOT
TomAugspurger May 20, 2019
33f6fd2
expecte token
TomAugspurger May 20, 2019
227 changes: 156 additions & 71 deletions dask_ml/model_selection/_search.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import, division, print_function

import logging
import numbers
from collections import defaultdict
from itertools import repeat
@@ -11,6 +12,7 @@
import packaging.version
from dask.base import tokenize
from dask.delayed import delayed
from dask.distributed import as_completed
from dask.utils import derived_from
from sklearn import model_selection
from sklearn.base import BaseEstimator, MetaEstimatorMixin, clone, is_classifier
@@ -42,19 +44,19 @@
cv_extract_params,
cv_n_samples,
cv_split,
decompress_params,
feature_union,
feature_union_concat,
fit,
fit_and_score,
fit_best,
fit_transform,
get_best_params,
pipeline,
score,
)
from .utils import DeprecationDict, is_dask_collection, to_indexable, to_keys, unzip

logger = logging.getLogger(__name__)

try:
from cytoolz import get, pluck
except ImportError: # pragma: no cover
@@ -63,7 +65,6 @@

__all__ = ["GridSearchCV", "RandomizedSearchCV"]


if SK_VERSION <= packaging.version.parse("0.21.dev0"):

_RETURN_TRAIN_SCORE_DEFAULT = "warn"
@@ -102,6 +103,19 @@ def __call__(self, est):
return self.token if c == 0 else self.token + str(c)


def map_fit_params(dsk, fit_params):
if fit_params:
# A mapping of {name: (name, graph-key)}
param_values = to_indexable(*fit_params.values(), allow_scalars=True)
fit_params = {
k: (k, v) for (k, v) in zip(fit_params, to_keys(dsk, *param_values))
}
else:
fit_params = {}

return fit_params


def build_graph(
estimator,
cv,
@@ -118,7 +132,68 @@ def build_graph(
cache_cv=True,
multimetric=False,
):
# This is provided for compatibility with TPOT. Remove
# once TPOT is updated and requires a dask-ml>=0.13.0
def decompress_params(fields, params):
return [{k: v for k, v in zip(fields, p) if v is not MISSING} for p in params]

fields, tokens, params = normalize_params(candidate_params)
dsk, keys, n_splits, main_token = build_cv_graph(
estimator,
cv,
scorer,
candidate_params,
X,
y=y,
groups=groups,
fit_params=fit_params,
iid=iid,
error_score=error_score,
return_train_score=return_train_score,
cache_cv=cache_cv,
)
cv_name = "cv-split-" + main_token
if iid:
weights = "cv-n-samples-" + main_token
dsk[weights] = (cv_n_samples, cv_name)
scores = keys[1:]
else:
scores = keys

cv_results = "cv-results-" + main_token
candidate_params_name = "cv-parameters-" + main_token
dsk[candidate_params_name] = (decompress_params, fields, params)
if multimetric:
metrics = list(scorer.keys())
else:
metrics = None
dsk[cv_results] = (
create_cv_results,
scores,
candidate_params_name,
n_splits,
error_score,
weights,
metrics,
)
keys = [cv_results]
return dsk, keys, n_splits


def build_cv_graph(
Review comment (Member): Can we provide a compatibility shim with the old name? I believe that TPOT is using build_graph https://github.com/EpistasisLab/tpot/blob/b626271e6b5896a73fb9d7d29bebc7aa9100772e/tpot/gp_deap.py#L429

Review comment (Member): I think we can just add back build_graph with a comment that it's going to be removed (not even a deprecation warning). This is a private module, but I was lazy when adding dask support to TPOT. Once this is in, I can cut a release and make a PR to tpot updating their call.
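The shim discussed in this thread follows a common pattern: keep the old entry point as a thin wrapper over the refactored function, marked for removal once downstream callers migrate. A minimal sketch of that pattern (the names `new_build` and `old_build` are illustrative stand-ins, not the dask-ml API; note this PR deliberately skips the deprecation warning shown here):

```python
import warnings


def new_build(params):
    # Stand-in for the refactored implementation (build_cv_graph in this PR):
    # returns some structured result derived from the inputs.
    return {"graph": dict(enumerate(params)), "n": len(params)}


def old_build(params):
    # Compatibility shim kept for downstream callers (e.g. TPOT).
    # Delegates to the new function; remove once callers are updated.
    warnings.warn("old_build is deprecated; use new_build", DeprecationWarning)
    return new_build(params)
```

Keeping the old name as a one-line delegate means downstream projects keep working across the refactor, at the cost of carrying the alias until a coordinated release removes it.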

estimator,
cv,
scorer,
candidate_params,
X,
y=None,
groups=None,
fit_params=None,
iid=True,
error_score="raise",
return_train_score=_RETURN_TRAIN_SCORE_DEFAULT,
cache_cv=True,
):
X, y, groups = to_indexable(X, y, groups)
cv = check_cv(cv, y, is_classifier(estimator))
# "pairwise" estimators require a different graph for CV splitting
@@ -128,14 +203,7 @@ def build_graph(
X_name, y_name, groups_name = to_keys(dsk, X, y, groups)
n_splits = compute_n_splits(cv, X, y, groups)

if fit_params:
# A mapping of {name: (name, graph-key)}
param_values = to_indexable(*fit_params.values(), allow_scalars=True)
fit_params = {
k: (k, v) for (k, v) in zip(fit_params, to_keys(dsk, *param_values))
}
else:
fit_params = {}
fit_params = map_fit_params(dsk, fit_params)

fields, tokens, params = normalize_params(candidate_params)
main_token = tokenize(
@@ -176,50 +244,33 @@
scorer,
return_train_score,
)
keys = [weights] + scores if weights else scores
return dsk, keys, n_splits, main_token

cv_results = "cv-results-" + main_token
candidate_params_name = "cv-parameters-" + main_token
dsk[candidate_params_name] = (decompress_params, fields, params)
if multimetric:
metrics = list(scorer.keys())
else:
metrics = None
dsk[cv_results] = (
create_cv_results,
scores,
candidate_params_name,
n_splits,
error_score,
weights,
metrics,
)
keys = [cv_results]

if refit:
if multimetric:
scorer = refit
else:
scorer = "score"
def build_refit_graph(estimator, X, y, best_params, fit_params):
X, y = to_indexable(X, y)
dsk = {}
X_name, y_name = to_keys(dsk, X, y)

best_params = "best-params-" + main_token
dsk[best_params] = (get_best_params, candidate_params_name, cv_results, scorer)
best_estimator = "best-estimator-" + main_token
if fit_params:
fit_params = (
dict,
(zip, list(fit_params.keys()), list(pluck(1, fit_params.values()))),
)
dsk[best_estimator] = (
fit_best,
clone(estimator),
best_params,
X_name,
y_name,
fit_params,
)
keys.append(best_estimator)
fit_params = map_fit_params(dsk, fit_params)
main_token = tokenize(normalize_estimator(estimator), X_name, y_name, fit_params)

return dsk, keys, n_splits
best_estimator = "best-estimator-" + main_token
if fit_params:
fit_params = (
dict,
(zip, list(fit_params.keys()), list(pluck(1, fit_params.values()))),
)
dsk[best_estimator] = (
fit_best,
clone(estimator),
best_params,
X_name,
y_name,
fit_params,
)
return dsk, [best_estimator]


def normalize_params(params):
@@ -1166,24 +1217,21 @@ def fit(self, X, y=None, groups=None, **fit_params):
"error_score must be the string 'raise' or a" " numeric value."
)

dsk, keys, n_splits = build_graph(
candidate_params = list(self._get_param_iterator())
dsk, keys, n_splits, _ = build_cv_graph(
estimator,
self.cv,
self.scorer_,
list(self._get_param_iterator()),
candidate_params,
X,
y,
groups,
fit_params,
y=y,
groups=groups,
fit_params=fit_params,
iid=self.iid,
refit=self.refit,
error_score=error_score,
return_train_score=self.return_train_score,
cache_cv=self.cache_cv,
multimetric=multimetric,
)
self.dask_graph_ = dsk
self.n_splits_ = n_splits

n_jobs = _normalize_n_jobs(self.n_jobs)
scheduler = dask.base.get_scheduler(scheduler=self.scheduler)
@@ -1193,21 +1241,59 @@
if scheduler is dask.threaded.get and n_jobs == 1:
scheduler = dask.local.get_sync

out = scheduler(dsk, keys, num_workers=n_jobs)
if "Client" in type(getattr(scheduler, "__self__", None)).__name__:
Review comment (Member): Will this approach be heavier on communication? Or is it the same amount, just spread out over time? My real question is: should we provide an option to disable this behavior?

Review comment (Contributor, author): From what I can see, this shouldn't add additional overhead. This "should" submit effectively the same task graph under the traditional dask scheduler as well as distributed. The difference is that the distributed scheduler allows you to submit the graph asynchronously and get back a bunch of futures, which you can then yield as they complete. Given this, I don't see why it would be preferable to disable this feature when a distributed client is present.
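The gather-as-completed-with-retry pattern discussed here can be illustrated without a running dask cluster using the standard library's `concurrent.futures`, whose `as_completed` has a similar shape to distributed's (the `with_results`/`batches` API and `Future.retry` are approximated by resubmitting; all names below are illustrative, not dask-ml code):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def gather_as_completed(executor, fn, keys, max_retries=3):
    # Map each submitted future back to its (key, attempt-count).
    futures = {executor.submit(fn, k): (k, 0) for k in keys}
    result_map = {}
    while futures:
        retried = {}
        for fut in as_completed(futures):
            key, attempts = futures[fut]
            try:
                result_map[key] = fut.result()
            except Exception:
                if attempts + 1 >= max_retries:
                    raise
                # Analogue of future.retry() + ac.add(future): resubmit
                # the failed task and keep waiting for it.
                retried[executor.submit(fn, key)] = (key, attempts + 1)
        futures = retried
    # Results arrive in completion order; reorder to match the input keys,
    # mirroring `out = [result_map[k] for k in keys]` in the diff.
    return [result_map[k] for k in keys]


with ThreadPoolExecutor(max_workers=4) as pool:
    out = gather_as_completed(pool, lambda k: k * 2, [1, 2, 3])
```

The key point, matching the author's reply above: the task graph is the same; only the collection is asynchronous, with failed futures rescheduled rather than failing the whole gather.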

futures = scheduler(
dsk, keys, allow_other_workers=True, num_workers=n_jobs, sync=False
)

results = handle_deprecated_train_score(out[0], self.return_train_score)
result_map = {}
ac = as_completed(futures, with_results=True, raise_errors=False)
for batch in ac.batches():
for future, result in batch:
if future.status == "finished":
result_map[future.key] = result
else:
logger.warning("{} has failed... retrying".format(future.key))
future.retry()
ac.add(future)

out = [result_map[k] for k in keys]
else:
out = scheduler(dsk, keys, num_workers=n_jobs)

if self.iid:
weights = out[0]
scores = out[1:]
else:
weights = None
scores = out

if multimetric:
metrics = list(scorer.keys())
scorer = self.refit
else:
metrics = None
scorer = "score"

cv_results = create_cv_results(
scores, candidate_params, n_splits, error_score, weights, metrics
)

results = handle_deprecated_train_score(cv_results, self.return_train_score)
self.dask_graph_ = dsk
self.n_splits_ = n_splits
self.cv_results_ = results

if self.refit:
if self.multimetric_:
key = self.refit
else:
key = "score"
self.best_index_ = np.flatnonzero(results["rank_test_{}".format(key)] == 1)[
0
]
self.best_index_ = np.flatnonzero(
results["rank_test_{}".format(scorer)] == 1
)[0]

best_params = candidate_params[self.best_index_]
dsk, keys = build_refit_graph(estimator, X, y, best_params, fit_params)

self.best_estimator_ = out[1]
out = scheduler(dsk, keys, num_workers=n_jobs)
self.best_estimator_ = out[0]

return self
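As a standalone illustration of the rank-based selection used in `fit` above, where `np.flatnonzero` picks the first candidate whose test rank is 1 (the data values here are hypothetical):

```python
import numpy as np

# create_cv_results ranks candidates per scorer; rank 1 marks the best
# mean test score for that metric.
cv_results = {"rank_test_score": np.array([3, 1, 2])}
candidate_params = [{"C": 0.1}, {"C": 1.0}, {"C": 10.0}]

# Equivalent of: np.flatnonzero(results["rank_test_{}".format(scorer)] == 1)[0]
best_index = int(np.flatnonzero(cv_results["rank_test_score"] == 1)[0])
best_params = candidate_params[best_index]
```

These `best_params` then feed `build_refit_graph`, which fits a clone of the estimator on the full data with only the winning parameter set.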

@@ -1584,7 +1670,6 @@ def __init__(
n_jobs=-1,
cache_cv=True,
):

super(RandomizedSearchCV, self).__init__(
estimator=estimator,
scoring=scoring,
9 changes: 0 additions & 9 deletions dask_ml/model_selection/methods.py
@@ -171,10 +171,6 @@ def cv_extract_params(cvs, keys, vals, n):
return {k: cvs.extract_param(tok, v, n) for (k, tok), v in zip(keys, vals)}


def decompress_params(fields, params):
return [{k: v for k, v in zip(fields, p) if v is not MISSING} for p in params]


def _maybe_timed(x):
"""Unpack (est, fit_time) tuples if provided"""
return x if isinstance(x, tuple) and len(x) == 2 else (x, 0.0)
@@ -452,11 +448,6 @@ def create_cv_results(
return results


def get_best_params(candidate_params, cv_results, scorer):
best_index = np.flatnonzero(cv_results["rank_test_{}".format(scorer)] == 1)[0]
return candidate_params[best_index]


def fit_best(estimator, params, X, y, fit_params):
estimator = copy_estimator(estimator).set_params(**params)
estimator.fit(X, y, **fit_params)