From 09365f628dc86832190efd03a308ba177755bf13 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 22 Sep 2022 08:59:42 -0700 Subject: [PATCH] Close client during `wait_until_finish`; rm async. --- sdks/python/apache_beam/runners/dask/dask_runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index ed711ae00657..5b4c297052c0 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -88,13 +88,14 @@ def wait_until_finish(self, duration=None) -> PipelineState: # Convert milliseconds to seconds duration /= 1000 self.client.wait_for_workers(timeout=duration) - self.client.gather(self.futures, errors='raise', asynchronous=True) + self.client.gather(self.futures, errors='raise') self._state = PipelineState.DONE except: # pylint: disable=broad-except self._state = PipelineState.FAILED raise - # finally: - # self.client.close(timeout=duration) + finally: + self.client.close() + self.client.cluster.close() return self._state def cancel(self) -> PipelineState: