diff --git a/docs/source/release_notes.rst b/docs/source/release_notes.rst index 67640d9079..234ba0bb77 100644 --- a/docs/source/release_notes.rst +++ b/docs/source/release_notes.rst @@ -7,6 +7,7 @@ Release Notes * Documentation Changes * Testing Changes * Add ``pytest-timeout``. All tests that run longer than 6 minutes will fail. :pr:`2374` + * Refactored dask tests :pr:`2377` .. warning:: diff --git a/evalml/tests/automl_tests/test_automl_dask.py b/evalml/tests/automl_tests/test_automl_dask.py index 74f49c213c..96ba8e5071 100644 --- a/evalml/tests/automl_tests/test_automl_dask.py +++ b/evalml/tests/automl_tests/test_automl_dask.py @@ -1,8 +1,6 @@ -import unittest - import numpy as np import pytest -from distributed import Client +from dask.distributed import Client, LocalCluster from evalml.automl import AutoMLSearch from evalml.automl.callbacks import raise_error_callback @@ -15,30 +13,33 @@ ) -@pytest.mark.usefixtures("X_y_binary_cls") -class TestAutoMLSearchDask(unittest.TestCase): - @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): - """Gives the unittests access to the logger""" - self._caplog = caplog +@pytest.fixture +def sequential_engine(): + return SequentialEngine() + - @classmethod - def setUpClass(cls) -> None: - cls.client = Client() - cls.parallel_engine = DaskEngine(cls.client) - cls.sequential_engine = SequentialEngine() +@pytest.fixture(scope="module") +def cluster(): + dask_cluster = LocalCluster( + n_workers=1, threads_per_worker=2, dashboard_address=None + ) + yield dask_cluster + dask_cluster.close() - def test_automl(self): - """Comparing the results of parallel and sequential AutoML to each other.""" - X, y = self.X_y_binary + +def test_automl(X_y_binary_cls, cluster, sequential_engine): + """Comparing the results of parallel and sequential AutoML to each other.""" + with Client(cluster) as client: + parallel_engine = DaskEngine(client) + X, y = X_y_binary_cls par_automl = AutoMLSearch( - X_train=X, y_train=y, problem_type="binary", engine=self.parallel_engine + X_train=X, y_train=y, problem_type="binary", engine=parallel_engine ) par_automl.search() parallel_rankings = par_automl.full_rankings seq_automl = AutoMLSearch( - X_train=X, y_train=y, problem_type="binary", engine=self.sequential_engine + X_train=X, y_train=y, problem_type="binary", engine=sequential_engine ) seq_automl.search() sequential_rankings = seq_automl.full_rankings @@ -60,15 +61,20 @@ def test_automl(self): np.array(par_results["percent_better_than_baseline"]), ) - def test_automl_max_iterations(self): - """Making sure that the max_iterations parameter limits the number of pipelines run.""" - X, y = self.X_y_binary + +def test_automl_max_iterations(X_y_binary_cls, cluster, sequential_engine): + """Making sure that the max_iterations parameter limits the number of pipelines run.""" + + X, y = X_y_binary_cls + with Client(cluster) as client: + parallel_engine = DaskEngine(client) + max_iterations = 4 par_automl = AutoMLSearch( X_train=X, y_train=y, problem_type="binary", - engine=self.parallel_engine, + engine=parallel_engine, max_iterations=max_iterations, ) par_automl.search() @@ -78,7 +84,7 @@ def test_automl_max_iterations(self): X_train=X, y_train=y, problem_type="binary", - engine=self.sequential_engine, + engine=sequential_engine, max_iterations=max_iterations, ) seq_automl.search() @@ -86,44 +92,56 @@ def test_automl_max_iterations(self): assert len(sequential_rankings) == len(parallel_rankings) == max_iterations - def test_automl_train_dask_error_callback(self): - 
"""Make sure the pipeline training error message makes its way back from the workers.""" - self._caplog.clear() - X, y = self.X_y_binary + +def test_automl_train_dask_error_callback(X_y_binary_cls, cluster, caplog): + """Make sure the pipeline training error message makes its way back from the workers.""" + caplog.clear() + with Client(cluster) as client: + parallel_engine = DaskEngine(client) + X, y = X_y_binary_cls + pipelines = [TestPipelineWithFitError({})] automl = AutoMLSearch( X_train=X, y_train=y, problem_type="binary", - engine=self.parallel_engine, + engine=parallel_engine, max_iterations=2, allowed_pipelines=pipelines, ) automl.train_pipelines(pipelines) - assert "Train error for PipelineWithError: Yikes" in self._caplog.text + assert "Train error for PipelineWithError: Yikes" in caplog.text + + +def test_automl_score_dask_error_callback(X_y_binary_cls, cluster, caplog): + """Make sure the pipeline scoring error message makes its way back from the workers.""" + caplog.clear() + with Client(cluster) as client: + parallel_engine = DaskEngine(client) - def test_automl_score_dask_error_callback(self): - """Make sure the pipeline scoring error message makes its way back from the workers.""" - self._caplog.clear() - X, y = self.X_y_binary + X, y = X_y_binary_cls pipelines = [TestPipelineWithScoreError({})] automl = AutoMLSearch( X_train=X, y_train=y, problem_type="binary", - engine=self.parallel_engine, + engine=parallel_engine, max_iterations=2, allowed_pipelines=pipelines, ) automl.score_pipelines( pipelines, X, y, objectives=["Log Loss Binary", "F1", "AUC"] ) - assert "Score error for PipelineWithError" in self._caplog.text + assert "Score error for PipelineWithError" in caplog.text + + +def test_automl_immediate_quit(X_y_binary_cls, cluster, caplog): + """Make sure the AutoMLSearch quits when error_callback is defined and does no further work.""" + caplog.clear() + X, y = X_y_binary_cls + with Client(cluster) as client: + parallel_engine = DaskEngine(client) - def test_automl_immediate_quit(self): - """Make sure the AutoMLSearch quits when error_callback is defined and does no further work.""" - self._caplog.clear() - X, y = self.X_y_binary pipelines = [ TestPipelineFast({}), TestPipelineWithFitError({}), @@ -133,7 +151,7 @@ def test_automl_immediate_quit(self): X_train=X, y_train=y, problem_type="binary", - engine=self.parallel_engine, + engine=parallel_engine, max_iterations=4, allowed_pipelines=pipelines, error_callback=raise_error_callback, @@ -155,7 +173,3 @@ def test_automl_immediate_quit(self): assert TestPipelineWithFitError.custom_name not in set( automl.full_rankings["pipeline_name"] ) - - @classmethod - def tearDownClass(cls) -> None: - cls.client.close() diff --git a/evalml/tests/automl_tests/test_dask_engine.py b/evalml/tests/automl_tests/test_dask_engine.py index 977e0c067d..f8e68902f3 100644 --- a/evalml/tests/automl_tests/test_dask_engine.py +++ b/evalml/tests/automl_tests/test_dask_engine.py @@ -1,10 +1,8 @@ -import unittest - import numpy as np import pandas as pd import pytest import woodwork as ww -from distributed import Client +from dask.distributed import Client, LocalCluster from evalml.automl.engine.dask_engine import DaskComputation, DaskEngine from evalml.automl.engine.engine_base import ( @@ -23,26 +21,32 @@ ) -@pytest.mark.usefixtures("X_y_binary_cls") -class TestDaskEngine(unittest.TestCase): - @classmethod - def setUpClass(cls) -> None: - cls.client = Client() +@pytest.fixture(scope="module") +def cluster(): + dask_cluster = LocalCluster( + 
n_workers=1, threads_per_worker=1, dashboard_address=None + ) + yield dask_cluster + dask_cluster.close() + - def test_init(self): - engine = DaskEngine(client=self.client) - assert engine.client == self.client +def test_init(cluster): + with Client(cluster) as client: + engine = DaskEngine(client=client) + assert engine.client == client with pytest.raises( TypeError, match="Expected dask.distributed.Client, received" ): DaskEngine(client="Client") - def test_submit_training_job_single(self): - """Test that training a single pipeline using the parallel engine produces the - same results as simply running the train_pipeline function.""" - X, y = self.X_y_binary - engine = DaskEngine(client=self.client) + +def test_submit_training_job_single(X_y_binary_cls, cluster): + """Test that training a single pipeline using the parallel engine produces the + same results as simply running the train_pipeline function.""" + X, y = X_y_binary_cls + with Client(cluster) as client: + engine = DaskEngine(client=client) pipeline = BinaryClassificationPipeline( component_graph=["Logistic Regression Classifier"], parameters={"Logistic Regression Classifier": {"n_jobs": 1}}, @@ -68,10 +72,12 @@ def test_submit_training_job_single(self): dask_pipeline_fitted.predict(X), original_pipeline_fitted.predict(X) ) - def test_submit_training_jobs_multiple(self): - """Test that training multiple pipelines using the parallel engine produces the - same results as the sequential engine.""" - X, y = self.X_y_binary + +def test_submit_training_jobs_multiple(X_y_binary_cls, cluster): + """Test that training multiple pipelines using the parallel engine produces the + same results as the sequential engine.""" + X, y = X_y_binary_cls + with Client(cluster) as client: pipelines = [ BinaryClassificationPipeline( component_graph=["Logistic Regression Classifier"], @@ -98,7 +104,7 @@ def fit_pipelines(pipelines, engine): assert pipeline._is_fitted # Verify all pipelines are trained and fitted. 
- par_pipelines = fit_pipelines(pipelines, DaskEngine(client=self.client)) + par_pipelines = fit_pipelines(pipelines, DaskEngine(client=client)) for pipeline in par_pipelines: assert pipeline._is_fitted @@ -107,19 +113,22 @@ def fit_pipelines(pipelines, engine): for par_pipeline in par_pipelines: assert par_pipeline in seq_pipelines - def test_submit_evaluate_job_single(self): - """Test that evaluating a single pipeline using the parallel engine produces the - same results as simply running the evaluate_pipeline function.""" - X, y = self.X_y_binary - X.ww.init() - y = ww.init_series(y) + +def test_submit_evaluate_job_single(X_y_binary_cls, cluster): + """Test that evaluating a single pipeline using the parallel engine produces the + same results as simply running the evaluate_pipeline function.""" + X, y = X_y_binary_cls + X.ww.init() + y = ww.init_series(y) + + with Client(cluster) as client: pipeline = BinaryClassificationPipeline( component_graph=["Logistic Regression Classifier"], parameters={"Logistic Regression Classifier": {"n_jobs": 1}}, ) - engine = DaskEngine(client=self.client) + engine = DaskEngine(client=client) # Verify that engine evaluates a pipeline pipeline_future = engine.submit_evaluation_job( @@ -155,12 +164,15 @@ def test_submit_evaluate_job_single(self): == original_eval_results.get("logger").logs ) - def test_submit_evaluate_jobs_multiple(self): - """Test that evaluating multiple pipelines using the parallel engine produces the - same results as the sequential engine.""" - X, y = self.X_y_binary - X.ww.init() - y = ww.init_series(y) + +def test_submit_evaluate_jobs_multiple(X_y_binary_cls, cluster): + """Test that evaluating multiple pipelines using the parallel engine produces the + same results as the sequential engine.""" + X, y = X_y_binary_cls + X.ww.init() + y = ww.init_series(y) + + with Client(cluster) as client: pipelines = [ BinaryClassificationPipeline( @@ -182,7 +194,7 @@ def eval_pipelines(pipelines, engine): results = [f.get_result() for f in futures] return results - par_eval_results = eval_pipelines(pipelines, DaskEngine(client=self.client)) + par_eval_results = eval_pipelines(pipelines, DaskEngine(client=client)) par_dicts = [s.get("scores") for s in par_eval_results] par_scores = [s["cv_data"][0]["mean_cv_score"] for s in par_dicts] par_pipelines = [s.get("pipeline") for s in par_eval_results] @@ -205,18 +217,21 @@ def eval_pipelines(pipelines, engine): for par_pipeline in par_pipelines: assert par_pipeline in seq_pipelines - def test_submit_scoring_job_single(self): - """Test that scoring a single pipeline using the parallel engine produces the - same results as simply running the score_pipeline function.""" - X, y = self.X_y_binary - X.ww.init() - y = ww.init_series(y) + +def test_submit_scoring_job_single(X_y_binary_cls, cluster): + """Test that scoring a single pipeline using the parallel engine produces the + same results as simply running the score_pipeline function.""" + X, y = X_y_binary_cls + X.ww.init() + y = ww.init_series(y) + + with Client(cluster) as client: pipeline = BinaryClassificationPipeline( component_graph=["Logistic Regression Classifier"], parameters={"Logistic Regression Classifier": {"n_jobs": 1}}, ) - engine = DaskEngine(client=self.client) + engine = DaskEngine(client=client) objectives = [automl_data.objective] pipeline_future = engine.submit_training_job( @@ -238,12 +253,15 @@ def test_submit_scoring_job_single(self): assert not np.isnan(pipeline_score["Log Loss Binary"]) assert pipeline_score == 
original_pipeline_score - def test_submit_scoring_jobs_multiple(self): - """Test that scoring multiple pipelines using the parallel engine produces the - same results as the sequential engine.""" - X, y = self.X_y_binary - X.ww.init() - y = ww.init_series(y) + +def test_submit_scoring_jobs_multiple(X_y_binary_cls, cluster): + """Test that scoring multiple pipelines using the parallel engine produces the + same results as the sequential engine.""" + X, y = X_y_binary_cls + X.ww.init() + y = ww.init_series(y) + + with Client(cluster) as client: pipelines = [ BinaryClassificationPipeline( @@ -277,7 +295,7 @@ def score_pipelines(pipelines, engine): results = [f.get_result() for f in futures] return results - par_eval_results = score_pipelines(pipelines, DaskEngine(client=self.client)) + par_eval_results = score_pipelines(pipelines, DaskEngine(client=client)) par_scores = [s["Log Loss Binary"] for s in par_eval_results] seq_eval_results = score_pipelines(pipelines, SequentialEngine()) @@ -289,11 +307,14 @@ def score_pipelines(pipelines, engine): assert not any([np.isnan(s) for s in seq_scores]) np.testing.assert_allclose(par_scores, seq_scores, rtol=1e-10) - def test_cancel_job(self): - """Test that training a single pipeline using the parallel engine produces the - same results as simply running the train_pipeline function.""" - X, y = self.X_y_binary - engine = DaskEngine(client=self.client) + +def test_cancel_job(X_y_binary_cls, cluster): + """Test that training a single pipeline using the parallel engine produces the + same results as simply running the train_pipeline function.""" + X, y = X_y_binary_cls + + with Client(cluster) as client: + engine = DaskEngine(client=client) pipeline = TestPipelineSlow({"Logistic Regression Classifier": {"n_jobs": 1}}) # Verify that engine fits a pipeline @@ -303,9 +324,12 @@ def test_cancel_job(self): pipeline_future.cancel() assert pipeline_future.is_cancelled - def test_dask_sends_woodwork_schema(self): - X, y = self.X_y_binary - engine = DaskEngine(client=self.client) + +def test_dask_sends_woodwork_schema(X_y_binary_cls, cluster): + X, y = X_y_binary_cls + + with Client(cluster) as client: + engine = DaskEngine(client=client) X.ww.init( logical_types={0: "Categorical"}, semantic_tags={0: ["my cool feature"]} @@ -349,7 +373,3 @@ def test_dask_sends_woodwork_schema(self): future = engine.submit_evaluation_job(new_config, pipeline, X, y) future.get_result() - - @classmethod - def tearDownClass(cls) -> None: - cls.client.close() diff --git a/evalml/tests/conftest.py b/evalml/tests/conftest.py index 654af8aa4f..6bce86b58b 100644 --- a/evalml/tests/conftest.py +++ b/evalml/tests/conftest.py @@ -201,12 +201,12 @@ def X_y_binary(): return X, y -@pytest.fixture(scope="class") -def X_y_binary_cls(request): +@pytest.fixture +def X_y_binary_cls(): X, y = datasets.make_classification( n_samples=100, n_features=20, n_informative=2, n_redundant=2, random_state=0 ) - request.cls.X_y_binary = pd.DataFrame(X), pd.Series(y) + return pd.DataFrame(X), pd.Series(y) @pytest.fixture
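
The refactor above replaces the unittest.TestCase setup/teardown (a shared Client created in setUpClass and closed in tearDownClass) with pytest fixtures: a module-scoped cluster fixture owns a single LocalCluster, and each test opens a short-lived Client against it. Below is a minimal sketch of the resulting test pattern, assuming evalml's DaskEngine and the X_y_binary_cls fixture from conftest.py shown in the diff; the test name and final assertion are illustrative only, not part of the patch.

    import pytest
    from dask.distributed import Client, LocalCluster

    from evalml.automl import AutoMLSearch
    from evalml.automl.engine.dask_engine import DaskEngine


    @pytest.fixture(scope="module")
    def cluster():
        # One LocalCluster per test module; torn down after the last test in the module runs.
        dask_cluster = LocalCluster(
            n_workers=1, threads_per_worker=2, dashboard_address=None
        )
        yield dask_cluster
        dask_cluster.close()


    def test_search_runs_on_dask(X_y_binary_cls, cluster):
        # Each test opens (and closes) its own Client against the shared module-level cluster.
        X, y = X_y_binary_cls
        with Client(cluster) as client:
            engine = DaskEngine(client=client)
            automl = AutoMLSearch(
                X_train=X,
                y_train=y,
                problem_type="binary",
                engine=engine,
                max_iterations=2,
            )
            automl.search()
            # Mirrors the assertion style of test_automl_max_iterations above:
            # full_rankings has one row per iteration.
            assert len(automl.full_rankings) == 2

Sharing the cluster at module scope avoids re-creating Dask workers for every test, while the per-test Client keeps each test's connection scoped to that test; the diff applies the same trade-off in both test_automl_dask.py and test_dask_engine.py.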