From 99ad636d8eafb4b77e0477d05716de514863ddf9 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Mon, 11 Jan 2021 14:20:48 +0100
Subject: [PATCH] Query who has if workers are busy

---
 distributed/worker.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 0d5e944d7f..fc38d435a5 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2164,6 +2164,8 @@ async def gather_dep(self, worker, deps, total_nbytes, cause):
                 # Exponential backoff to avoid hammering scheduler/worker
                 self.repetitively_busy += 1
                 await asyncio.sleep(0.100 * 1.5 ** self.repetitively_busy)
+                # See if anyone new has the data
+                await self.query_who_has(*deps)
                 self.ensure_communicating()
 
     def bad_dep(self, ts):
@@ -2221,7 +2223,8 @@ async def handle_missing_dep(self, *deps, worker):
                 self.transition(dep, "waiting", worker=worker)
 
     async def query_who_has(self, *deps):
-        # FIXME: If this is improperly called, the query fails. We should ensure that deps is a flat list here
+        if self.validate:
+            assert all(k in self.tasks for k in deps)
         with log_errors():
             response = await retry_operation(self.scheduler.who_has, keys=deps)
             self.update_who_has(response)