[DNM] Peer-to-peer shuffle design #5435
It depends what "releasing" means. Currently, releasing a task on the worker triggers several cleanup mechanisms. Whatever the extension does, it might be coupled to any of these mechanisms, so a "release task" action would be sufficient for cleanup.
There could be a mechanism to release the input buffer immediately, which would let us get rid of that data in memory/disk, but releasing the output buffer is actually very difficult.
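To illustrate the asymmetry, here is a hypothetical sketch (not the actual extension API; all names are invented) of why the two buffers differ: input shards are consumed once and can be dropped eagerly, while the output buffer keeps accumulating shards from all peers until the whole shuffle finishes.

```python
class ShuffleBuffers:
    """Hypothetical illustration of the input/output buffer asymmetry."""

    def __init__(self):
        self.input = []   # shards this worker still has to send out
        self.output = {}  # partition_id -> shards received from peers

    def release_input(self):
        # Safe at any time: once a shard has been sent, nothing reads it again.
        self.input.clear()

    def release_output(self):
        # Only safe once *every* peer has finished sending; doing it earlier
        # silently drops rows. That coordination is the hard part.
        self.output.clear()
```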
In the POC we used this as the signal to clean up state (the `ShuffleService` object was a dependency of every task, got copied into every `Worker.data`, and had a `__del__` method to clean things up when it was removed). But this is unreliable in pathological shuffle cases; for example, when some workers either don't send any data or don't receive any data in the shuffle. Also, there are weird questions around what happens when that object itself is spilled to disk.
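For context, the POC pattern described above can be sketched roughly like this (the `ShuffleService` name is from the POC; the internals here are simplified assumptions, not the real implementation):

```python
class ShuffleService:
    """Sketch of the POC pattern: a shared object whose garbage
    collection doubles as the cleanup signal."""

    def __init__(self):
        self.buffers = {}  # partition_id -> staged shards

    def add(self, partition_id, data):
        self.buffers.setdefault(partition_id, []).append(data)

    def __del__(self):
        # Runs when the last reference is dropped, e.g. when the object
        # is removed from Worker.data. Unreliable if some worker never
        # held a reference, or if the object itself was spilled to disk.
        self.buffers.clear()


# Every shuffle task depends on the same service object; when the
# scheduler releases those tasks, the reference count drops to zero
# and (on CPython) __del__ fires, clearing the staged data.
service = ShuffleService()
service.add(0, b"bytes-for-partition-0")
del service
```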
I agree that this would be nice, but I don't really like the idea of hooking into `gather_dep` / `TaskState` attribute changes to detect when this has happened. It just feels brittle to me, and like it requires really careful reasoning about what signals the scheduler is going to send in which cases. Having an explicit mechanism built into `RerunGroup` seems both easy and reliable. In the spirit of what I said in the blog post: I just prefer the idea of making a clear path for how to do this in the API that doesn't require deep understanding of the current implementation of the worker and scheduler.
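As a strawman, an explicit mechanism might look something like this (the `RerunGroup` name comes from the design doc, but the callback API below is entirely invented for illustration):

```python
class RerunGroup:
    """Hypothetical: a group of tasks that must be rerun together, with
    an explicit hook fired when the scheduler decides to rerun them."""

    def __init__(self, name):
        self.name = name
        self._on_rerun = []

    def add_rerun_callback(self, fn):
        # An explicit, documented signal -- rather than inferring a rerun
        # from gather_dep calls or TaskState attribute transitions.
        self._on_rerun.append(fn)

    def trigger_rerun(self):
        for fn in self._on_rerun:
            fn()


# Extension code subscribes once and cleans up deterministically:
group = RerunGroup("shuffle-abc")
group.add_rerun_callback(lambda: print("clearing shuffle state"))
```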
I'm inclined to say this is out of scope and the responsibility of the user.
How are annotations impacted by graph optimizations?
Question: what happens if you fuse an annotated blockwise with an unannotated blockwise? The intended behaviour is currently unclear.
The rerun annotation might actually be a special case, since it is "inheritable": if two tasks are fused, both halves should be rerun together.
The current default behaviour, according to Jim, is that annotated tasks are not fused, but we might be able to special-case this.
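To make the question concrete, here is a hypothetical fusion rule (not dask's actual optimizer; the `rerun_together` marker and the merge semantics are invented) in which "inheritable" annotations propagate to the fused task, while mismatched non-inheritable annotations block fusion, matching the current default:

```python
# Hypothetical set of annotations that survive fusion by propagating
# to the fused task.
INHERITABLE = {"rerun_together"}

def fuse_annotations(a, b):
    """Return merged annotations for a fused task, or None if the two
    tasks must not be fused. Purely illustrative semantics."""
    a, b = a or {}, b or {}
    non_inheritable = (set(a) | set(b)) - INHERITABLE
    if non_inheritable and a != b:
        # Matches today's default: differently-annotated tasks don't fuse.
        return None
    merged = dict(a)
    merged.update(b)  # inheritable flags carry over to the fused task
    return merged
```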
I think this could create all sorts of headaches for us down the line in the scheduler, where we have to bend ourselves into knots to handle weird cases that are technically possible but nobody should actually do. A more restrictive API up front means less work for us!
There's an open issue around annotations getting lost during optimization (dask/dask#7036), but I don't think it fully recognizes the problem: we don't have a clear policy on annotation propagation during optimization. Fixing this would also be necessary work here; I've mentioned it in the doc.