From 7dfe5cfe516cc0f5562bc53e3e81dc357b87cc83 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 3 Aug 2023 15:11:12 +0200
Subject: [PATCH] handle errors properly

---
 distributed/scheduler.py | 100 +++++++++++++++++++--------------------
 1 file changed, 50 insertions(+), 50 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index c5552bd365f..1cb44e00e3c 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -4542,14 +4542,57 @@ async def update_graph(
         start = time()
         async with self._update_graph_lock:
             try:
-                graph = deserialize(graph_header, graph_frames).data
-                del graph_header, graph_frames
-            except Exception as e:
-                msg = """\
-                    Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
-                """
-                raise RuntimeError(textwrap.dedent(msg)) from e
+                try:
+                    graph = deserialize(graph_header, graph_frames).data
+                    del graph_header, graph_frames
+                except Exception as e:
+                    msg = """\
+                        Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
+                    """
+                    raise RuntimeError(textwrap.dedent(msg)) from e
+                else:
+                    (
+                        dsk,
+                        dependencies,
+                        annotations_by_type,
+                    ) = await offload(
+                        _materialize_graph,
+                        graph=graph,
+                        global_annotations=annotations or {},
+                    )
+                    del graph
+                    if not internal_priority:
+                        # Removing all non-local keys before calling order()
+                        dsk_keys = set(
+                            dsk
+                        )  # intersection() of sets is much faster than dict_keys
+                        stripped_deps = {
+                            k: v.intersection(dsk_keys)
+                            for k, v in dependencies.items()
+                            if k in dsk_keys
+                        }
+                        internal_priority = await offload(
+                            dask.order.order, dsk=dsk, dependencies=stripped_deps
+                        )
+                    self._create_taskstate_from_graph(
+                        dsk=dsk,
+                        client=client,
+                        dependencies=dependencies,
+                        keys=set(keys),
+                        ordered=internal_priority or {},
+                        submitting_task=submitting_task,
+                        user_priority=user_priority,
+                        actors=actors,
+                        fifo_timeout=fifo_timeout,
+                        code=code,
+                        annotations_by_type=annotations_by_type,
+                        # FIXME: This is just used to attach to Computation
+                        # objects. This should be removed
+                        global_annotations=annotations,
+                        start=start,
+                        stimulus_id=stimulus_id or f"update-graph-{start}",
+                    )
             except RuntimeError as e:
                 err = error_message(e)
                 for key in keys:
@@ -4561,49 +4604,6 @@ async def update_graph(
                             "traceback": err["traceback"],
                         }
                     )
-            else:
-                (
-                    dsk,
-                    dependencies,
-                    annotations_by_type,
-                ) = await offload(
-                    _materialize_graph,
-                    graph=graph,
-                    global_annotations=annotations or {},
-                )
-                del graph
-                if not internal_priority:
-                    # Removing all non-local keys before calling order()
-                    dsk_keys = set(
-                        dsk
-                    )  # intersection() of sets is much faster than dict_keys
-                    stripped_deps = {
-                        k: v.intersection(dsk_keys)
-                        for k, v in dependencies.items()
-                        if k in dsk_keys
-                    }
-                    internal_priority = await offload(
-                        dask.order.order, dsk=dsk, dependencies=stripped_deps
-                    )
-
-                self._create_taskstate_from_graph(
-                    dsk=dsk,
-                    client=client,
-                    dependencies=dependencies,
-                    keys=set(keys),
-                    ordered=internal_priority or {},
-                    submitting_task=submitting_task,
-                    user_priority=user_priority,
-                    actors=actors,
-                    fifo_timeout=fifo_timeout,
-                    code=code,
-                    annotations_by_type=annotations_by_type,
-                    # FIXME: This is just used to attach to Computation objects. This
-                    # should be removed
-                    global_annotations=annotations,
-                    start=start,
-                    stimulus_id=stimulus_id or f"update-graph-{start}",
-                )
         end = time()
         self.digest_metric("update-graph-duration", end - start)
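
The net effect of the patch is that graph materialization and ordering now run inside the same try block as deserialization, so a RuntimeError from any of these steps is reported back to the waiting keys instead of escaping the handler. Below is a minimal, self-contained sketch of that nested try/except/else pattern; the helper functions, reporting format, and names used here are illustrative stand-ins, not Dask APIs.

import pickle


def _deserialize(frames: bytes) -> dict:
    # Stand-in for deserialize(graph_header, graph_frames).data
    return pickle.loads(frames)


def _materialize(graph: dict, keys: list[str]) -> dict:
    # Stand-in for _materialize_graph, dask.order.order and
    # _create_taskstate_from_graph.
    return {k: graph[k] for k in keys}


def update_graph(frames: bytes, keys: list[str]) -> dict:
    reports: dict[str, str] = {}
    try:
        try:
            graph = _deserialize(frames)
        except Exception as e:
            # Wrap deserialization failures with a user-facing hint while
            # chaining the original exception via `from e`.
            raise RuntimeError(
                "Error during deserialization of the task graph. This "
                "frequently occurs if the Scheduler and Client have "
                "different environments."
            ) from e
        else:
            # Materialization now sits inside the same outer try, so its
            # errors take the same per-key reporting path below.
            return _materialize(graph, keys)
    except RuntimeError as e:
        for key in keys:
            reports[key] = f"task-erred: {e}"
    return reports


if __name__ == "__main__":
    print(update_graph(pickle.dumps({"x": 1}), ["x"]))  # {'x': 1}
    print(update_graph(b"not-a-graph", ["x"]))          # per-key error report

Note that only RuntimeError is caught by the outer handler, mirroring the patched scheduler code: deserialization problems are converted to RuntimeError so they follow the reporting path, while unrelated exceptions still propagate.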