Race conditions from fetch to compute while AMM requests replica #6248

Merged: 8 commits, May 6, 2022
12 changes: 6 additions & 6 deletions distributed/scheduler.py
@@ -2202,12 +2202,6 @@ def transition_memory_released(self, key, stimulus_id, safe: bool = False):
worker_msgs,
) # don't try to recreate

for dts in ts.waiters:
if dts.state in ("no-worker", "processing"):
recommendations[dts.key] = "waiting"
elif dts.state == "waiting":
dts.waiting_on.add(ts)

# XXX factor this out?
worker_msg = {
"op": "free-keys",
@@ -2232,6 +2226,12 @@ def transition_memory_released(self, key, stimulus_id, safe: bool = False):
elif ts.who_wants or ts.waiters:
recommendations[key] = "waiting"

for dts in ts.waiters:
if dts.state in ("no-worker", "processing"):
recommendations[dts.key] = "waiting"
elif dts.state == "waiting":
dts.waiting_on.add(ts)
Comment on lines +2229 to +2233
Collaborator

To confirm: the reason for moving this code later is because transitions are insertion-ordered, so this way key gets transitioned to forgotten/waiting before its waiters get transitioned to waiting?

I wonder if this block should even be in transition_memory_released at all? Why shouldn't this part be done in transition_released_waiting and transition_released_forgotten? The fact that we need to make this transition after we make the waiting/forgotten transition makes me think we're overstepping our job in this function, and this should be the job of the other transitions.

I guess I didn't know that the recommendations dict was considered ordered. (Obviously dicts are now ordered in Python, but did it used to be an OrderedDict in py<=3.6?) If there's some dependency structure in the recommendations (transition A has to happen before transition B), I'd think transition A should be responsible for recommending transition B, not that they should be mixed together. That seems easier to reason about.

Member Author

@fjetter May 1, 2022

> To confirm: the reason for moving this code later is because transitions are insertion-ordered,

Yes, dictionaries are insertion-ordered and popitem pops from the end:

In [1]: adict = {}

In [2]: for x in range(10):
   ...:     adict[x] = x
   ...:

In [3]: adict.popitem()
Out[3]: (9, 9)

> I wonder if this block should even be in transition_memory_released at all?

Yes, it should. If a task that was in memory is no longer in memory and a waiter, i.e. a task to be executed, exists, we need to ensure that this waiter is released. These few lines allow tasks to be resilient to worker failures.

> Why shouldn't this part be done in transition_released_waiting

This transition is not there to reset/forget/release something; rather, it schedules something for compute.

> and transition_released_forgotten

This transition should only be triggered after the scheduler deletes the entire graph. It should only ever have scheduling consequences if there are multiple graphs scheduled that share keys. Otherwise it is simply a sophisticated way of popping the task.

> If there's some dependency structure in the recommendations (transition A has to happen before transition B), I'd think transition A should be responsible for recommending transition B, not that they should be mixed together.

It's not about a dependency but about order.

In this specific case (see the test), the recommendation dict

{'f1': 'waiting', 'f2': 'waiting'}

means:

  1. Transition f2 from processing back to waiting
  2. Then transition f1 from released to waiting (f1 was in memory/released before)

I don't see how we could ever infer "please transition f1 to waiting" after we released f1. From a causality perspective, I don't see how we could map this as a dependency.

Edit: In a previous statement I argued that it's about finishing a chain of a task's transitions, but this was false. It's quite the opposite.
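
A minimal, runnable sketch (an assumption for illustration, not the scheduler's actual transition loop) of how insertion order plus popitem() yields the processing order described above:

recommendations = {"f1": "waiting", "f2": "waiting"}

# dict.popitem() removes the most recently inserted entry first, so f2 is
# handled before f1, matching steps 1 and 2 above.
while recommendations:
    key, finish = recommendations.popitem()
    print(f"transition {key} -> {finish}")
# transition f2 -> waiting
# transition f1 -> waiting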


if self.validate:
assert not ts.waiting_on

2 changes: 1 addition & 1 deletion distributed/tests/test_cancelled_state.py
@@ -138,7 +138,7 @@ async def test_worker_stream_died_during_comm(c, s, a, b):
assert any("receive-dep-failed" in msg for msg in b.log)


@gen_cluster(client=True, nthreads=[("", 1)], timeout=4)
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_flight_to_executing_via_cancelled_resumed(c, s, b):

block_get_data = asyncio.Lock()
5 changes: 4 additions & 1 deletion distributed/tests/test_worker.py
@@ -3169,7 +3169,10 @@ async def test_task_flight_compute_oserror(c, s, a, b):
# inc is lost and needs to be recomputed. Therefore, sum is released
("free-keys", ("f1",)),
("f1", "release-key"),
("f1", "waiting", "released", "released", {"f1": "forgotten"}),
# The recommendations here are hard to predict. Whatever key is
# currently scheduled to be fetched, if any, will be recommended to be
# released.
("f1", "waiting", "released", "released", lambda msg: msg["f1"] == "forgotten"),
("f1", "released", "forgotten", "forgotten", {}),
# Now, we actually compute the task *once*. This must not cycle back
("f1", "compute-task"),
66 changes: 66 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -1,9 +1,11 @@
import asyncio
from itertools import chain

import pytest

from distributed.protocol.serialize import Serialize
from distributed.utils import recursive_to_dict
from distributed.utils_test import assert_story, gen_cluster, inc
from distributed.worker_state_machine import (
ExecuteFailureEvent,
ExecuteSuccessEvent,
@@ -19,6 +21,11 @@
)


async def wait_for_state(key, state, dask_worker):
while key not in dask_worker.tasks or dask_worker.tasks[key].state != state:
await asyncio.sleep(0.005)


def test_TaskState_get_nbytes():
assert TaskState("x", nbytes=123).get_nbytes() == 123
# Default to distributed.scheduler.default-data-size
@@ -236,3 +243,62 @@ def test_executefailure_to_dict():
assert ev3.traceback is None
assert ev3.exception_text == "exc text"
assert ev3.traceback_text == "tb text"


@gen_cluster(client=True)
async def test_fetch_to_compute(c, s, a, b):
# Block ensure_communicating to ensure we indeed know that the task is in
# fetch and doesn't leave it accidentally
old_out_connections, b.total_out_connections = b.total_out_connections, 0
old_comm_threshold, b.comm_threshold_bytes = b.comm_threshold_bytes, 0

f1 = c.submit(inc, 1, workers=[a.address], key="f1", allow_other_workers=True)
f2 = c.submit(inc, f1, workers=[b.address], key="f2")

await wait_for_state(f1.key, "fetch", b)
await a.close()

b.total_out_connections = old_out_connections
b.comm_threshold_bytes = old_comm_threshold

await f2

assert_story(
b.log,
# FIXME: This log should be replaced with an
# StateMachineEvent/Instruction log
[
(f2.key, "compute-task"),
# This is a "please fetch" request. We don't have anything like
# this, yet. We don't see the request-dep signal in here because we
# do not wait for the key to be actually scheduled
(f1.key, "ensure-task-exists", "released"),
# After the worker failed, we're instructed to forget f2 before
# something new comes in
("free-keys", (f2.key,)),
(f1.key, "compute-task"),
(f1.key, "put-in-memory"),
(f2.key, "compute-task"),
],
)


@gen_cluster(client=True)
async def test_fetch_via_amm_to_compute(c, s, a, b):
# Block ensure_communicating to ensure we indeed know that the task is in
# fetch and doesn't leave it accidentally
old_out_connections, b.total_out_connections = b.total_out_connections, 0
old_comm_threshold, b.comm_threshold_bytes = b.comm_threshold_bytes, 0

f1 = c.submit(inc, 1, workers=[a.address], key="f1", allow_other_workers=True)

await f1
s.request_acquire_replicas(b.address, [f1.key], stimulus_id="test")

await wait_for_state(f1.key, "fetch", b)
await a.close()

b.total_out_connections = old_out_connections
b.comm_threshold_bytes = old_comm_threshold

await f1
46 changes: 30 additions & 16 deletions distributed/worker.py
@@ -608,6 +608,7 @@ def __init__(
("cancelled", "resumed"): self.transition_cancelled_resumed,
("cancelled", "fetch"): self.transition_cancelled_fetch,
("cancelled", "released"): self.transition_cancelled_released,
("cancelled", "missing"): self.transition_cancelled_released,
("cancelled", "waiting"): self.transition_cancelled_waiting,
("cancelled", "forgotten"): self.transition_cancelled_forgotten,
("cancelled", "memory"): self.transition_cancelled_memory,
@@ -627,7 +628,7 @@ def __init__(
("executing", "released"): self.transition_executing_released,
("executing", "rescheduled"): self.transition_executing_rescheduled,
("fetch", "flight"): self.transition_fetch_flight,
("fetch", "missing"): self.transition_fetch_missing,
("fetch", "missing"): self.transition_generic_missing,
("fetch", "released"): self.transition_generic_released,
("flight", "error"): self.transition_flight_error,
("flight", "fetch"): self.transition_flight_fetch,
@@ -647,6 +648,7 @@
("ready", "released"): self.transition_generic_released,
("released", "error"): self.transition_generic_error,
("released", "fetch"): self.transition_released_fetch,
("released", "missing"): self.transition_released_fetch,
Collaborator

Typo - this should be transition_generic_missing.
This causes an infinite transition loop (#6305):

  1. The scheduler calls handle_compute_task with an empty who_has. This is a broken use case to begin with, but it is happening and the worker should either cope with it gracefully or crash loudly in handle_compute_task itself.
  2. handle_compute_task creates the dependencies with ensure_task_exists in state=released and requests a transition to fetch
  3. The released->fetch transition, handled by transition_released_fetch, notices that who_has is empty, so instead of transitioning to fetch it returns {ts: missing} and keeps the task in released state
  4. The released->missing transition, erroneously handled by the same method, repeats point 3 forever.
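
A minimal, self-contained sketch of the loop described above (not the actual worker code; TaskState, the handler, and the transitions table below are simplified stand-ins assumed for illustration):

class TaskState:
    def __init__(self, key, who_has=()):
        self.key = key
        self.state = "released"
        self.who_has = set(who_has)

def transition_released_fetch(ts):
    # With no worker holding the data, recommend "missing" and stay in "released"
    if not ts.who_has:
        return {ts: "missing"}
    ts.state = "fetch"
    return {}

transitions = {
    ("released", "fetch"): transition_released_fetch,
    ("released", "missing"): transition_released_fetch,  # the typo
}

ts = TaskState("f1")  # empty who_has, as in the broken handle_compute_task call
recommendations = {ts: "fetch"}
for _ in range(5):  # bounded here; the real loop would never terminate
    task, finish = recommendations.popitem()
    recommendations.update(transitions[(task.state, finish)](task))
    print(task.state, "->", finish)  # 'released -> fetch' once, then 'released -> missing' repeatedly

With ("released", "missing") mapped to a handler that actually moves the task into the missing state, no new recommendation is produced and the cycle stops.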

Member Author

Is this "fixed" by #6318?

Collaborator

Yes.

("released", "forgotten"): self.transition_released_forgotten,
("released", "memory"): self.transition_released_memory,
("released", "waiting"): self.transition_released_waiting,
@@ -2021,6 +2023,7 @@ def transition_missing_fetch(
if self.validate:
assert ts.state == "missing"
assert ts.priority is not None
assert ts.who_has

self._missing_dep_flight.discard(ts)
ts.state = "fetch"
@@ -2049,7 +2052,7 @@ def transition_flight_missing(
ts.done = False
return {}, []

def transition_fetch_missing(
def transition_generic_missing(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
ts.state = "missing"
@@ -2063,6 +2066,8 @@ def transition_released_fetch(
if self.validate:
assert ts.state == "released"
assert ts.priority is not None
if not ts.who_has:
return {ts: "missing"}, []
ts.state = "fetch"
ts.done = False
self.data_needed.push(ts)
@@ -2640,18 +2645,25 @@ def _transition(
recs, instructions = self._transition(
ts, "released", stimulus_id=stimulus_id
)
v = recs.get(ts, (finish, *args))
v_state: str
v_args: list | tuple
if isinstance(v, tuple):
v_state, *v_args = v
else:
v_state, v_args = v, ()
b_recs, b_instructions = self._transition(
ts, v_state, *v_args, stimulus_id=stimulus_id
while v := recs.pop(ts, None):
if isinstance(v, tuple):
v_state, *v_args = v
else:
v_state, v_args = v, ()
if v_state == "forgotten":
# We do not want to forget. The purpose of this
# transition path is to get to `finish`
continue
recs, instructions = merge_recs_instructions(
(recs, instructions),
self._transition(ts, v_state, *v_args, stimulus_id=stimulus_id),
)
recs, instructions = merge_recs_instructions(
(recs, instructions),
self._transition(ts, finish, *args, stimulus_id=stimulus_id),
)
recs.update(b_recs)
instructions += b_instructions
except InvalidTransition:
self.log_event(
"invalid-worker-transition",
@@ -3188,7 +3200,12 @@ async def gather_dep(
for d in has_what:
ts = self.tasks[d]
ts.who_has.remove(worker)
if not ts.who_has and ts.state not in ("released", "memory"):
if not ts.who_has and ts.state in (
"fetch",
"flight",
"resumed",
"cancelled",
):
recommendations[ts] = "missing"
self.log.append(
("missing-who-has", worker, ts.key, stimulus_id, time())
@@ -3246,10 +3263,7 @@ async def gather_dep(
"stimulus_id": stimulus_id,
}
)
if ts.who_has:
recommendations[ts] = "fetch"
elif ts.state not in ("released", "memory"):
recommendations[ts] = "missing"
recommendations[ts] = "fetch"
del data, response
self.transitions(recommendations, stimulus_id=stimulus_id)
