Skip to content

Commit

Permalink
Add test case for gh7498
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 20, 2024
1 parent 93f4c6f commit b53ec14
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
3 changes: 3 additions & 0 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ def __str__(self) -> str:
result = "\n".join([result, self.msg])
return result

def __reduce__(self):
return self.__class__, (self.key, self.reason, self.msg)


class FuturesCancelledError(CancelledError):
error_groups: list[CancelledFuturesGroup]
Expand Down
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4632,7 +4632,7 @@ def _match_graph_with_tasks(
): # bad key
lost_keys.add(k)
logger.info("User asked for computation on lost data, %s", k)
del dsk[k]
dsk.pop(k, None)
del dependencies[k]
if k in keys:
keys.remove(k)
Expand Down
38 changes: 38 additions & 0 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8437,3 +8437,41 @@ def test_release_persisted_collection_sync(c):
# submitting to the scheduler is different to what we are in
# client.compute
arr.compute()


@pytest.mark.slow()
@pytest.mark.parametrize("do_wait", [True, False])
def test_persisted_collection_submitted(c, do_wait):
# Note: sending collections like this should be considered an anti-pattern
# but it is possible. As long as the user ensures the futures stay alive
# this is fine but the cluster will not take over this responsibility. The
# client will not unpack the collection when using submit and will therefore
# not handle the dependencies in any way.
# See also https://github.com/dask/distributed/issues/7498
da = pytest.importorskip("dask.array")
x = da.arange(10, chunks=(5,)).persist()
if do_wait:
wait(x)

def f(x):
assert isinstance(x, da.Array)
return x.sum().compute()

future = c.submit(f, x)
result = future.result()
assert result == sum(range(10))
del x, future, result

# Now we delete the persisted collection before computing the result
y = da.arange(10, chunks=(4,)).persist()
if do_wait:
wait(y)
future = c.submit(f, y)
del y
with pytest.raises(FutureCancelledError):
future.result()
del future

future = c.submit(f, da.arange(10, chunks=(4,)).persist())
with pytest.raises(FutureCancelledError):
future.result()

0 comments on commit b53ec14

Please sign in to comment.