From ef94538732e75f3d0c3dfb5fb49bcb6b4a8d23bd Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 8 Jun 2021 15:49:17 +0200
Subject: [PATCH] Code review comments

---
 distributed/tests/test_failed_workers.py |  1 -
 distributed/tests/test_scheduler.py      |  2 +-
 distributed/tests/test_worker.py         |  1 -
 distributed/worker.py                    | 18 +++++++++++++-----
 4 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py
index 1be3121fd85..6bdabf46e30 100644
--- a/distributed/tests/test_failed_workers.py
+++ b/distributed/tests/test_failed_workers.py
@@ -407,7 +407,6 @@ def __sizeof__(self) -> int:
 @gen_cluster(client=True)
 async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
     n = await Nanny(s.address, nthreads=2, loop=s.loop)
-    n.auto_restart = False
 
     start = time()
     while len(s.nthreads) < 3:
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index c8b05414314..22a9c103da2 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -1919,7 +1919,7 @@ async def test_task_groups(c, s, a, b):
     assert s.task_groups[y.name].dependencies == {s.task_groups[x.name]}
 
     await c.replicate(y)
-    # TODO: Are we supposed to track repliacted memory here? See also Scheduelr.add_keys
+    # TODO: Are we supposed to track replicated memory here? See also Scheduler.add_keys
     assert tg.nbytes_in_memory == y.nbytes
     assert "array" in str(tg.types)
     assert "array" in str(tp.types)
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 3dd4b6c128d..ac7f5a67325 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -2157,7 +2157,6 @@ def raise_exc(*args):
     f.release()
     g.release()
 
-    # We no longer hold any refs. Cluster should reset completely
     # This is not happening
     for server in [s, a, b]:
         while server.tasks:
diff --git a/distributed/worker.py b/distributed/worker.py
index 58c04562267..22d6406156d 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1530,13 +1530,13 @@ def handle_superfluous_data(self, keys=(), reason=None):
 
         For stronger guarantees, see handler free_keys
         """
+        self.log.append(("Handle superfluous data", keys, reason))
         for key in list(keys):
             ts = self.tasks.get(key)
-            self.log.append((key, "nofity superfluous data", reason))
             if ts and not ts.scheduler_holds_ref:
                 self.release_key(key, reason=f"delete data: {reason}", report=False)
 
-            logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
+        logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
         return "OK"
 
     async def set_resources(self, **resources):
@@ -1732,7 +1732,7 @@ def transition_new_fetch(self, ts, who_has):
         if self.validate:
             assert ts.state == "new"
             assert ts.runspec is None
-            # assert who_has
+            assert who_has
 
         for dependent in ts.dependents:
             dependent.waiting_for_data.add(ts.key)
@@ -2541,8 +2541,10 @@ async def handle_missing_dep(self, *deps, **kwargs):
                     if dependent.key in dep.waiting_for_data:
                         self.data_needed.append(dependent.key)
             if still_missing:
-                logger.critical(
-                    "Found self referencing who has response from scheduler. Trying again handle_missing"
Trying again handle_missing" + logger.debug( + "Found self referencing who has response from scheduler for keys %s.\n" + "Trying again handle_missing", + deps, ) await self.handle_missing_dep(*deps) except Exception: @@ -2576,6 +2578,12 @@ def update_who_has(self, who_has): if dep in self.tasks: if self.address in workers and self.tasks[dep].state != "memory": + logger.debug( + "Scheduler claims worker %s holds data for task %s which is not true.", + self.name, + dep, + ) + # Do not mutate the input dict. That's rude workers = set(workers) - {self.address} self.tasks[dep].who_has.update(workers)