Refactor dask tests #2377

Merged (27 commits) on Jun 14, 2021
1 change: 1 addition & 0 deletions docs/source/release_notes.rst
@@ -7,6 +7,7 @@ Release Notes
 * Documentation Changes
 * Testing Changes
     * Add ``pytest-timeout``. All tests that run longer than 6 minutes will fail. :pr:`2374`
+    * Refactored dask tests :pr:`2377`
Contributor:
omega nit-pick but maybe include why so that when we try to trace this back in the future we know? :P


 .. warning::
 
104 changes: 59 additions & 45 deletions evalml/tests/automl_tests/test_automl_dask.py
@@ -1,8 +1,6 @@
-import unittest
-
 import numpy as np
 import pytest
-from distributed import Client
+from dask.distributed import Client, LocalCluster
 
 from evalml.automl import AutoMLSearch
 from evalml.automl.callbacks import raise_error_callback
@@ -15,30 +13,33 @@
 )
 
 
-@pytest.mark.usefixtures("X_y_binary_cls")
-class TestAutoMLSearchDask(unittest.TestCase):
-    @pytest.fixture(autouse=True)
-    def inject_fixtures(self, caplog):
-        """Gives the unittests access to the logger"""
-        self._caplog = caplog
+@pytest.fixture
+def sequential_engine():
+    return SequentialEngine()
 
-    @classmethod
-    def setUpClass(cls) -> None:
-        cls.client = Client()
-        cls.parallel_engine = DaskEngine(cls.client)
-        cls.sequential_engine = SequentialEngine()
 
-    def test_automl(self):
-        """Comparing the results of parallel and sequential AutoML to each other."""
-        X, y = self.X_y_binary
+@pytest.fixture(scope="module")
+def cluster():
+    dask_cluster = LocalCluster(
+        n_workers=1, threads_per_worker=2, dashboard_address=None
+    )
+    yield dask_cluster
+    dask_cluster.close()
+
+
+def test_automl(X_y_binary_cls, cluster, sequential_engine):
+    """Comparing the results of parallel and sequential AutoML to each other."""
+    with Client(cluster) as client:
+        parallel_engine = DaskEngine(client)
+        X, y = X_y_binary_cls
         par_automl = AutoMLSearch(
-            X_train=X, y_train=y, problem_type="binary", engine=self.parallel_engine
+            X_train=X, y_train=y, problem_type="binary", engine=parallel_engine
         )
         par_automl.search()
         parallel_rankings = par_automl.full_rankings
 
         seq_automl = AutoMLSearch(
-            X_train=X, y_train=y, problem_type="binary", engine=self.sequential_engine
+            X_train=X, y_train=y, problem_type="binary", engine=sequential_engine
        )
        seq_automl.search()
        sequential_rankings = seq_automl.full_rankings
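
The heart of the refactor shows in this hunk: a module-scoped cluster fixture takes over the setup and teardown that setUpClass/tearDownClass previously handled, and each test opens a short-lived Client against the shared cluster. A minimal self-contained sketch of that pattern, assuming dask.distributed is installed (test_uses_cluster is a hypothetical test for illustration, not part of this PR):

import pytest
from dask.distributed import Client, LocalCluster


@pytest.fixture(scope="module")
def cluster():
    # Everything before ``yield`` runs once per test module, like setUpClass.
    dask_cluster = LocalCluster(
        n_workers=1, threads_per_worker=2, dashboard_address=None
    )
    yield dask_cluster
    # Everything after ``yield`` runs after the module's last test, like tearDownClass.
    dask_cluster.close()


def test_uses_cluster(cluster):
    # Hypothetical test: each test opens its own Client against the shared
    # cluster, and the context manager closes it on exit, so client state
    # cannot leak from one test into the next.
    with Client(cluster) as client:
        assert client.status == "running"

Keeping the cluster module-scoped avoids paying the LocalCluster startup cost in every test, while the per-test Client keeps the tests isolated.
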
@@ -60,15 +61,20 @@ def test_automl(self):
             np.array(par_results["percent_better_than_baseline"]),
         )
 
-    def test_automl_max_iterations(self):
-        """Making sure that the max_iterations parameter limits the number of pipelines run."""
-        X, y = self.X_y_binary
+
+def test_automl_max_iterations(X_y_binary_cls, cluster, sequential_engine):
+    """Making sure that the max_iterations parameter limits the number of pipelines run."""
+
+    X, y = X_y_binary_cls
+    with Client(cluster) as client:
+        parallel_engine = DaskEngine(client)
+
         max_iterations = 4
         par_automl = AutoMLSearch(
             X_train=X,
             y_train=y,
             problem_type="binary",
-            engine=self.parallel_engine,
+            engine=parallel_engine,
             max_iterations=max_iterations,
         )
         par_automl.search()
@@ -78,52 +84,64 @@ def test_automl_max_iterations(self):
             X_train=X,
             y_train=y,
             problem_type="binary",
-            engine=self.sequential_engine,
+            engine=sequential_engine,
             max_iterations=max_iterations,
         )
         seq_automl.search()
         sequential_rankings = seq_automl.full_rankings
 
         assert len(sequential_rankings) == len(parallel_rankings) == max_iterations
 
-    def test_automl_train_dask_error_callback(self):
-        """Make sure the pipeline training error message makes its way back from the workers."""
-        self._caplog.clear()
-        X, y = self.X_y_binary
+
+def test_automl_train_dask_error_callback(X_y_binary_cls, cluster, caplog):
+    """Make sure the pipeline training error message makes its way back from the workers."""
+    caplog.clear()
+    with Client(cluster) as client:
+        parallel_engine = DaskEngine(client)
+        X, y = X_y_binary_cls
+
         pipelines = [TestPipelineWithFitError({})]
         automl = AutoMLSearch(
             X_train=X,
             y_train=y,
             problem_type="binary",
-            engine=self.parallel_engine,
+            engine=parallel_engine,
             max_iterations=2,
             allowed_pipelines=pipelines,
         )
         automl.train_pipelines(pipelines)
-        assert "Train error for PipelineWithError: Yikes" in self._caplog.text
+        assert "Train error for PipelineWithError: Yikes" in caplog.text
+
+
+def test_automl_score_dask_error_callback(X_y_binary_cls, cluster, caplog):
+    """Make sure the pipeline scoring error message makes its way back from the workers."""
+    caplog.clear()
+    with Client(cluster) as client:
+        parallel_engine = DaskEngine(client)
 
-    def test_automl_score_dask_error_callback(self):
-        """Make sure the pipeline scoring error message makes its way back from the workers."""
-        self._caplog.clear()
-        X, y = self.X_y_binary
+        X, y = X_y_binary_cls
         pipelines = [TestPipelineWithScoreError({})]
         automl = AutoMLSearch(
             X_train=X,
             y_train=y,
             problem_type="binary",
-            engine=self.parallel_engine,
+            engine=parallel_engine,
             max_iterations=2,
             allowed_pipelines=pipelines,
         )
         automl.score_pipelines(
             pipelines, X, y, objectives=["Log Loss Binary", "F1", "AUC"]
         )
-        assert "Score error for PipelineWithError" in self._caplog.text
+        assert "Score error for PipelineWithError" in caplog.text
+
+
+def test_automl_immediate_quit(X_y_binary_cls, cluster, caplog):
+    """Make sure the AutoMLSearch quits when error_callback is defined and does no further work."""
+    caplog.clear()
+    X, y = X_y_binary_cls
+    with Client(cluster) as client:
+        parallel_engine = DaskEngine(client)
 
-    def test_automl_immediate_quit(self):
-        """Make sure the AutoMLSearch quits when error_callback is defined and does no further work."""
-        self._caplog.clear()
-        X, y = self.X_y_binary
         pipelines = [
             TestPipelineFast({}),
             TestPipelineWithFitError({}),
@@ -133,7 +151,7 @@ def test_automl_immediate_quit(self):
             X_train=X,
             y_train=y,
             problem_type="binary",
-            engine=self.parallel_engine,
+            engine=parallel_engine,
             max_iterations=4,
             allowed_pipelines=pipelines,
             error_callback=raise_error_callback,
@@ -155,7 +173,3 @@ def test_automl_immediate_quit(self):
         assert TestPipelineWithFitError.custom_name not in set(
             automl.full_rankings["pipeline_name"]
         )
-
-    @classmethod
-    def tearDownClass(cls) -> None:
-        cls.client.close()
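
A second pattern worth noting from the diff: the tests now request pytest's built-in caplog fixture directly, instead of the autouse inject_fixtures shim that copied it onto self. A minimal sketch of how caplog captures log output (emit_error and test_error_is_logged are hypothetical names for illustration, not part of this PR):

import logging


def emit_error():
    # Stand-in for a worker's training failure being logged by the error callback.
    logging.getLogger(__name__).error("Train error for PipelineWithError: Yikes")


def test_error_is_logged(caplog):
    caplog.clear()
    emit_error()
    # caplog.text aggregates all log records captured during the test.
    assert "Train error for PipelineWithError: Yikes" in caplog.text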