Shuffle service resilience #6105

Closed · mrocklin opened this issue Apr 11, 2022 · 7 comments · Fixed by #7508
Labels: discussion (Discussing a topic with no specific actions yet), scheduler, shuffle

@mrocklin (Member)

In #5520, #5976, and #6007 we've started a shuffle service. It has better memory characteristics, but it is not yet resilient. In particular, it can break in a few ways:

  1. A worker holding shuffle outputs can die mid-shuffle
  2. New outputs of a shuffle can be requested by a client after the shuffle has started
  3. Output futures of a shuffle can be unrequested by a client

There are a few ways to solve this problem. One way I'd like to discuss here is opening up scheduler events to extensions and letting them trigger transitions. In particular, scenarios 1 and 2 can both be handled by letting the extension track remove_worker and update_graph events and restart all of a shuffle's tasks if an output-holding worker dies or if an existing shuffle shows up again in a new graph. Scenario 3 can be handled by letting the extension track transition events and clean things up when the barrier task transitions out of memory.
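To make that concrete, here is a minimal sketch of the hook shape I have in mind. All names, attributes, and signatures here are hypothetical (this is not the current distributed API); the point is just that the extension listens to remove_worker / update_graph / transition events and responds through the transitions system rather than by mutating scheduler state directly.

class ShuffleSchedulerExtension:
    """Sketch of the proposed hook shape; names and signatures are hypothetical."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.output_workers = {}   # shuffle id -> set of worker addresses
        self.tasks = {}            # shuffle id -> set of task keys
        self.barriers = {}         # barrier key -> shuffle id

    def remove_worker(self, address):
        # Scenario 1: an output-holding worker died mid-shuffle -> restart.
        for shuffle_id, workers in list(self.output_workers.items()):
            if address in workers:
                self._restart(shuffle_id)

    def update_graph(self, keys):
        # Scenario 2: a client requested new outputs of a shuffle that has
        # already started -> restart that shuffle from scratch.
        for barrier_key, shuffle_id in list(self.barriers.items()):
            if barrier_key in keys:
                self._restart(shuffle_id)

    def transition(self, key, start, finish):
        # Scenario 3: the barrier task left memory (its outputs were released
        # by the client) -> forget the shuffle's bookkeeping.
        if key in self.barriers and start == "memory" and finish != "memory":
            shuffle_id = self.barriers.pop(key)
            self.output_workers.pop(shuffle_id, None)
            self.tasks.pop(shuffle_id, None)

    def _restart(self, shuffle_id):
        # Re-run every task of the shuffle by recommending "released" through
        # the transitions system instead of editing task states by hand
        # (Scheduler.transitions exists, though its exact signature varies by version).
        recommendations = {key: "released" for key in self.tasks.get(shuffle_id, ())}
        self.scheduler.transitions(recommendations, stimulus_id=f"shuffle-restart-{shuffle_id}")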

So far, I think that this can solve all of the resilience issues in shuffling (at least everything I've come across). However, it introduces two possible concerns:

1 - Scheduler performance

Maybe it doesn't make sense for every transition to cycle through every extension to see if they care about transitions.

In practice, this doesn't seem to be that expensive:

In [1]: extensions = [object() for _ in range(10)]

In [2]: %%timeit
   ...: for i in range(1000):
   ...:     for extension in extensions:
   ...:         if hasattr(extension, "transition"):
   ...:             extension.transition()
   ...:
383 µs ± 7.07 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

So that's 383 µs / (1000 iterations × 10 extensions) ≈ 40 ns per extension per transition, well under a microsecond per transition even with all of our extensions combined.

2 - Complexity

Now any extension can inject transitions. Horrible horrible freedom!

My guess is that this is OK as long as we maintain some hygiene, for example by always using the transitions system rather than mucking about with state directly.

This is also a somewhat systemic change for what is, today, a single feature.

cc @gjoseph92 @fjetter for feedback

@gjoseph92 (Collaborator)

Scheduler plugins already support all these events. Why not just use that interface? In practice, there's just as much horrible horrible freedom with plugins as extensions, since they also give you access to the Scheduler object to do whatever you want with. That's why I used a plugin in #5524 instead of an extension: https://github.com/dask/distributed/pull/5524/files#diff-bbcf2e505bf2f9dd0dc25de4582115ee4ed4a6e80997affc7b22122912cc6591R191-R194

The only upside I see to extensions is that they generally have at-most-one semantics, whereas plugins don't assume they're singletons.

An interface change we could consider is allowing plugins to return recommendations after these events. (In fact I once made this change a few years ago in a private fork for similar reasons.) This would be a slightly more well-structured way for plugins to affect things than calling transition or manipulating state directly.
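As a rough illustration of what that interface change could look like (the return-recommendations behaviour is the hypothetical part; today these hooks return None, and exact hook signatures vary a bit between distributed versions):

from distributed.diagnostics.plugin import SchedulerPlugin


class ShuffleResiliencePlugin(SchedulerPlugin):
    """Sketch only: a plugin that reacts to scheduler events.

    The bookkeeping attributes are made up for illustration; the proposed
    interface change is that hooks *return* a recommendations dict instead
    of poking at scheduler state themselves.
    """

    def __init__(self):
        self.shuffle_tasks = {}    # shuffle id -> set of task keys
        self.output_workers = {}   # shuffle id -> set of worker addresses

    def remove_worker(self, scheduler, worker, **kwargs):
        # Hypothetical: return recommendations; today the scheduler ignores
        # the return value of plugin hooks.
        recommendations = {}
        for shuffle_id, workers in self.output_workers.items():
            if worker in workers:
                for key in self.shuffle_tasks.get(shuffle_id, ()):
                    recommendations[key] = "released"
        return recommendations

    def transition(self, key, start, finish, *args, **kwargs):
        # Same idea for cleanup when a barrier task leaves memory.
        return {}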

Also, your benchmark doesn't include the function call (because object() has no attribute 'transition'). Adding that in more than doubles the time on my machine (533µs -> 1.42ms). When you consider that that function might actually... do something (😱) (mostly just to check there's nothing serious to do), I might actually worry a little about adding that overhead on every transition.
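A plain-timeit variant of that check, for anyone who wants to reproduce it with the call included (absolute numbers will differ by machine):

import timeit


class Extension:
    def transition(self):
        pass  # a real hook would at least check whether it cares about the event


extensions = [Extension() for _ in range(10)]


def dispatch():
    # 1000 transitions, each fanned out to 10 extensions, call included.
    for _ in range(1000):
        for extension in extensions:
            if hasattr(extension, "transition"):
                extension.transition()


# Time per batch of 1000 transitions (compare with the %%timeit numbers above).
print(timeit.timeit(dispatch, number=1000) / 1000)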

@mrocklin (Member, Author) commented Apr 12, 2022 via email

@mrocklin (Member, Author)

@gjoseph92 do you see obvious flaws with the plan above? Do you think that it would do the job? (although possibly not as nicely as desired)

@fjetter (Member) commented Oct 24, 2022

A couple of questions/comments, mostly for my understanding:

  1. I think a remove_worker hook for the shuffle extension is straightforward and the easiest approach for now
  2. What would be an example for "new outputs requested"?

Is this something like the following?

  • Cull the graph such that only some output keys are required
  • Later, request the entire graph after all
df = ...
x = df.shuffle(on="x")
y = x.partitions[x.npartitions//2].persist()
sleep(0.1)
z = x.persist()

This currently fails, and there are actually a couple of different error cases depending on how data is distributed on the cluster (sometimes an assertion error, sometimes a KeyError; I wouldn't be surprised if there is something else).

That's basically test_add_some_results

  3. Basically the other way round:
y = x.persist()
z = y.partitions[y.npartitions // 2].persist()
del y

which currently does not clean up state or close the existing shuffle properly. This is already written up in test_delete_some_results

Did I understand these error cases properly?

@mrocklin (Member, Author)

Did I understand these error cases properly?

Based on my (bad) memory, yes.

@mrocklin (Member, Author)

In many cases I think that the short term answer is

  1. Can we identify the situation?
  2. Can we reset everything to a clean state and start over?

If we get good at both, then that gives us an approach that is not ideal but totally workable, I think.

@fjetter added the discussion, scheduler, and shuffle labels on Oct 24, 2022
@fjetter (Member) commented Oct 24, 2022

I would suggest a slightly different approach:

  1. (As I said, a remove_worker hook.)
  2. I think this is a culling problem. The issue we are seeing here originates from the way we are building the graph. Whenever we cull output tasks, we should also do less work on the input side, which basically means rewriting the task graph.

If we look at this example

df = ...
x = df.shuffle(on="x")
y = x.partitions[x.npartitions//2].persist()
sleep(0.1)
z = x.persist()

I suggest that z and y not share any shuffling-related keys at all, and that each performs its own shuffle operation. I think this is much cleaner since it

  • makes culled graphs much faster (although I don't know how relevant this is for shuffle)
  • allows us to cancel both very cleanly
  • means we don't need to hook into the very, very messy update_graph

This is pretty easy if we just define a P2PShuffleLayer; see the sketch below for the culling behaviour I mean.
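This is only a sketch of the culling idea, not a working dask layer: a real P2PShuffleLayer would subclass dask.highlevelgraph.Layer and implement the full mapping protocol, and every name here (token, parts_out, etc.) is made up for illustration. The point is that culling produces a layer with a fresh shuffle token, so the culled graph and the full graph never share shuffle-related keys.

from uuid import uuid4


class P2PShuffleLayerSketch:
    def __init__(self, name, column, npartitions_out, parts_out=None, token=None):
        self.name = name
        self.column = column
        self.npartitions_out = npartitions_out
        # Output partitions this layer actually produces.
        if parts_out is None:
            parts_out = range(npartitions_out)
        self.parts_out = set(parts_out)
        # Unique id of this shuffle operation, baked into every task key.
        self.token = token or uuid4().hex

    def cull(self, keys):
        """Return a layer restricted to the requested output partitions.

        Restricting the outputs means less work on the input side, so we
        rebuild the layer around a fresh token instead of reusing keys of
        the full shuffle; y and z from the example above would then run
        two independent shuffle operations.
        """
        parts_out = {part for (_, part) in keys}
        if parts_out == self.parts_out:
            return self  # nothing was culled
        return P2PShuffleLayerSketch(
            self.name, self.column, self.npartitions_out, parts_out=parts_out
        )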

  3. The only problem I can come up with (after playing with it as well) is that we're not cleaning up state properly. That's unfortunate but not necessarily critical.
    I suggest replacing ShuffleSchedulerExtension.register_complete with an appropriate transition hook, ensuring that we clean up after ourselves (broadcasting to the workers if need be). A sketch of what that hook could look like follows.
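A minimal sketch of that cleanup hook, assuming the extension keeps a barrier-key -> shuffle-id mapping; the "shuffle_cleanup" message and the exact hook signature are assumptions for illustration, not the existing API:

import asyncio


class ShuffleCleanupSketch:
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.barriers = {}  # barrier key -> shuffle id (hypothetical bookkeeping)

    def transition(self, key, start, finish, *args, **kwargs):
        # Instead of relying on register_complete, watch the barrier task:
        # once it leaves memory (released/forgotten), nobody needs the
        # shuffle's output any more and the workers can drop their buffers.
        if key in self.barriers and start == "memory" and finish in ("released", "forgotten"):
            shuffle_id = self.barriers.pop(key)
            # Runs on the scheduler's event loop, so schedule the async cleanup.
            asyncio.create_task(self._clean_workers(shuffle_id))

    async def _clean_workers(self, shuffle_id):
        # Broadcast the cleanup to all workers if need be; "shuffle_cleanup"
        # is a made-up handler name, not an existing worker RPC.
        await self.scheduler.broadcast(msg={"op": "shuffle_cleanup", "shuffle_id": shuffle_id})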
