Skip to content

Commit

Permalink
Query who has if workers are busy
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Jan 11, 2021
1 parent bf86166 commit f90381f
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2164,7 +2164,9 @@ async def gather_dep(self, worker, deps, total_nbytes, cause):
# Exponential backoff to avoid hammering scheduler/worker
self.repetitively_busy += 1
await asyncio.sleep(0.100 * 1.5 ** self.repetitively_busy)
self.ensure_communicating()
# See if anyone new has the data
await self.query_who_has(*deps)
self.ensure_communicating()

def bad_dep(self, ts):
exc = RuntimeError(
Expand Down Expand Up @@ -2221,7 +2223,8 @@ async def handle_missing_dep(self, *deps, worker):
self.transition(dep, "waiting", worker=worker)

async def query_who_has(self, *deps):
# FIXME: If this is improperly called, the query fails. We should ensure that deps is a flat list here
if self.validate:
assert all(k in self.tasks for k in deps)
with log_errors():
response = await retry_operation(self.scheduler.who_has, keys=deps)
self.update_who_has(response)
Expand Down

0 comments on commit f90381f

Please sign in to comment.