From 2b49c73f7f6a952818a74709cc8deefdfc925f46 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 8 Jan 2021 10:04:27 +0100 Subject: [PATCH] Increase timeouts for failing tests --- distributed/tests/test_resources.py | 6 +++++- distributed/tests/test_stress.py | 2 +- distributed/worker.py | 10 +++++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index ac7c06f07eb..aaba48b99f9 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -249,7 +249,11 @@ async def test_minimum_resource(c, s, a): assert a.total_resources == a.available_resources -@gen_cluster(client=True, nthreads=[("127.0.0.1", 2, {"resources": {"A": 1}})]) +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 2, {"resources": {"A": 1}})], + active_rpc_timeout=10, +) async def test_prefer_constrained(c, s, a): futures = c.map(slowinc, range(1000), delay=0.1) constrained = c.map(inc, range(10), resources={"A": 1}) diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index d699ac9452d..755f5f97efd 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -79,7 +79,7 @@ async def test_cancel_stress(c, s, *workers): def test_cancel_stress_sync(loop): da = pytest.importorskip("dask.array") x = da.random.random((50, 50), chunks=(2, 2)) - with cluster(active_rpc_timeout=10) as (s, [a, b]): + with cluster(active_rpc_timeout=10, disconnect_timeout=10) as (s, [a, b]): with Client(s["address"], loop=loop) as c: x = c.persist(x) y = (x.sum(axis=0) + x.sum(axis=1) + 1).std() diff --git a/distributed/worker.py b/distributed/worker.py index 2b229104e52..e871bcfcd7f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2521,11 +2521,15 @@ async def execute(self, key, report=False): return ts = self.tasks[key] if ts.state != "executing": - raise RuntimeError( - f"Trying to execute a task {ts} which is not in executing state anymore" + # This might happen if keys are canceled + logger.debug( + "Trying to execute a task %s which is not in executing state anymore" + % ts ) + return if ts.runspec is None: - raise RuntimeError(f"No runspec available for task {ts}") + logger.critical("No runspec available for task %s." % ts) + return if self.validate: assert not ts.waiting_for_data assert ts.state == "executing"