[DNM] Peer-to-peer shuffle design #5435

Draft: gjoseph92 wants to merge 14 commits into main

Conversation

gjoseph92 (Collaborator):

Here's a work-in-progress proposal for how we can implement the scalable peer-to-peer shuffling prototyped in dask/dask#8223 in a maintainable and stable way.

The primary problem with the code in dask/dask#8223 is that it's brittle and unmaintainable in its current state (and also not reliable). So the primary purpose of this PR is to discuss how we can write something similar, implementation-wise, but in a way that integrates better and that we're happy to maintain long-term.

This is meant to be relatively high-level and architectural, and we'd like to keep the conversation away from implementation details except where necessary.

You can view the rendered markdown at https://github.com/gjoseph92/distributed/blob/p2p-shuffle/proposal/distributed/shuffle/shuffle-design.md.

cc @fjetter @jcrist @crusaderky @jrbourbeau

gjoseph92 (Collaborator, Author):

I'll be out next week, so I've given @fjetter access to https://github.com/gjoseph92/distributed (and therefore this branch) for now so he can push changes.

Alternatively, if folks would prefer just copying it all into a Google doc, I'm perfectly happy with that too.

Review thread on distributed/shuffle/shuffle-design.md:

Because most of the tasks in the shuffle graph are impure and run for their side effects, restarting an in-progress shuffle requires rerunning _every_ task involved, even ones that appear to have successfully transitioned to `memory` and whose "results" are stored on not-yet-dead workers.

Additionally, cleanly stopping a running shuffle takes more than just releasing the shuffle tasks from memory: since there's out-of-band processing going on, the `ShuffleExtension` has to be informed in some way that it needs to stop doing whatever it's doing in the background, and clear out its buffers. Also, executing tasks may be blocking on the `ShuffleExtension` doing something; without a way to tell the extension to shut down, those tasks might block forever, deadlocking the cluster.
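
To make the quoted concern concrete, here is a minimal, purely illustrative sketch of the kind of explicit shutdown signal being described. `ShuffleExtensionSketch`, its buffers, and its methods are invented names for this example, not the actual `ShuffleExtension` API:

```python
import asyncio


class ShuffleExtensionSketch:
    """Hypothetical stand-in for the worker's ShuffleExtension (names invented)."""

    def __init__(self):
        self.input_buffer = []          # shards waiting to be sent to peer workers
        self.output_buffer = []         # shards received from peers, not yet consumed
        self._closed = asyncio.Event()  # set when the shuffle is cancelled/restarted

    def add_partition(self, shard):
        # Called by shuffle "transfer" tasks; refuses new work once shut down.
        if self._closed.is_set():
            raise RuntimeError("shuffle was cancelled")
        self.input_buffer.append(shard)

    async def get_output_partition(self):
        # Called by shuffle "unpack" tasks. Without checking `_closed`, a task
        # waiting here could block forever after a teardown, deadlocking the cluster.
        while not self.output_buffer:
            if self._closed.is_set():
                raise RuntimeError("shuffle was cancelled")
            await asyncio.sleep(0.05)   # placeholder for a real wakeup mechanism
        return self.output_buffer.pop()

    def close(self):
        # The explicit signal described above: stop background work and drop buffers.
        self._closed.set()
        self.input_buffer.clear()
        self.output_buffer.clear()
```
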
Member:

It depends what "releasing" means. Currently releasing a task on the worker means

  • Ensure the task is not executed and no coroutine is running which is trying to fetch that task's result/key/data.
  • Reset any TaskState attributes to a neutral value.
  • Remove the TaskState.key from Worker.data if it is in there.

Whatever the extension does, it might be coupled to any of these mechanisms, so a "release task" action would be sufficient for cleanup.

Member:

There could be a mechanism to release the input buffer immediately, and we could get rid of data in memory/disk, but the output buffer is actually very difficult.

gjoseph92 (Collaborator, Author):

> Remove the `TaskState.key` from `Worker.data` if it is in there.

In the POC we used this as the signal to clean up state (the `ShuffleService` object was a dependency of every task, got copied into every `Worker.data`, and had a `__del__` method to clean things up when it was removed).

But this is unreliable in pathological shuffle cases, for example when some workers either don't send any data or don't receive any data in the shuffle. There are also weird questions around what happens when that object itself is spilled to disk.
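
For reference, a rough reconstruction of the POC mechanism described above; the class and attribute names are illustrative, not the actual POC code:

```python
class ShuffleServiceSketch:
    """Rough stand-in for the POC's ShuffleService (attribute names invented)."""

    def __init__(self):
        self.buffers = {}  # in-flight shards, keyed by output partition

    def __del__(self):
        # Fired when the last copy is dropped from Worker.data, i.e. once the
        # scheduler releases the tasks that depended on this object. As noted
        # above, this signal is unreliable in pathological shuffles and
        # interacts badly with spilling.
        self.buffers.clear()
```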

> Whatever the extension does, it might be coupled to any of these mechanisms, so a "release task" action would be sufficient for cleanup.

I agree that this would be nice, but I don't really like the idea of hooking into `gather_dep` / `TaskState` attribute changes to detect when this has happened. It just feels brittle to me, and it requires really careful reasoning about which signals the scheduler is going to send in which cases. Having an explicit mechanism built into `RerunGroup` seems both easy and reliable.

In the spirit of what I said in the blog post (https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/):

> It may be worth it to adjust the system so that bypassing task graphs isn't a hack, but a supported (and tested and maintained) feature.

I just prefer the idea of making a clear path for how to do this in the API that doesn't require deep understanding of the current implementation of the worker and scheduler.
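
Purely as an illustration of the kind of explicit hook being argued for (none of these names or messages exist in distributed today): the scheduler could notify workers when a `RerunGroup` is released or restarted, and the worker-side extension would react to that single event instead of inferring cleanup from task-state transitions.

```python
class RerunGroupAwareShuffleExtension:
    """Illustrative only: reacts to one explicit scheduler message instead of
    inferring cleanup from task-state transitions."""

    def __init__(self, worker):
        self.worker = worker
        self.shuffles = {}  # RerunGroup name -> in-progress shuffle state
        # "rerun_group_released" is an invented message name, not part of
        # distributed's protocol.
        worker.handlers["rerun_group_released"] = self.rerun_group_released

    def rerun_group_released(self, name=None):
        # One explicit cleanup signal: tear down everything tied to this
        # RerunGroup (background comms, buffers, tasks blocked on the shuffle).
        shuffle = self.shuffles.pop(name, None)
        if shuffle is not None:
            shuffle.close()  # e.g. the explicit close() from the earlier sketch
```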

Review thread on distributed/shuffle/shuffle-design.md (outdated):

Therefore, we propose adding a `RerunGroup` (`ImpureGroup`? `CoExecutionGroup`? `RestartGroup`? `OutOfBandGroup`? name TBD) structure to the scheduler which intertwines the fates of all tasks within it: if any one task is to be rescheduled (due to its worker leaving), all tasks are restarted; if any one is to be prematurely released (due to cancellation), all are released.

Membership in a `RerunGroup` is implemented via task annotations, where each task gives the name of the `RerunGroup` it belongs to. A task can belong to at most one `RerunGroup`. TBD if we will enforce any structural restrictions on `RerunGroup`s to prevent odd/invalid states from emerging—we probably should, such as not allowing disjoint parts of the graph to be in the same `RerunGroup`, etc.
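
As a hypothetical illustration of what that annotation could look like from the user/collection side: only `dask.annotate` itself is an existing API here; the `rerun_group` annotation key and the group name are made up.

```python
import dask
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(
    pd.DataFrame({"id": [0, 1, 2, 3], "x": [1.0, 2.0, 3.0, 4.0]}),
    npartitions=2,
)

# Every task created inside this block would carry the annotation and would
# therefore belong to the same RerunGroup on the scheduler.
with dask.annotate(rerun_group="shuffle-1a2b"):
    shuffled = df.shuffle("id")
```
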
Member:

I'm inclined to say this is out of scope and the responsibility of the user.

Member:

How are annotations impacted by graph optimizations?

Member:

Question: what happens if you fuse an annotated blockwise with an unannotated blockwise? The intended behaviour is currently unclear.

The rerunning might actually be a special case, since this specific case is "inheritable", i.e. fused tasks should both be rerun.

The current default behaviour, according to Jim, is that annotated tasks are not fused, but we might be able to special-case this.

gjoseph92 (Collaborator, Author):

> I'm inclined to say this is out of scope and the responsibility of the user.

I think this could create all sorts of headaches for us down the line in the scheduler, where we have to bend ourselves into knots to handle weird cases that are technically possible but nobody should actually do. A more restrictive API up front means less work for us!

gjoseph92 (Collaborator, Author):

> Question: what happens if you fuse an annotated blockwise with an unannotated blockwise? The intended behaviour is currently unclear.

There's an open issue around annotations getting lost during optimization (dask/dask#7036), but I don't think it fully recognizes the problem: we don't have a clear policy on annotation propagation during optimization. Fixing this would also be necessary work here; I've mentioned it in the doc.
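
For context, annotations live on `HighLevelGraph` layers, which is where optimization (e.g. blockwise fusion) can drop or merge them. A quick way to inspect what survives; these are real dask APIs, though exact behaviour varies across versions, and the `rerun_group` key is still hypothetical:

```python
import dask
import dask.array as da

with dask.annotate(rerun_group="shuffle-0"):  # hypothetical annotation key
    x = da.ones((10, 10), chunks=(5, 5)) + 1

# Each layer built inside the annotate block carries the annotation; after
# optimization, layers can disappear or lose it, which is the policy gap
# referenced above.
for name, layer in x.__dask_graph__().layers.items():
    print(name, layer.annotations)
```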

distributed/shuffle/shuffle-design.md: five more review threads (resolved).

mrocklin (Member):

In general, the objectives and constraints laid out here seem sensible to me. Thank you for writing this up, @gjoseph92. I would like to suggest that we think about a plan for resilience, but implement it last. There is value in getting this out quickly so that people can experiment and give us feedback. I expect resilience to be an interesting but time-consuming and ultimately separable problem.
