refresh-who-has can break the worker state machine (#6529)
crusaderky authored Jun 9, 2022
1 parent 9e4e3ab commit 9b8172b
Showing 2 changed files with 60 additions and 2 deletions.
distributed/tests/test_worker_state_machine.py (44 additions, 0 deletions)
@@ -23,6 +23,7 @@
     ExecuteSuccessEvent,
     Instruction,
     RecommendationsConflict,
+    RefreshWhoHasEvent,
     ReleaseWorkerDataMsg,
     RescheduleEvent,
     RescheduleMsg,
@@ -603,3 +604,46 @@ async def test_missing_to_waiting(c, s, w1, w2, w3):
     await w1.close()
 
     await f1
+
+
+@gen_cluster(client=True, nthreads=[("", 1)] * 3)
+async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
+    """
+    1. Two tasks, x and y, are only available on a busy worker.
+       The worker sends request-refresh-who-has to the scheduler.
+    2. The scheduler responds that x has become missing, while y has gained an
+       additional replica.
+    3. The handler for RefreshWhoHasEvent empties x.who_has and recommends a transition
+       to missing.
+    4. Before the recommendation can be implemented, the same event invokes
+       _ensure_communicating to let y transition to flight. This in turn pops x from
+       data_needed - but x has an empty who_has, which is an exceptional situation.
+    5. The transition fetch->missing is executed, but x is no longer in
+       data_needed - another exceptional situation.
+    """
+    x = c.submit(inc, 1, key="x", workers=[w1.address])
+    y = c.submit(inc, 2, key="y", workers=[w1.address])
+    await wait([x, y])
+    w1.total_in_connections = 0
+    s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")
+
+    # The tasks will now flip-flop between fetch and flight every 150ms
+    # (see Worker.retry_busy_worker_later)
+    await wait_for_state("x", "fetch", w3)
+    await wait_for_state("y", "fetch", w3)
+    assert w1.address in w3.busy_workers
+    # w3 sent {op: request-refresh-who-has, keys: [x, y]}
+    # There also may have been enough time for a refresh-who-has message to come back,
+    # which reiterated what w3 already knew:
+    # {op: refresh-who-has, who_has={x: [w1.address], y: [w1.address]}}
+
+    # Let's instead simulate that, while request-refresh-who-has was in transit,
+    # w2 gained a replica of y and w1 closed down.
+    # When request-refresh-who-has lands, the scheduler will respond:
+    # {op: refresh-who-has, who_has={x: [], y: [w2.address]}}
+    w3.handle_stimulus(
+        RefreshWhoHasEvent(who_has={"x": [], "y": [w2.address]}, stimulus_id="test2")
+    )
+    assert w3.tasks["x"].state == "missing"
+    assert w3.tasks["y"].state == "flight"
+    assert w3.tasks["y"].who_has == {w2.address}
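
Before the worker.py changes, a note on the message flow the test exercises. Below is a minimal sketch of the worker/scheduler exchange, with the message shapes taken from the comments in the test. The event class is a simplified stand-in for the real RefreshWhoHasEvent in distributed.worker_state_machine, and on_all_peers_busy / send_to_scheduler are illustrative names, not Distributed's API:

```python
from dataclasses import dataclass


@dataclass
class RefreshWhoHasEvent:
    """Simplified stand-in for the real event class: the scheduler's
    answer to a request-refresh-who-has message."""

    who_has: dict[str, list[str]]  # key -> addresses of workers holding a replica
    stimulus_id: str


def on_all_peers_busy(worker, keys: list[str]) -> None:
    # Illustrative: every worker in who_has is busy, so ask the scheduler
    # for an up-to-date view (message shape as in the test comments).
    worker.send_to_scheduler({"op": "request-refresh-who-has", "keys": keys})


# The scheduler eventually replies with, e.g.,
#   {"op": "refresh-who-has", "who_has": {"x": [], "y": ["tcp://w2"]}}
# and the worker wraps that payload in a RefreshWhoHasEvent and feeds it to
# handle_stimulus, which is exactly what the test does synthetically above.
```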
distributed/worker.py (16 additions, 2 deletions)
@@ -2181,7 +2181,14 @@ def transition_fetch_flight(
     def transition_fetch_missing(
         self, ts: TaskState, *, stimulus_id: str
     ) -> RecsInstrs:
-        self.data_needed.remove(ts)
+        # There's a use case where ts won't be found in self.data_needed, so
+        # `self.data_needed.remove(ts)` would crash:
+        # 1. An event handler empties who_has and pushes a recommendation to missing
+        # 2. The same event handler calls _ensure_communicating, which pops the task
+        #    from data_needed
+        # 3. The recommendation is enacted
+        # See matching code in _ensure_communicating.
+        self.data_needed.discard(ts)
         return self.transition_generic_missing(ts, stimulus_id=stimulus_id)
 
     def transition_memory_released(
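
The fix in this hunk hinges on the set-style remove/discard contract. A two-line refresher, assuming data_needed (a priority-ordered collection in Distributed) follows the same contract as the built-in set:

```python
s = {"x"}
s.discard("y")  # absent element: silently a no-op
s.remove("y")   # absent element: raises KeyError
```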
@@ -3003,9 +3010,16 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:

             if self.validate:
                 assert ts.state == "fetch"
-                assert ts.who_has
                 assert self.address not in ts.who_has
 
+            if not ts.who_has:
+                # An event handler just emptied who_has and recommended a fetch->missing
+                # transition. Then, the same handler called _ensure_communicating. The
+                # transition hasn't been enacted yet, so the task is still in fetch
+                # state and in data_needed.
+                # See matching code in transition_fetch_missing.
+                continue
+
             workers = [
                 w
                 for w in ts.who_has
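
Putting the two hunks together: a self-contained toy model of the race described in the test docstring. Everything below (ToyState, the plain set/dict stand-ins for data_needed and TaskState, the string states) is illustrative and far simpler than Distributed's real state machine; it only demonstrates why the continue guard and the discard() must match:

```python
class ToyState:
    """Toy model: the event handler first records a recommendation,
    then runs _ensure_communicating, and only afterwards enacts the
    recommendation. That ordering is what made remove() crash."""

    def __init__(self) -> None:
        self.state = {"x": "fetch", "y": "fetch"}
        self.who_has = {"x": set(), "y": {"w2"}}  # x just lost all replicas
        self.data_needed = {"x", "y"}

    def handle_refresh_who_has(self) -> None:
        # Step 1: recommend fetch->missing for keys with no replicas left
        recs = [key for key, workers in self.who_has.items() if not workers]
        # Step 2: the same handler tries to start fetching whatever it can
        self._ensure_communicating()
        # Step 3: only now are the recommendations enacted
        for key in recs:
            self.transition_fetch_missing(key)

    def _ensure_communicating(self) -> None:
        for key in sorted(self.data_needed):
            self.data_needed.remove(key)  # popped unconditionally
            if not self.who_has[key]:
                continue  # matching guard: x was emptied in step 1
            self.state[key] = "flight"  # y starts fetching from w2

    def transition_fetch_missing(self, key: str) -> None:
        self.data_needed.discard(key)  # x is already gone; must not raise
        self.state[key] = "missing"


ts = ToyState()
ts.handle_refresh_who_has()
assert ts.state == {"x": "missing", "y": "flight"}
```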
