diff --git a/distributed/client.py b/distributed/client.py index 9c076f67bab..12dc2ad7e1b 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -911,6 +911,12 @@ def _handle_error(self, exception=None): def _close(self, fast=False): """ Send close signal and wait until scheduler completes """ with log_errors(): + with ignoring(AttributeError): + dask.set_options(get=self._previous_get) + with ignoring(AttributeError): + dask.set_options(shuffle=self._previous_shuffle) + if self.get == _globals.get('get'): + del _globals['get'] if self.status == 'closed': raise gen.Return() if self.scheduler_comm and self.scheduler_comm.comm and not self.scheduler_comm.comm.closed(): diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 53cd184e260..d2ff316a826 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -41,6 +41,19 @@ def test_gen_cluster(c, s, a, b): assert s.ncores == {w.address: w.ncores for w in [a, b]} +def test_gen_cluster_cleans_up_client(): + import dask.context + assert not dask.context._globals.get('get') + + @gen_cluster(client=True) + def f(c, s, a, b): + pass + + f() + + assert not dask.context._globals.get('get') + + @gen_cluster(client=False) def test_gen_cluster_without_client(s, a, b): assert isinstance(s, Scheduler)