Skip to content

Commit

Permalink
handle errors properly
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 3, 2023
1 parent 802f67f commit 7dfe5cf
Showing 1 changed file with 50 additions and 50 deletions.
100 changes: 50 additions & 50 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4542,14 +4542,57 @@ async def update_graph(
start = time()
async with self._update_graph_lock:
try:
graph = deserialize(graph_header, graph_frames).data
del graph_header, graph_frames
except Exception as e:
msg = """\
Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
"""
raise RuntimeError(textwrap.dedent(msg)) from e
try:
graph = deserialize(graph_header, graph_frames).data
del graph_header, graph_frames
except Exception as e:
msg = """\
Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
"""
raise RuntimeError(textwrap.dedent(msg)) from e
else:
(
dsk,
dependencies,
annotations_by_type,
) = await offload(
_materialize_graph,
graph=graph,
global_annotations=annotations or {},
)
del graph
if not internal_priority:
# Removing all non-local keys before calling order()
dsk_keys = set(
dsk
) # intersection() of sets is much faster than dict_keys
stripped_deps = {
k: v.intersection(dsk_keys)
for k, v in dependencies.items()
if k in dsk_keys
}
internal_priority = await offload(
dask.order.order, dsk=dsk, dependencies=stripped_deps
)

self._create_taskstate_from_graph(
dsk=dsk,
client=client,
dependencies=dependencies,
keys=set(keys),
ordered=internal_priority or {},
submitting_task=submitting_task,
user_priority=user_priority,
actors=actors,
fifo_timeout=fifo_timeout,
code=code,
annotations_by_type=annotations_by_type,
# FIXME: This is just used to attach to Computation
# objects. This should be removed
global_annotations=annotations,
start=start,
stimulus_id=stimulus_id or f"update-graph-{start}",
)
except RuntimeError as e:
err = error_message(e)
for key in keys:
Expand All @@ -4561,49 +4604,6 @@ async def update_graph(
"traceback": err["traceback"],
}
)
else:
(
dsk,
dependencies,
annotations_by_type,
) = await offload(
_materialize_graph,
graph=graph,
global_annotations=annotations or {},
)
del graph
if not internal_priority:
# Removing all non-local keys before calling order()
dsk_keys = set(
dsk
) # intersection() of sets is much faster than dict_keys
stripped_deps = {
k: v.intersection(dsk_keys)
for k, v in dependencies.items()
if k in dsk_keys
}
internal_priority = await offload(
dask.order.order, dsk=dsk, dependencies=stripped_deps
)

self._create_taskstate_from_graph(
dsk=dsk,
client=client,
dependencies=dependencies,
keys=set(keys),
ordered=internal_priority or {},
submitting_task=submitting_task,
user_priority=user_priority,
actors=actors,
fifo_timeout=fifo_timeout,
code=code,
annotations_by_type=annotations_by_type,
# FIXME: This is just used to attach to Computation objects. This
# should be removed
global_annotations=annotations,
start=start,
stimulus_id=stimulus_id or f"update-graph-{start}",
)
end = time()
self.digest_metric("update-graph-duration", end - start)

Expand Down

0 comments on commit 7dfe5cf

Please sign in to comment.