From dbb13ecf3c78f6ad301c8c40b18cebdef71789bf Mon Sep 17 00:00:00 2001
From: Florian Jetter
Date: Tue, 29 Jun 2021 16:02:56 +0200
Subject: [PATCH] No longer hold dependencies of erred tasks in memory #4918

This is a follow-up to #4784 and reduces the complexity of
Worker.release_key significantly.

There is one non-trivial behavioural change regarding erred tasks. The
current main branch holds on to the dependencies of an erred task on the
worker and only releases them once the erred task itself is released. I
implemented this recently while trying to capture the status quo, but I am
no longer convinced it is the correct behaviour. It treats the erred case
specially, which introduces a lot of complexity. The only situation where
it helps is when an erred task is to be recomputed locally: keeping the
data keys around until the erred task is released would speed that up.
However, we would potentially still need to compute some keys anyway, so I
am inclined to drop this feature in favour of reduced complexity.
---
 distributed/tests/test_worker.py | 51 ++++++++++++++----------
 distributed/worker.py            | 66 ++++++--------------------------
 2 files changed, 41 insertions(+), 76 deletions(-)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 2f3a7f58ed..903241f722 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -48,6 +48,7 @@
     mul,
     nodebug,
     slowinc,
+    slowsum,
 )
 from distributed.worker import Worker, error_message, logger, parse_memory_limit
@@ -2087,11 +2088,6 @@ def raise_exc(*args):
         await asyncio.sleep(0.01)
 
     expected_states = {
-        # We currently don't have a good way to actually release this memory as
-        # long as the tasks still have a dependent. We'll need to live with this
-        # memory for now
-        f.key: "memory",
-        g.key: "memory",
         res.key: "error",
     }
 
@@ -2159,6 +2155,7 @@ def raise_exc(*args):
     expected_states = {
         f.key: "memory",
+        g.key: "memory",
     }
 
     assert_task_states_on_worker(expected_states, a)
 
     f.release()
     g.release()
-    # This is not happening
     for server in [s, a, b]:
         while server.tasks:
             await asyncio.sleep(0.01)
@@ -2220,13 +2216,14 @@ def raise_exc(*args):
     res.release()
     # We no longer hold any refs to f or g and B didn't have any errors. It
     # releases everything as expected
-    while a.tasks:
+    while len(a.tasks) > 1:
         await asyncio.sleep(0.01)
 
     expected_states = {
         g.key: "memory",
     }
 
+    assert_task_states_on_worker(expected_states, a)
     assert_task_states_on_worker(expected_states, b)
 
     g.release()
@@ -2283,7 +2280,6 @@ def raise_exc(*args):
     assert_task_states_on_worker(expected_states_A, a)
 
     expected_states_B = {
-        f.key: "memory",
         g.key: "memory",
         h.key: "memory",
         res.key: "error",
     }
@@ -2301,15 +2297,6 @@ def raise_exc(*args):
     # B must not forget a task since all have a still valid dependent
     expected_states_B = {
-        f.key: "memory",
-        # We actually cannot hold on to G even though the graph would suggest
-        # otherwise. This is because H was only introduced as a dependency and
-        # the scheduler never told the worker how H fits into the big picture.
-        # Therefore, it thinks that G does not have any dependents anymore and
-        # releases it. Too bad. Once we have speculative task assignments this
-        # should be more exact since we should always tell the worker what's
-        # going on
-        # g.key: released,
         h.key: "memory",
         res.key: "error",
     }
@@ -2320,10 +2307,6 @@ def raise_exc(*args):
     expected_states_A = {}
     assert_task_states_on_worker(expected_states_A, a)
 
     expected_states_B = {
-        f.key: "memory",
-        # See above
-        # g.key: released,
-        h.key: "memory",
         res.key: "error",
     }
@@ -2334,3 +2317,29 @@ def raise_exc(*args):
     for server in [s, a, b]:
         while server.tasks:
             await asyncio.sleep(0.01)
+
+
+@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)], timeout=None)
+async def test_hold_on_to_replicas(c, s, *workers):
+    f1 = c.submit(inc, 1, workers=[workers[0].address], key="f1")
+    f2 = c.submit(inc, 2, workers=[workers[1].address], key="f2")
+
+    sum_1 = c.submit(
+        slowsum, [f1, f2], delay=0.1, workers=[workers[2].address], key="sum"
+    )
+    sum_2 = c.submit(
+        slowsum, [f1, sum_1], delay=0.2, workers=[workers[3].address], key="sum_2"
+    )
+    f1.release()
+    f2.release()
+
+    while sum_2.key not in workers[3].tasks:
+        await asyncio.sleep(0.01)
+
+    while not workers[3].tasks[sum_2.key].state == "memory":
+        assert len(s.tasks[f1.key].who_has) >= 2
+        assert s.tasks[f2.key].state == "released"
+        await asyncio.sleep(0.01)
+
+    while len(workers[2].tasks) > 1:
+        await asyncio.sleep(0.01)
diff --git a/distributed/worker.py b/distributed/worker.py
index 3d5146f09e..1f04b4cbde 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1658,8 +1658,7 @@ def add_task(
                 ts.dependencies.add(dep_ts)
                 dep_ts.dependents.add(ts)
 
-                if dep_ts.state in ("fetch", "flight"):
-                    # if we _need_ to grab data or are in the process
+                if dep_ts.state not in ("memory",):
                     ts.waiting_for_data.add(dep_ts.key)
 
         self.update_who_has(who_has=who_has)
@@ -1762,9 +1761,6 @@ def transition_fetch_waiting(self, ts, runspec):
 
             # clear `who_has` of stale info
             ts.who_has.clear()
-            # remove entry from dependents to avoid a spurious `gather_dep` call``
-            for dependent in ts.dependents:
-                dependent.waiting_for_data.discard(ts.key)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -1794,9 +1790,6 @@ def transition_flight_waiting(self, ts, runspec):
 
             # clear `who_has` of stale info
             ts.who_has.clear()
-            # remove entry from dependents to avoid a spurious `gather_dep` call``
-            for dependent in ts.dependents:
-                dependent.waiting_for_data.discard(ts.key)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -1991,6 +1984,8 @@ def transition_executing_done(self, ts, value=no_value, report=True):
                 ts.traceback = msg["traceback"]
                 ts.state = "error"
                 out = "error"
+                for d in ts.dependents:
+                    d.waiting_for_data.add(ts.key)
 
             # Don't release the dependency keys, but do remove them from `dependents`
             for dependency in ts.dependencies:
@@ -2621,12 +2616,12 @@ def release_key(
             if self.validate:
                 assert isinstance(key, str)
 
-            ts = self.tasks.get(key, TaskState(key=key))
+            ts = self.tasks.get(key, None)
             # If the scheduler holds a reference which is usually the
             # case when it instructed the task to be computed here or if
             # data was scattered we must not release it unless the
            # scheduler allow us to. See also handle_delete_data and
-            if ts and ts.scheduler_holds_ref:
+            if ts is None or ts.scheduler_holds_ref:
                 return
             logger.debug(
                 "Release key %s",
@@ -2640,28 +2635,14 @@ def release_key(
                 self.log.append((key, "release-key", {"cause": cause}, reason))
             else:
                 self.log.append((key, "release-key", reason))
-            if key in self.data and not ts.dependents:
+            if key in self.data:
                 try:
                     del self.data[key]
                 except FileNotFoundError:
                     logger.error("Tried to delete %s but no file found", exc_info=True)
-            if key in self.actors and not ts.dependents:
+            if key in self.actors:
                 del self.actors[key]
 
-            # for any dependencies of key we are releasing remove task as dependent
-            for dependency in ts.dependencies:
-                dependency.dependents.discard(ts)
-
-                if not dependency.dependents and dependency.state not in (
-                    # don't boot keys that are in flight
-                    # we don't know if they're already queued up for transit
-                    # in a gather_dep callback
-                    "flight",
-                    # The same is true for already executing keys.
-                    "executing",
-                ):
-                    self.release_key(dependency.key, reason=f"Dependent {ts} released")
-
             for worker in ts.who_has:
                 self.has_what[worker].discard(ts.key)
             ts.who_has.clear()
@@ -2681,8 +2662,10 @@ def release_key(
             # Inform the scheduler of keys which will have gone missing
             # We are releasing them before they have completed
             if ts.state in PROCESSING:
+                # This path is only hit with work stealing
                 msg = {"op": "release", "key": key, "cause": cause}
             else:
+                # This path is only hit when calling release_key manually
                 msg = {
                     "op": "release-worker-data",
                     "keys": [key],
@@ -2691,9 +2674,8 @@ def release_key(
                 self.batched_stream.send(msg)
 
             self._notify_plugins("release_key", key, ts.state, cause, reason, report)
-            if key in self.tasks and not ts.dependents:
-                self.tasks.pop(key)
-                del ts
+            del self.tasks[key]
+
         except CommClosedError:
             pass
         except Exception as e:
@@ -2704,32 +2686,6 @@ def release_key(
                 pdb.set_trace()
             raise
 
-    def rescind_key(self, key):
-        try:
-            if self.tasks[key].state not in PENDING:
-                return
-
-            ts = self.tasks.pop(key)
-
-            # Task has been rescinded
-            # For every task that it required
-            for dependency in ts.dependencies:
-                # Remove it as a dependent
-                dependency.dependents.remove(key)
-                # If the dependent is now without purpose (no dependencies), remove it
-                if not dependency.dependents:
-                    self.release_key(
-                        dependency.key, reason="All dependent keys rescinded"
-                    )
-
-        except Exception as e:
-            logger.exception(e)
-            if LOG_PDB:
-                import pdb
-
-                pdb.set_trace()
-            raise
-
     ################
     # Execute Task #
     ################
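
Note for reviewers (not part of the patch): the sketch below shows the
behavioural change from the client's point of view. It is a hypothetical,
minimal example, assuming only a local cluster and the toy functions defined
in it; the names f, g and res mirror the structure used in the tests above.

    from dask.distributed import Client


    def inc(x):
        return x + 1


    def raise_exc(*args):
        raise RuntimeError("boom")


    if __name__ == "__main__":
        client = Client(processes=False)  # in-process cluster, purely illustrative

        f = client.submit(inc, 1, key="f")
        g = client.submit(inc, 2, key="g")
        res = client.submit(raise_exc, f, g, key="res")  # ends up in state "error"

        f.release()
        g.release()

        # Before this patch the worker kept the data for f and g in memory
        # because their dependent `res` was still around, albeit erred.
        # With this patch f and g are released as soon as the client drops its
        # references; only the erred `res` (exception and traceback) is kept.
        print(res.exception())

        client.close()

The trade-off called out in the commit message follows from the last step:
because the inputs of an erred task are no longer pinned on the worker,
recomputing the erred task locally has to recompute (or re-fetch) those
inputs instead of reusing data that was kept alive by the error.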