Commit f4cf53e
Add failing test for forgotten tasks
mrocklin committed Apr 11, 2022
1 parent e6cc6a2 commit f4cf53e
Showing 1 changed file with 27 additions and 0 deletions: distributed/shuffle/tests/test_shuffle.py
Expand Up @@ -395,3 +395,30 @@ async def test_restrictions(c, s, a, b):

await y
assert all(stringify(key) in a.data for key in y.__dask_keys__())


@pytest.mark.xfail(reason="Don't clean up forgotten shuffles")
@gen_cluster(client=True)
async def test_delete_some_results(c, s, a, b):
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-10",
        dtypes={"x": float, "y": float},
        freq="10 s",
    )
    x = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()
    # Wait until at least one task of the full shuffle has landed in memory
    while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()):
        await asyncio.sleep(0.01)

    n = len(s.tasks)

    # Re-persist only the first half of the partitions, so the scheduler
    # releases (forgets) the tasks belonging to the other half
    x = x.partitions[: x.npartitions // 2].persist()

Inline review comment from gjoseph92 (Collaborator), Apr 11, 2022:
I would have expected the order of these to be flipped, as in

    x = x.partitions[: x.npartitions // 2].persist()  # submit subset
    # wait for tasks to start
    x = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()  # submit full

That may trigger a more serious issue than just data not being cleaned up. I think it also would cause half of the output data to be missing (basically it would act like the second, full shuffle never happened)?

mrocklin (Author, Member) replied via email, Apr 11, 2022.

    # Wait for the scheduler to forget the released tasks
    while len(s.tasks) == n:
        await asyncio.sleep(0.1)

    await x

    clean_worker(a)
    clean_worker(b)
    clean_scheduler(s)
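The `clean_worker` and `clean_scheduler` helpers are defined elsewhere in `distributed/shuffle/tests/test_shuffle.py` and are not part of this diff. A minimal sketch of what they plausibly assert is below; the attribute layout (`worker.data`, `extensions["shuffle"].shuffles`) is an assumption for illustration, not the real implementation:

```python
# Hedged sketch of the clean_worker/clean_scheduler helpers used above.
# The real helpers live in distributed/shuffle/tests/test_shuffle.py; the
# attributes below are assumed names for illustration only.
from types import SimpleNamespace


def clean_worker(worker):
    """Assert that no P2P shuffle keys or extension state linger on a worker."""
    assert not any("shuffle" in str(key) for key in worker.data)
    assert not worker.extensions["shuffle"].shuffles


def clean_scheduler(scheduler):
    """Assert that the scheduler's shuffle extension holds no active shuffles."""
    assert not scheduler.extensions["shuffle"].shuffles


# Usage with stand-in objects (no cluster needed):
ext = SimpleNamespace(shuffles={})
worker = SimpleNamespace(data={("x", 0): 1}, extensions={"shuffle": ext})
scheduler = SimpleNamespace(extensions={"shuffle": ext})
clean_worker(worker)
clean_scheduler(scheduler)
```

The point of the xfailed test is exactly that these assertions currently fail: forgetting half the output tasks does not clean up the corresponding shuffle state.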
