Commit ef94538: Code review comments
fjetter committed Jun 8, 2021
1 parent 5a35ce1 commit ef94538
Showing 4 changed files with 14 additions and 8 deletions.
1 change: 0 additions & 1 deletion distributed/tests/test_failed_workers.py
@@ -407,7 +407,6 @@ def __sizeof__(self) -> int:
 @gen_cluster(client=True)
 async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
     n = await Nanny(s.address, nthreads=2, loop=s.loop)
-    n.auto_restart = False
 
     start = time()
     while len(s.nthreads) < 3:
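With the deleted line gone, the Nanny keeps its default restart behavior, so the test has to tolerate the killed worker coming back. A minimal sketch, assuming Nanny.auto_restart still defaults to True (the setup code here is illustrative, not from the commit):

from distributed import Scheduler, Nanny

async def sketch():
    async with Scheduler(dashboard_address=":0") as s:
        async with Nanny(s.address, nthreads=2) as n:
            # Default the test now relies on: if the worker process dies,
            # the Nanny revives it instead of leaving the cluster short.
            assert n.auto_restart
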
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
@@ -1919,7 +1919,7 @@ async def test_task_groups(c, s, a, b):
     assert s.task_groups[y.name].dependencies == {s.task_groups[x.name]}
 
     await c.replicate(y)
-    # TODO: Are we supposed to track repliacted memory here? See also Scheduelr.add_keys
+    # TODO: Are we supposed to track replicated memory here? See also Scheduler.add_keys
     assert tg.nbytes_in_memory == y.nbytes
     assert "array" in str(tg.types)
     assert "array" in str(tp.types)
1 change: 0 additions & 1 deletion distributed/tests/test_worker.py
@@ -2157,7 +2157,6 @@ def raise_exc(*args):
     f.release()
     g.release()
 
     # We no longer hold any refs. Cluster should reset completely
-    # This is not happening
     for server in [s, a, b]:
         while server.tasks:
18 changes: 13 additions & 5 deletions distributed/worker.py
@@ -1530,13 +1530,13 @@ def handle_superfluous_data(self, keys=(), reason=None):
         For stronger guarantees, see handler free_keys
         """
         self.log.append(("Handle superfluous data", keys, reason))
         for key in list(keys):
             ts = self.tasks.get(key)
             self.log.append((key, "nofity superfluous data", reason))
             if ts and not ts.scheduler_holds_ref:
                 self.release_key(key, reason=f"delete data: {reason}", report=False)
 
-            logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
+        logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
         return "OK"
 
     async def set_resources(self, **resources):
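Note the for key in list(keys) pattern in this hunk: it iterates over a snapshot, presumably because release_key can mutate the underlying collection while the loop runs. A standalone illustration of why the copy matters:

d = {"x": 1, "y": 2}

# Iterating over a snapshot makes it safe to mutate d inside the loop.
for k in list(d):
    del d[k]

# Iterating over d directly while deleting entries would raise
# RuntimeError: dictionary changed size during iteration.
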
@@ -1732,7 +1732,7 @@ def transition_new_fetch(self, ts, who_has):
         if self.validate:
             assert ts.state == "new"
             assert ts.runspec is None
-            # assert who_has
+            assert who_has
 
         for dependent in ts.dependents:
             dependent.waiting_for_data.add(ts.key)
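Re-enabling the commented-out assertion tightens validate mode: a task may only transition new -> fetch when at least one replica location is known. Since Python treats empty collections as falsy, assert who_has fails exactly when the scheduler supplied no workers. A two-line illustration (the address below is hypothetical):

who_has = []                       # no replica locations known
# assert who_has                   # AssertionError: empty list is falsy
who_has = ["tcp://10.0.0.1:1234"]  # hypothetical worker address
assert who_has                     # passes once a location is known
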
@@ -2541,8 +2541,10 @@ async def handle_missing_dep(self, *deps, **kwargs):
                 if dependent.key in dep.waiting_for_data:
                     self.data_needed.append(dependent.key)
             if still_missing:
-                logger.critical(
-                    "Found self referencing who has response from scheduler. Trying again handle_missing"
+                logger.debug(
+                    "Found self referencing who has response from scheduler for keys %s.\n"
+                    "Trying again handle_missing",
+                    deps,
                 )
                 await self.handle_missing_dep(*deps)
         except Exception:
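Besides downgrading the level from critical to debug, the new call passes deps as a lazy %s argument instead of baking it into the string. The standard logging module only interpolates arguments when the record is actually emitted, which keeps this retry path cheap while debug logging is off. A self-contained sketch of that behavior:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("distributed.worker")

deps = ("dep-1", "dep-2")

# Lazy: at INFO level this DEBUG record is dropped before any
# string formatting happens.
logger.debug("Found self referencing who_has response for keys %s", deps)

# Eager: the f-string builds the message even though the record
# is filtered out anyway.
logger.debug(f"Found self referencing who_has response for keys {deps}")
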
@@ -2576,6 +2578,12 @@ def update_who_has(self, who_has):
 
             if dep in self.tasks:
                 if self.address in workers and self.tasks[dep].state != "memory":
+                    logger.debug(
+                        "Scheduler claims worker %s holds data for task %s which is not true.",
+                        self.name,
+                        dep,
+                    )
+                    # Do not mutate the input dict. That's rude
                     workers = set(workers) - {self.address}
                 self.tasks[dep].who_has.update(workers)
 
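The "do not mutate the input dict" comment refers to workers being a set owned by the caller's who_has payload: set(workers) - {self.address} rebinds a fresh copy, whereas an in-place removal would silently edit the scheduler-supplied data. A minimal illustration with hypothetical task and worker names:

who_has = {"task-1": {"worker-a", "worker-b"}}
my_address = "worker-a"

workers = who_has["task-1"]
workers = set(workers) - {my_address}  # copy: caller's set is untouched
assert who_has["task-1"] == {"worker-a", "worker-b"}

workers = who_has["task-1"]
workers.discard(my_address)            # in-place: caller's set shrinks
assert who_has["task-1"] == {"worker-b"}
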
