-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Race conditions from fetch to compute while AMM requests replica #6248
Changes from 5 commits
d1c9d07
bb25cb7
fa4e421
0e9fe37
49d4ff2
93494cc
e6e4b50
9816c11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -602,6 +602,7 @@ def __init__( | |
("cancelled", "resumed"): self.transition_cancelled_resumed, | ||
("cancelled", "fetch"): self.transition_cancelled_fetch, | ||
("cancelled", "released"): self.transition_cancelled_released, | ||
("cancelled", "missing"): self.transition_cancelled_released, | ||
fjetter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
("cancelled", "waiting"): self.transition_cancelled_waiting, | ||
("cancelled", "forgotten"): self.transition_cancelled_forgotten, | ||
("cancelled", "memory"): self.transition_cancelled_memory, | ||
|
@@ -621,7 +622,7 @@ def __init__( | |
("executing", "released"): self.transition_executing_released, | ||
("executing", "rescheduled"): self.transition_executing_rescheduled, | ||
("fetch", "flight"): self.transition_fetch_flight, | ||
("fetch", "missing"): self.transition_fetch_missing, | ||
("fetch", "missing"): self.transition_generic_to_missing, | ||
crusaderky marked this conversation as resolved.
Show resolved
Hide resolved
|
||
("fetch", "released"): self.transition_generic_released, | ||
("flight", "error"): self.transition_flight_error, | ||
("flight", "fetch"): self.transition_flight_fetch, | ||
|
@@ -641,6 +642,7 @@ def __init__( | |
("ready", "released"): self.transition_generic_released, | ||
("released", "error"): self.transition_generic_error, | ||
("released", "fetch"): self.transition_released_fetch, | ||
("released", "missing"): self.transition_released_fetch, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo - this should be
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this "fixed" by #6318? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
("released", "forgotten"): self.transition_released_forgotten, | ||
("released", "memory"): self.transition_released_memory, | ||
("released", "waiting"): self.transition_released_waiting, | ||
|
@@ -2017,6 +2019,7 @@ def transition_missing_fetch( | |
if self.validate: | ||
assert ts.state == "missing" | ||
assert ts.priority is not None | ||
assert ts.who_has | ||
|
||
self._missing_dep_flight.discard(ts) | ||
ts.state = "fetch" | ||
|
@@ -2045,7 +2048,7 @@ def transition_flight_missing( | |
ts.done = False | ||
return {}, [] | ||
|
||
def transition_fetch_missing( | ||
def transition_generic_to_missing( | ||
self, ts: TaskState, *, stimulus_id: str | ||
) -> RecsInstrs: | ||
ts.state = "missing" | ||
|
@@ -2059,6 +2062,8 @@ def transition_released_fetch( | |
if self.validate: | ||
assert ts.state == "released" | ||
assert ts.priority is not None | ||
if not ts.who_has: | ||
return {ts: "missing"}, [] | ||
ts.state = "fetch" | ||
ts.done = False | ||
self.data_needed.push(ts) | ||
|
@@ -2636,18 +2641,28 @@ def _transition( | |
recs, instructions = self._transition( | ||
ts, "released", stimulus_id=stimulus_id | ||
) | ||
v = recs.get(ts, (finish, *args)) | ||
v_state: str | ||
v_args: list | tuple | ||
if isinstance(v, tuple): | ||
v_state, *v_args = v | ||
else: | ||
v_state, v_args = v, () | ||
b_recs, b_instructions = self._transition( | ||
ts, v_state, *v_args, stimulus_id=stimulus_id | ||
while v := recs.pop(ts, None): | ||
if isinstance(v, tuple): | ||
v_state, *v_args = v | ||
else: | ||
v_state, v_args = v, () | ||
if v_state == "forgotten": | ||
# We do not want to forget. The purpose of this | ||
# transition path is to get to `finish` | ||
continue | ||
fjetter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
b_recs, b_instructions = self._transition( | ||
ts, v_state, *v_args, stimulus_id=stimulus_id | ||
) | ||
recs.update(b_recs) | ||
instructions += b_instructions | ||
fjetter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
c_recs, c_instructions = self._transition( | ||
ts, finish, *args, stimulus_id=stimulus_id | ||
) | ||
recs.update(b_recs) | ||
instructions += b_instructions | ||
recs.update(c_recs) | ||
instructions += c_instructions | ||
fjetter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except InvalidTransition: | ||
self.log_event( | ||
"invalid-worker-transition", | ||
|
@@ -2826,8 +2841,11 @@ def stimulus_story( | |
def ensure_communicating(self) -> None: | ||
if self.status != Status.running: | ||
return | ||
if not hasattr(self, "_stim_counter"): | ||
self._stim_counter = 0 | ||
fjetter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._stim_counter += 1 | ||
|
||
stimulus_id = f"ensure-communicating-{time()}" | ||
stimulus_id = f"ensure-communicating-{self._stim_counter}" | ||
skipped_worker_in_flight_or_busy = [] | ||
|
||
while self.data_needed and ( | ||
|
@@ -3184,7 +3202,12 @@ async def gather_dep( | |
for d in has_what: | ||
ts = self.tasks[d] | ||
ts.who_has.remove(worker) | ||
if not ts.who_has and ts.state not in ("released", "memory"): | ||
if not ts.who_has and ts.state in ( | ||
"fetch", | ||
"flight", | ||
"resumed", | ||
"cancelled", | ||
): | ||
fjetter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
recommendations[ts] = "missing" | ||
self.log.append( | ||
("missing-who-has", worker, ts.key, stimulus_id, time()) | ||
|
@@ -3242,10 +3265,7 @@ async def gather_dep( | |
"stimulus_id": stimulus_id, | ||
} | ||
) | ||
if ts.who_has: | ||
recommendations[ts] = "fetch" | ||
elif ts.state not in ("released", "memory"): | ||
recommendations[ts] = "missing" | ||
recommendations[ts] = "fetch" | ||
del data, response | ||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To confirm: the reason for moving this code later is because transitions are insertion-ordered, so this way
key
gets transitioned toforgotten
/waiting
before its waiters get transitioned towaiting
?I wonder if this block should even be in
transition_memory_released
at all? Why shouldn't this part be done intransition_released_waiting
andtransition_released_forgotten
? The fact that we need to make this transition after we make thewaiting
/forgotten
transition makes me think we're overstepping our job in this function, and this should be the job of the other transitions.I guess I didn't know that the recommendations dict was considered ordered. (Obviously dicts are now ordered in Python, but did it used to be an OrderedDict in py<=3.6?) If there's some dependency structure in the recommendations (transition A has to happen before transition B), I'd think transition A should be responsible for recommending transition B, not that they should be mixed together. That seems easier to reason about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, dictionaries are insertion ordered and popitem pops from the end
Yes, it should. If a task that was in memory is no longer in memory and a waiter, i.e. a task to be executed, exists, we need to ensure that this waiter is released. These few lines allow task to be resilient to worker failures.
This transition is not there to reset/forget/release something but it schedules something for compute
This transition should only be triggered after the scheduler deletes the entire graph. This should only ever have any scheduling consequences if there are multiple graphs scheduled that share keys. Otherwise this is simply a sophisticated
pop task
It's not about a dependency but about order.
In this specific case (see test
means:
I don't see how we could ever infer "please transition f1 to waiting" after we released f1. From a causality perspective, I don't see how we could map this as a dependency
Edit: In a previous statement I argued that it's about finishing a chain of a tasks transitions but this was false. It's quite the opposite.