diff --git a/distributed/tests/test_speculative.py b/distributed/tests/test_speculative.py index e1177bbb134..9d12f817c2f 100644 --- a/distributed/tests/test_speculative.py +++ b/distributed/tests/test_speculative.py @@ -1,19 +1,27 @@ +import asyncio + from dask import delayed -from distributed.utils_test import gen_cluster, inc, dec +from distributed.utils_test import gen_cluster, inc, dec, slowinc @gen_cluster(client=True, config={"dask.optimization.fuse.active": False}) async def test_speculative_assignment_simple(c, s, a, b): - x = delayed(inc)(1) + x = delayed(slowinc)(1) y = delayed(inc)(x) z = delayed(dec)(y) - zz = c.compute(z) - result = await zz - assert result == 2 - assert (y.key, "speculative", "ready") in a.story(y.key) - assert (z.key, "speculative", "ready") in a.story(z.key) + result = c.compute(z) + while not a.tasks: + await asyncio.sleep(0.001) + + assert y.key in a.tasks + assert z.key in a.tasks + + assert a.tasks[y.key].state == "speculative" + assert a.tasks[z.key].state == "speculative" + + assert await result == 2 @gen_cluster(client=True, config={"dask.optimization.fuse.active": False}) @@ -48,7 +56,7 @@ async def test_spec_assign_intermittent(c, s, a, b): f i # both spec | | g j # both spec - \ / + \ / # noqa k # spec """