diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py
index 324d9b4799..94408d1ef8 100644
--- a/distributed/tests/test_worker_state_machine.py
+++ b/distributed/tests/test_worker_state_machine.py
@@ -23,6 +23,7 @@
     ExecuteSuccessEvent,
     Instruction,
     RecommendationsConflict,
+    RefreshWhoHasEvent,
     ReleaseWorkerDataMsg,
     RescheduleEvent,
     RescheduleMsg,
@@ -603,3 +604,46 @@ async def test_missing_to_waiting(c, s, w1, w2, w3):
 
     await w1.close()
     await f1
+
+
+@gen_cluster(client=True, nthreads=[("", 1)] * 3)
+async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
+    """
+    1. Two tasks, x and y, are only available on a busy worker.
+       The worker sends request-refresh-who-has to the scheduler.
+    2. The scheduler responds that x has become missing, while y has gained an
+       additional replica.
+    3. The handler for RefreshWhoHasEvent empties x.who_has and recommends a
+       transition to missing.
+    4. Before the recommendation can be enacted, the same event invokes
+       _ensure_communicating to let y transition to flight. This in turn pops x from
+       data_needed - but x has an empty who_has, which is an exceptional situation.
+    5. The transition fetch->missing is executed, but x is no longer in
+       data_needed - another exceptional situation.
+    """
+    x = c.submit(inc, 1, key="x", workers=[w1.address])
+    y = c.submit(inc, 2, key="y", workers=[w1.address])
+    await wait([x, y])
+    w1.total_in_connections = 0
+    s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")
+
+    # The tasks will now flip-flop between fetch and flight every 150ms
+    # (see Worker.retry_busy_worker_later)
+    await wait_for_state("x", "fetch", w3)
+    await wait_for_state("y", "fetch", w3)
+    assert w1.address in w3.busy_workers
+    # w3 sent {op: request-refresh-who-has, keys: [x, y]}
+    # There may also have been enough time for a refresh-who-has message to come back,
+    # which reiterated what w3 already knew:
+    # {op: refresh-who-has, who_has={x: [w1.address], y: [w1.address]}}
+
+    # Let's instead simulate that, while request-refresh-who-has was in transit,
+    # w2 gained a replica of y and w1 closed down.
+    # When request-refresh-who-has lands, the scheduler will respond:
+    # {op: refresh-who-has, who_has={x: [], y: [w2.address]}}
+    w3.handle_stimulus(
+        RefreshWhoHasEvent(who_has={"x": [], "y": [w2.address]}, stimulus_id="test2")
+    )
+    assert w3.tasks["x"].state == "missing"
+    assert w3.tasks["y"].state == "flight"
+    assert w3.tasks["y"].who_has == {w2.address}
diff --git a/distributed/worker.py b/distributed/worker.py
index d7598ad182..880f741502 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2181,7 +2181,14 @@ def transition_fetch_flight(
     def transition_fetch_missing(
         self, ts: TaskState, *, stimulus_id: str
     ) -> RecsInstrs:
-        self.data_needed.remove(ts)
+        # There's a use case where ts won't be found in self.data_needed, so
+        # `self.data_needed.remove(ts)` would crash:
+        # 1. An event handler empties who_has and pushes a recommendation to missing
+        # 2. The same event handler calls _ensure_communicating, which pops the task
+        #    from data_needed
+        # 3. The recommendation is enacted
+        # See matching code in _ensure_communicating.
+        self.data_needed.discard(ts)
         return self.transition_generic_missing(ts, stimulus_id=stimulus_id)
 
     def transition_memory_released(
@@ -3003,9 +3010,16 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
 
             if self.validate:
                 assert ts.state == "fetch"
-                assert ts.who_has
                 assert self.address not in ts.who_has
 
+            if not ts.who_has:
+                # An event handler just emptied who_has and recommended a fetch->missing
+                # transition. Then, the same handler called _ensure_communicating. The
+                # transition hasn't been enacted yet, so the task is still in fetch
+                # state and in data_needed.
+                # See matching code in transition_fetch_missing.
+                continue
+
             workers = [
                 w for w in ts.who_has