From a045531eca13ee6d61088c385d97968956d2ea9a Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 17 Dec 2020 17:47:22 +0100 Subject: [PATCH] Remove release keys where they seem to be misplaced --- distributed/worker.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 288a5614ac9..e17439915db 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1602,9 +1602,6 @@ def transition_flight_waiting(self, ts, worker=None, remove=True, runspec=None): self.data_needed.appendleft(dependent.key) else: # worker was probably busy, wait a while self.data_needed.append(dependent.key) - - if not ts.dependents: - self.release_key(ts.key) except Exception as e: logger.exception(e) if LOG_PDB: @@ -2130,9 +2127,6 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): if not busy and d in data: self.transition(ts, "memory", value=data[d]) - elif ts is None or ts.state == "executing": - self.release_key(d) - continue elif ts.state not in ("ready", "memory"): self.transition(ts, "waiting", worker=worker, remove=not busy) @@ -2170,7 +2164,6 @@ def bad_dep(self, dep): self.release_key(dep.key) async def handle_missing_dep(self, *deps, **kwargs): - print("HANDLE MISSING DEPS") try: self.log.append(("handle-missing", deps)) deps = {dep for dep in deps if dep.dependents}