Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Incremental model selection #288

Merged
merged 23 commits into the base branch from the source branch
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit. Hold shift + click to select a range.
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 92 additions & 22 deletions dask_ml/model_selection/_incremental.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from __future__ import division

import operator
from copy import deepcopy

from sklearn.base import clone
from sklearn.utils import check_random_state
from time import time
from tornado import gen

import dask
import dask.array as da
from dask.distributed import default_client, futures_of, Future
from dask.distributed import Future, default_client, futures_of, wait
from distributed.utils import log_errors
from sklearn.base import clone
from sklearn.utils import check_random_state

from tornado import gen


def _partial_fit(model_and_meta, X, y, fit_params):
Expand Down Expand Up @@ -218,10 +219,23 @@ def get_futures(partial_fit_calls):

new_scores = list(_scores2.values())

models = {k: client.submit(operator.getitem, v, 0) for k, v in models.items()}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break the current Hyperband implementation BTW

yield wait(models)
raise gen.Return((info, models, history))


def fit(*args, **kwargs):
def fit(
model,
params,
X_train,
y_train,
X_test,
y_test,
additional_calls,
fit_params=None,
scorer=None,
random_state=None,
):
""" Find a good model and search among a space of hyper-parameters

This does a hyper-parameter search by creating many models and then fitting
Expand All @@ -239,42 +253,98 @@ def fit(*args, **kwargs):
Parameters
----------
model : Estimator
params : dict
parameter grid to be given to ParameterSampler
params : List[Dict]
Parameters to start training on model
X_train : dask Array
y_train : dask Array
X_test : Array
Numpy array or small dask array. Should fit in memory.
Numpy array or small dask array. Should fit in single node's memory.
y_test : Array
Numpy array or small dask array. Should fit in memory.
start : int
Number of parameters to start with
Numpy array or small dask array. Should fit in single node's memory.
additional_calls : callable
A function that takes information about scoring history per model and
returns the number of additional partial fit calls to run on each model
fit_params : dict
Extra parameters to give to partial_fit
random_state :
scorer :
target : callable
A function that takes the start value and the current time step and
returns the number of desired models at that time step
random_state :

Examples
--------
>>> import numpy as np
>>> from dask_ml.datasets import make_classification
>>> X, y = make_classification(n_samples=5000000, n_features=20,
... chunks=100000)

>>> from sklearn.linear_model import SGDClassifier
>>> model = SGDClassifier(tol=1e-3, penalty='elasticnet')

>>> from sklearn.model_selection import ParameterSampler
>>> params = {'alpha': np.logspace(-2, 1, num=1000),
... 'l1_ratio': np.linspace(0, 1, num=1000),
... 'average': [True, False]}
>>> params = list(ParameterSampler(params, 10))

>>> X_test, y_test = X[:100000], y[:100000]
>>> X_train = X[100000:]
>>> y_train = y[100000:]

>>> info, model, history = yield fit(model, params,
... X_train, y_train,
... X_test, y_test,
... start=100,
... fit_params={'classes': [0, 1]})
>>> def remove_worst(scores):
... last_score = {model_id: info[-1]['score']
... for model_id, info in scores.items()}
... worst_score = min(last_score.values())
... out = {}
... for model_id, score in last_score.items():
... if score != worst_score:
... out[model_id] = 1 # do one more training step
... if len(out) == 1:
... out = {k: 0 for k in out} # no more work to do, stops execution
... return out

>>> from dask.distributed import Client
>>> client = Client(processes=False)

>>> from dask_ml.model_selection._incremental import fit
>>> info, models, history = fit(model, params,
... X_train, y_train,
... X_test, y_test,
... additional_calls=remove_worst,
... fit_params={'classes': [0, 1]})

>>> models
{7: <Future: status: finished, type: SGDClassifier, key: ...}
>>> models[7].result()
SGDClassifier(...)
>>> info[7][-1]
{'model_id': 7,
'params': {'l1_ratio': 0.7967967967967968,
'average': False,
'alpha': 0.20812215699863382},
'partial_fit_calls': 9,
'partial_fit_time': 0.09028053283691406,
'score': 0.70231,
'score_time': 0.04503202438354492}

Returns
-------
info : Dict[int, List[Dict]]
Scoring history of each successful model, keyed by model ID.
This has the parameters, scores, and timing information over time
models : Dict[int, Future]
Dask futures pointing to trained models
history : List[Dict]
A history of all models scores over time
"""
return default_client().sync(_fit, *args, **kwargs)
return default_client().sync(
_fit,
model,
params,
X_train,
y_train,
X_test,
y_test,
additional_calls,
fit_params=fit_params,
scorer=scorer,
random_state=random_state,
)
18 changes: 9 additions & 9 deletions tests/model_selection/test_incremental.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import random

import numpy as np
import toolz
from dask.distributed import Future
from distributed.utils_test import gen_cluster, loop # noqa: F401
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ParameterSampler
import toolz
from tornado import gen

from dask_ml.datasets import make_classification
from dask_ml.model_selection._incremental import fit, _partial_fit, _score
from dask.distributed import Future
from distributed.utils_test import loop, gen_cluster # noqa: F401
from dask_ml.model_selection._incremental import _partial_fit, _score, fit
from tornado import gen


@gen_cluster(client=True, timeout=None)
Expand Down Expand Up @@ -55,11 +55,10 @@ def additional_calls(info):

for model in models.values():
assert isinstance(model, Future)
model2, meta2 = yield model
model2 = yield model
assert isinstance(model2, SGDClassifier)
assert isinstance(meta2, dict)
XX_test, yy_test = yield c.compute([X_test, y_test])
model, meta = yield models[0]
model = yield models[0]
assert model.score(XX_test, yy_test) == info[0][-1]["score"]

# `<` not `==` because we randomly dropped one model
Expand Down Expand Up @@ -159,7 +158,8 @@ def additional_calls(scores):
assert all(model.done() for model in models.values())

models = yield models
model, meta = models[0]
model = models[0]
meta = info[0][-1]

assert meta["params"] == {"alpha": 0.1}
assert meta["partial_fit_calls"] == 6 + 1
Expand Down