
Forget erred tasks // Fix deadlocks on worker #4784

Merged
11 commits merged into dask:main on Jun 11, 2021

Conversation

fjetter (Member) commented May 4, 2021

I need some feedback because the cluster behaves differently than I would expect and I would like to know for sure what we expect to happen.

The test case constructs a very simple graph consisting of three tasks, F, G and Res, where F and G are simple dependencies executed on different workers, while Res is a task which will fail. I pinned the tasks to specific workers explicitly to ensure determinism, but otherwise this is not relevant.
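
For illustration, here is a rough reconstruction of that scenario, including the two release steps discussed below (a sketch only, not the exact test code; the scheduler and worker addresses are placeholders):

from distributed import Client

def fail(a, b):
    raise RuntimeError("expected failure")

client = Client("tcp://scheduler:8786")  # placeholder scheduler address
f = client.submit(lambda: 1, key="f", workers=["tcp://worker-a:1234"])  # pinned to worker A
g = client.submit(lambda: 2, key="g", workers=["tcp://worker-b:1234"])  # pinned to worker B
res = client.submit(fail, f, g, key="res")  # errs on whichever worker runs it

del f, g  # step 1 below: release the refs to the dependencies
del res   # step 2 below: release the erred task as well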

What happens is that the task errs, which is expected. We keep all tasks in memory, still expected. Afterwards, two things happen which surprised me, and I would like to know whether these are bugs or expected behaviour I misunderstood:

  1. I release the refs to F and G and would expect them to be released, i.e. the memory to be freed, because Res is done and we no longer hold a reference to the dependencies F and G. Instead, both stay in memory. Is this expected?
  2. After that, I release the ref to Res and would expect the cluster to reset, i.e. to forget about all tasks. This isn't happening; instead, F and G are released and Res stays erred. The tasks are never forgotten.

cc @jrbourbeau


Edit:

While investigating the freeing of erred tasks, I found two deadlock situations which were more likely to be triggered on this branch than on others. This increased the scope of this PR.

Comment on lines 1854 to 2154
f.key: "memory", # expected to be gone
g.key: "memory", # expected to be gone
res.key: "error",
fjetter (Member Author)

This state is actually never left, so I do consider this a bug. Eventually the tasks should be released (I commented out the wait below).

# A was instructed to compute this result and we're still holding a ref via `f`
f.key: "memory",
# This was fetched from another worker. While we hold a ref via `g`, the scheduler
g.key: "memory",
fjetter (Member Author)

I would expect g to be released on worker A since it was only on worker A as a dependency. However, it is kept after the task erred. Do we consider this sane behaviour? Do we intentionally keep this dependency in memory to allow for retries?

fjetter (Member Author)

For reference, the scheduler decides in this case that the keys should be kept, or rather it doesn't decide to release them. I am questioning whether this is the correct decision

Comment on lines +1837 to +2099
f.key: "memory",
g.key: "memory",
fjetter (Member Author) commented May 4, 2021

Even after the explicit refs to f and g are released, we still keep holding on to them because their dependent task res erred. I'm wondering what the expected behaviour should be here.

fjetter (Member Author)

For reference, the scheduler forgot all tasks at this point

@fjetter fjetter changed the title from "Add a test about expected task states in an exception case" to "Allow erred tasks to be forgotten on workers" on May 4, 2021
distributed/worker.py (resolved review thread)
@fjetter fjetter force-pushed the add_test_about_error_case branch from 1d968a4 to 520f2bb on May 4, 2021 15:54
distributed/scheduler.py (resolved review thread)
Comment on lines +1532 to +1540
@property
def erred_on(self):
    return self._erred_on
fjetter (Member Author)

One of the reasons why the erred task state is never forgotten is that keys are only "remembered" in processing_on (processing) or who_has (memory), depending on the state of the task. This would be the equivalent for erred tasks to allow us to tell the worker to forget the task.
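
To illustrate the idea, here is a simplified sketch (not the actual scheduler implementation; the helper names and message format are assumptions for illustration only):

class SchedulerTaskState:
    # simplified stand-in for the scheduler-side TaskState
    def __init__(self, key):
        self.key = key
        self.who_has = set()        # workers holding the result in memory
        self.processing_on = None   # worker currently executing the task
        self.erred_on = set()       # workers on which the task raised (the new state)

def remember_error(ts, worker_address):
    # when a worker reports the failure, remember where it happened
    ts.erred_on.add(worker_address)

def release_erred_task(send_to_worker, ts):
    # on release, the scheduler can now address exactly those workers
    # and tell them to forget the key
    for addr in ts.erred_on:
        send_to_worker(addr, {"op": "free-keys", "keys": [ts.key]})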

@jakirkham you have been involved in the scheduler state machine a lot recently. Just pinging in case you have thoughts about adding more state here or if you see other options

@fjetter fjetter mentioned this pull request May 5, 2021

tg = s.task_groups[y.name]
assert tg.states["memory"] == 5

assert s.task_groups[y.name].dependencies == {s.task_groups[x.name]}

await c.replicate(y)
# TODO: Are we supposed to track replicated memory here? See also Scheduler.add_keys
assert tg.nbytes_in_memory == y.nbytes
fjetter (Member Author)

I ran into some issues with the nbytes accounting and wondered whether this is correct. While the test states this explicitly, I'm a bit surprised and believe there are code areas where this is treated differently.

distributed/worker.py (resolved review thread)
@fjetter fjetter force-pushed the add_test_about_error_case branch from 14a2f49 to 03210bc on May 6, 2021 20:12
fjetter (Member Author) commented May 6, 2021

To avoid releasing too many tasks while still releasing those that can be freed, I ended up introducing additional state on the scheduler, erred_on, plus one additional attribute on the worker TaskState side, scheduler_holds_ref. This is a subtle problem I was trying to solve slightly differently in #4772, but the end result is the same: the worker needs to know whether the scheduler still requires a task or not. Unless we replicate the entire graph, we otherwise have no chance to make the right decision. Even if we were to replicate the entire graph, we would probably still need to introduce something like this to give the scheduler ultimate authority. That problem will pop up again once we approach speculative task assignment.

This additional reference count is required because in the past, when tasks and dependencies were clearly separated, this reference count was implicit: tasks were always referred to by the scheduler, while dependencies were just a side product and could be forgotten as soon as possible. You will therefore notice that all "compute-task" TaskState objects have this attribute set and dependencies don't.
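
A minimal sketch of the worker-side rule, assuming the attribute and method names from this PR (the wrapper function itself is hypothetical): a key may only be dropped once the scheduler no longer holds a reference to it.

def maybe_release(worker, key, reason):
    ts = worker.tasks.get(key)
    # only drop the key if the scheduler no longer holds a reference to it
    if ts is not None and not ts.scheduler_holds_ref:
        worker.release_key(key, reason=reason)
    # otherwise keep the task: the scheduler still expects this worker to hold it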

Tests look promising, and while this is not the end of the story for the stability issues (I still have reproducing tests which deadlock), this is an important step.

Gentle ping to @jrbourbeau and @gforsyth for review

gforsyth (Contributor) left a comment

Hey @fjetter -- this looks good to me. I flagged one small typo in the scheduler attributes but that doesn't need to block this.

distributed/scheduler.py (resolved review thread)
fjetter (Member Author) commented May 14, 2021

Hitting an issue in the Python 3.8 Ubuntu CI job which might be connected, although I'm not sure yet.

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1720, in transition_fetch_flight
    assert ts.dependents
AssertionError
distributed.worker - ERROR - 
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 2057, in ensure_communicating
    self.transition(self.tasks[d], "flight", worker=worker)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1615, in transition
    state = func(ts, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1720, in transition_fetch_flight
    assert ts.dependents
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 582, in handle_stream
    func()
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 2057, in ensure_communicating
    self.transition(self.tasks[d], "flight", worker=worker)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1615, in transition
    state = func(ts, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1720, in transition_fetch_flight
    assert ts.dependents
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1615, in transition
    state = func(ts, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1773, in transition_flight_memory
    self.batched_stream.send({"op": "add-keys", "keys": [ts.key]})
  File "/home/runner/work/distributed/distributed/distributed/batched.py", line 136, in send
    raise CommClosedError()
distributed.comm.core.CommClosedError
distributed.utils - ERROR - "('sum-partial-5b2c0f6d9d691a15dc449febe739d843', 0, 1)"
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 671, in log_errors
    yield
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4042, in add_worker
    typename=types[key],
KeyError: "('sum-partial-5b2c0f6d9d691a15dc449febe739d843', 0, 1)"
distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 501, in handle_comm
    result = await result
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4042, in add_worker
    typename=types[key],

This also triggers a similar error as reported in #4800 with

distributed.utils - ERROR - "('sum-partial-5b2c0f6d9d691a15dc449febe739d843', 0, 1)"
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 671, in log_errors
    yield
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4042, in add_worker
    typename=types[key],
KeyError: "('sum-partial-5b2c0f6d9d691a15dc449febe739d843', 0, 1)"

At least the very first assertion could be related.

fjetter (Member Author) commented May 14, 2021

I assume the above is connected since all three failures (Ubuntu 3.8 and the two OSX failures) are the same failing test. However, it's one of those incredibly rare events which are hard to reproduce locally.

@fjetter fjetter force-pushed the add_test_about_error_case branch 4 times, most recently from f2abe61 to 05fb3df on May 25, 2021 10:59
@fjetter fjetter mentioned this pull request May 27, 2021
@fjetter fjetter force-pushed the add_test_about_error_case branch from b11b657 to d9cb0e9 on May 28, 2021 11:48
@fjetter fjetter changed the title from "Allow erred tasks to be forgotten on workers" to "Forget erred tasks // Fix deadlocks on worker" on Jun 7, 2021
@fjetter fjetter force-pushed the add_test_about_error_case branch from 23e104a to 5a35ce1 on June 7, 2021 12:48
fjetter (Member Author) commented Jun 7, 2021

All tests green 🎉

fjetter (Member Author) commented Jun 7, 2021

Friendly ping to @gforsyth and @jrbourbeau for another review.

What changed since the last time I asked for reviews are the handlers which control task releases on the worker, and how and when the scheduler invokes them. I changed the names for all of them since I consider "release" to be much too ambiguous, but maybe the new names are equally confusing. Happy to give everything different names if somebody comes up with a suggestion.

gforsyth (Contributor) left a comment

Quick first pass -- I'll try to find time to dig in a bit deeper.

distributed/tests/test_worker.py (resolved review thread)
distributed/tests/test_scheduler.py (resolved review thread)
distributed/tests/test_worker.py (resolved review thread)
distributed/worker.py (resolved review thread)
distributed/worker.py (resolved review thread)
@fjetter fjetter force-pushed the add_test_about_error_case branch from ef94538 to 05c1c0e on June 9, 2021 11:56
fjetter (Member Author) commented Jun 9, 2021

@jrbourbeau and @gforsyth can you please indicate whether I can expect a review? If you are too busy that's fine. I just would like to know if I am waiting for something to happen. Any further work on the worker is kind of blocked by this PR and I am confident that this improves stability.

fjetter (Member Author) commented Jun 9, 2021

The OSX test failure is unrelated; it is connected to an unclosed RPC after startup/teardown of an adaptive cluster.

jrbourbeau (Member)

I began reviewing the changes here yesterday, but didn't have much dedicated time to focus on this today. If you're confident in the changes here, then feel free to merge them in. Otherwise, I will have time to review tomorrow

mrocklin (Member) left a comment

A couple of minor nits, and one genuine question :)

distributed/scheduler.py (resolved review thread)
distributed/worker.py (resolved review thread)
if ts and not ts.scheduler_holds_ref:
    self.release_key(key, reason=f"delete data: {reason}", report=False)

logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
mrocklin (Member)

I'm curious about why there should be multiple methods for this. In principle I think of our current approach as the scheduler saying

I don't need you to have this data

The worker though, as you say, may disagree and hold on to it. I don't imagine a situation where the scheduler would override the worker, and so I don't see a reason to have multiple methods for this signal.

Thoughts?

fjetter (Member Author) commented Jun 10, 2021

First of all, I think this particular signal I called "superfluous data" should not exist and I believe this is what you're saying as well.
This signal is issued here (link to main branch)

if ts is not None and ts._state == "memory":
    if ts not in ws._has_what:
        ws._nbytes += ts.get_nbytes()
        ws._has_what[ts] = None
        ts._who_has.add(ws)
else:
    self.worker_send(
        worker, {"op": "delete-data", "keys": [key], "report": False}
    )

I am actually a bit fuzzy myself about how this particular race condition occurs, but I think something like the following sequence causes a race condition which results in a deadlock:

Task A (Worker A) --> Task C (Worker B)
                          ^
                         /
Task B (Worker B) ------/

(pardon the awful ASCII art; C depends on A and B)

  • Worker B tries to execute C but needs to fetch Task A to do so
  • Worker A submits the data to B
  • The data arrives at B and is being deserialized, or is waiting for the event loop, whatever
  • Worker A dies and the scheduler notices
  • The scheduler thinks all is lost, transitions Task A to released and tries to reschedule it on Worker B
  • ... In the meantime, Worker B finishes deserializing, puts the key in memory and notifies the scheduler using an add-keys message
  • Worker B receives the compute-task signal from the scheduler and thinks it's lucky since it already has the data. It tells the scheduler so -> task-finished signal
  • The scheduler receives the add-keys message but thinks "this task is not in memory any more, please forget it" -> delete-data signal (what I call superfluous data)
  • Worker B receives the delete-data signal and forgets everything (this is where my superfluous-data signal has a safeguard)
  • The scheduler receives the task-finished signal from before and thinks everything is OK and proceeds as before; however, Worker B no longer has the data.

I assume this is only a problem because of another bug in how the worker corrects missing data (which I am also trying to fix), which would eventually heal the cluster, but I'm not entirely certain.

mrocklin (Member)

Ha, that's a fun comedy of errors.

Yeah, so I agree that this feels like a different bug. In principle it'd be nice to fix that bug rather than add this signal, but I can imagine that it might be best to do that in future work.

    client=True,
    nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2), ("127.0.0.1", 3)],
)
async def test_worker_same_host_replicas_missing(c, s, a, b, x):
fjetter (Member Author)

This is a test case for the last deadlock we were made aware of. It requires a bit of patching since it is otherwise not possible to simulate the same-host-with-multiple-who_has edge case this is hitting.

This happens if there are multiple who_has entries such that the scheduler doesn't retrigger a clean computation, but the worker always picks the same erroneous worker. If the worker doesn't forget about that wrong worker after the first try, we're running in a loop.

# prefer fetching the key from a worker on the same host, if one holds it
local = [w for w in workers if get_address_host(w) == host]
if local:
    worker = random.choice(local)
else:
    worker = random.choice(list(workers))

@fjetter fjetter linked an issue Jun 11, 2021 that may be closed by this pull request


@gen_cluster(client=True, timeout=None)
async def test_handle_superfluous_data(c, s, a, b):
fjetter (Member Author)

This test is rather close to the implementation since we somehow need to wait for worker B to actually start deserializing, and we do not have any sane hooks exposing this at the moment; therefore I'm watching the connection pool to make this work.
See #4784 (comment)

This test proves the race condition (replace handle-superfluous-data with handle-free-keys and it gets stuck), but I couldn't prove that we require any handler like this at all. I kept this path to preserve the previous functionality and added a really stupid test below which constructs the situation without any actual work, just to have the code path covered. I will try further to reconstruct a case for this, but I don't want it to block the PR (see test_forget_data_not_supposed_to_have).

fjetter (Member Author) commented Jun 11, 2021

Win-py38: #4859
osx-py37: #4905

fjetter (Member Author) commented Jun 11, 2021

Since the test failures appear to be unrelated, I will go ahead and merge this now. Feel free to leave review comments afterwards; I'll address everything that comes up.

@fjetter fjetter merged commit 5e5b983 into dask:main Jun 11, 2021
@fjetter fjetter deleted the add_test_about_error_case branch June 11, 2021 17:12
@jrbourbeau jrbourbeau mentioned this pull request Jun 14, 2021
mrocklin pushed a commit that referenced this pull request Jun 29, 2021
This is a follow-up to #4784 and reduces the complexity of Worker.release_key significantly.

There is one non-trivial behavioural change regarding erred tasks. The current main branch holds on to the dependencies of an erred task on a worker and implements a release mechanism once that erred task is released. I implemented this recently while trying to capture the status quo, but I'm no longer convinced that this is the correct behaviour. It treats the erred case specially, which introduces a lot of complexity. The only place where this might be of interest is if an erred task is to be recomputed locally: not forgetting the data keys until the erred task is released would speed up that process. However, we'd still potentially need to compute some keys, and I'm inclined to strike this feature in favour of reduced complexity.
@fjetter fjetter mentioned this pull request Aug 6, 2021

Successfully merging this pull request may close these issues.

Computation deadlocks after inter worker communication error