diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 2f3a7f58ed..903241f722 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -48,6 +48,7 @@
     mul,
     nodebug,
     slowinc,
+    slowsum,
 )
 from distributed.worker import Worker, error_message, logger, parse_memory_limit
 
@@ -2087,11 +2088,6 @@ def raise_exc(*args):
         await asyncio.sleep(0.01)
 
     expected_states = {
-        # We currently don't have a good way to actually release this memory as
-        # long as the tasks still have a dependent. We'll need to live with this
-        # memory for now
-        f.key: "memory",
-        g.key: "memory",
         res.key: "error",
     }
 
@@ -2159,6 +2155,7 @@ def raise_exc(*args):
 
     expected_states = {
         f.key: "memory",
+        g.key: "memory",
     }
 
     assert_task_states_on_worker(expected_states, a)
@@ -2166,7 +2163,6 @@ def raise_exc(*args):
     f.release()
     g.release()
 
-    # This is not happening
     for server in [s, a, b]:
         while server.tasks:
             await asyncio.sleep(0.01)
@@ -2220,13 +2216,14 @@ def raise_exc(*args):
     res.release()
     # We no longer hold any refs to f or g and B didn't have any erros. It
    # releases everything as expected
-    while a.tasks:
+    while len(a.tasks) > 1:
         await asyncio.sleep(0.01)
 
     expected_states = {
         g.key: "memory",
     }
+
     assert_task_states_on_worker(expected_states, a)
     assert_task_states_on_worker(expected_states, b)
     g.release()
@@ -2283,7 +2280,6 @@ def raise_exc(*args):
     assert_task_states_on_worker(expected_states_A, a)
     expected_states_B = {
-        f.key: "memory",
         g.key: "memory",
         h.key: "memory",
         res.key: "error",
     }
@@ -2301,15 +2297,6 @@ def raise_exc(*args):
 
     # B must not forget a task since all have a still valid dependent
     expected_states_B = {
-        f.key: "memory",
-        # We actually cannot hold on to G even though the graph would suggest
-        # otherwise. This is because H was only introduced as a dependency and
-        # the scheduler never told the worker how H fits into the big picture.
-        # Therefore, it thinks that G does not have any dependents anymore and
-        # releases it. Too bad. Once we have speculative task assignments this
-        # should be more exact since we should always tell the worker what's
-        # going on
-        # g.key: released,
         h.key: "memory",
         res.key: "error",
     }
@@ -2320,10 +2307,6 @@ def raise_exc(*args):
     expected_states_A = {}
     assert_task_states_on_worker(expected_states_A, a)
     expected_states_B = {
-        f.key: "memory",
-        # See above
-        # g.key: released,
-        h.key: "memory",
         res.key: "error",
     }
 
@@ -2334,3 +2317,29 @@ def raise_exc(*args):
     for server in [s, a, b]:
         while server.tasks:
             await asyncio.sleep(0.01)
+
+
+@gen_cluster(client=True, nthreads=[("127.0.0.1", x) for x in range(4)], timeout=None)
+async def test_hold_on_to_replicas(c, s, *workers):
+    f1 = c.submit(inc, 1, workers=[workers[0].address], key="f1")
+    f2 = c.submit(inc, 2, workers=[workers[1].address], key="f2")
+
+    sum_1 = c.submit(
+        slowsum, [f1, f2], delay=0.1, workers=[workers[2].address], key="sum"
+    )
+    sum_2 = c.submit(
+        slowsum, [f1, sum_1], delay=0.2, workers=[workers[3].address], key="sum_2"
+    )
+    f1.release()
+    f2.release()
+
+    while sum_2.key not in workers[3].tasks:
+        await asyncio.sleep(0.01)
+
+    while not workers[3].tasks[sum_2.key].state == "memory":
+        assert len(s.tasks[f1.key].who_has) >= 2
+        assert s.tasks[f2.key].state == "released"
+        await asyncio.sleep(0.01)
+
+    while len(workers[2].tasks) > 1:
+        await asyncio.sleep(0.01)
diff --git a/distributed/worker.py b/distributed/worker.py
index 3d5146f09e..1f04b4cbde 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1658,8 +1658,7 @@ def add_task(
             ts.dependencies.add(dep_ts)
             dep_ts.dependents.add(ts)
 
-            if dep_ts.state in ("fetch", "flight"):
-                # if we _need_ to grab data or are in the process
+            if dep_ts.state not in ("memory",):
                 ts.waiting_for_data.add(dep_ts.key)
 
         self.update_who_has(who_has=who_has)
@@ -1762,9 +1761,6 @@ def transition_fetch_waiting(self, ts, runspec):
             # clear `who_has` of stale info
             ts.who_has.clear()
 
-            # remove entry from dependents to avoid a spurious `gather_dep` call``
-            for dependent in ts.dependents:
-                dependent.waiting_for_data.discard(ts.key)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -1794,9 +1790,6 @@ def transition_flight_waiting(self, ts, runspec):
             # clear `who_has` of stale info
             ts.who_has.clear()
 
-            # remove entry from dependents to avoid a spurious `gather_dep` call``
-            for dependent in ts.dependents:
-                dependent.waiting_for_data.discard(ts.key)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -1991,6 +1984,8 @@ def transition_executing_done(self, ts, value=no_value, report=True):
                 ts.traceback = msg["traceback"]
                 ts.state = "error"
                 out = "error"
+                for d in ts.dependents:
+                    d.waiting_for_data.add(ts.key)
 
             # Don't release the dependency keys, but do remove them from `dependents`
             for dependency in ts.dependencies:
@@ -2621,12 +2616,12 @@ def release_key(
             if self.validate:
                 assert isinstance(key, str)
 
-            ts = self.tasks.get(key, TaskState(key=key))
+            ts = self.tasks.get(key, None)
             # If the scheduler holds a reference which is usually the
             # case when it instructed the task to be computed here or if
             # data was scattered we must not release it unless the
             # scheduler allow us to. See also handle_delete_data and
-            if ts and ts.scheduler_holds_ref:
+            if ts is None or ts.scheduler_holds_ref:
                 return
             logger.debug(
                 "Release key %s",
@@ -2640,28 +2635,14 @@ def release_key(
                 self.log.append((key, "release-key", {"cause": cause}, reason))
             else:
                 self.log.append((key, "release-key", reason))
-            if key in self.data and not ts.dependents:
+            if key in self.data:
                 try:
                     del self.data[key]
                 except FileNotFoundError:
                     logger.error("Tried to delete %s but no file found", exc_info=True)
-            if key in self.actors and not ts.dependents:
+            if key in self.actors:
                 del self.actors[key]
 
-            # for any dependencies of key we are releasing remove task as dependent
-            for dependency in ts.dependencies:
-                dependency.dependents.discard(ts)
-
-                if not dependency.dependents and dependency.state not in (
-                    # don't boot keys that are in flight
-                    # we don't know if they're already queued up for transit
-                    # in a gather_dep callback
-                    "flight",
-                    # The same is true for already executing keys.
-                    "executing",
-                ):
-                    self.release_key(dependency.key, reason=f"Dependent {ts} released")
-
             for worker in ts.who_has:
                 self.has_what[worker].discard(ts.key)
             ts.who_has.clear()
@@ -2681,8 +2662,10 @@ def release_key(
                 # Inform the scheduler of keys which will have gone missing
                 # We are releasing them before they have completed
                 if ts.state in PROCESSING:
+                    # This path is only hit with work stealing
                     msg = {"op": "release", "key": key, "cause": cause}
                 else:
+                    # This path is only hit when calling release_key manually
                     msg = {
                         "op": "release-worker-data",
                         "keys": [key],
@@ -2691,9 +2674,8 @@ def release_key(
                 self.batched_stream.send(msg)
 
             self._notify_plugins("release_key", key, ts.state, cause, reason, report)
-            if key in self.tasks and not ts.dependents:
-                self.tasks.pop(key)
-                del ts
+            del self.tasks[key]
+
         except CommClosedError:
             pass
         except Exception as e:
@@ -2704,32 +2686,6 @@ def release_key(
                 pdb.set_trace()
             raise
 
-    def rescind_key(self, key):
-        try:
-            if self.tasks[key].state not in PENDING:
-                return
-
-            ts = self.tasks.pop(key)
-
-            # Task has been rescinded
-            # For every task that it required
-            for dependency in ts.dependencies:
-                # Remove it as a dependent
-                dependency.dependents.remove(key)
-                # If the dependent is now without purpose (no dependencies), remove it
-                if not dependency.dependents:
-                    self.release_key(
-                        dependency.key, reason="All dependent keys rescinded"
-                    )
-
-        except Exception as e:
-            logger.exception(e)
-            if LOG_PDB:
-                import pdb
-
-                pdb.set_trace()
-            raise
-
     ################
     # Execute Task #
     ################
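For reference, the two test helpers relied on above come from the distributed test suite: slowsum (newly added to the imports) and assert_task_states_on_worker (already used by the existing tests). The sketch below is a rough reconstruction inferred only from how they are called in this diff (slowsum([f1, f2], delay=0.1) and assert_task_states_on_worker(expected_states, a)); the exact bodies in the repository may differ.

from time import sleep


def slowsum(seq, delay=0.02):
    # Pause before summing so a test has time to observe tasks while their
    # inputs are still being computed or transferred between workers.
    sleep(delay)
    return sum(seq)


def assert_task_states_on_worker(expected, worker):
    # Each expected key must be present on the worker in the expected state ...
    for key, state in expected.items():
        assert key in worker.tasks, (key, list(worker.tasks))
        assert worker.tasks[key].state == state, (key, worker.tasks[key].state, state)
    # ... and the worker must not be holding on to any other tasks.
    assert set(expected) == set(worker.tasks)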