P2P shuffle skeleton #5520
IIUC, neither of these PRs solves this problem, do they? Do we have an idea how to solve it down the road? What would need to change to enable this?
#5524 solves it (that's one of the main points). The scheduler plugin handles the unpredictable task names by traversing the graph from the barrier task and parsing the key names of its dependents. That's what the discussion in https://github.com/dask/distributed/pull/5524/files#r765203083 is about.

I think the idea we have right now is something like this: we add annotations to the graph explicitly marking which output partition number each task is. The `fuse="propagate"` would be respected by all fusion optimizations (dask/dask#7036 is a prerequisite), so it would carry annotations from this layer over to any final fused layer, including rewriting the keys as necessary. The only thing these annotations gain us is not having to parse the keys of tasks that depend on the barrier to pull out that `i` component (instead, we'd look at the task's annotations). And as ugly as key-parsing feels, the only case where it'll actually break in practice is if you convert to a non-DataFrame collection (namely Delayed, but Array might cause problems too) and graph optimizations support fusion across that boundary (at the moment, not sure if this can happen). Basically, key-parsing would be fine for 95% of situations; the annotations would only solve some edge cases and make it feel more proper.
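To make the key-parsing approach concrete, here is a minimal hypothetical sketch (the helper names are my own, not the actual scheduler-plugin API), assuming the barrier's dependents have the usual dask tuple keys of the form `("<name>-<token>", i)`:

```python
# Hypothetical sketch: recover the output-partition index `i` from the key
# of a task that depends on the barrier. Helper names are illustrative only.

def partition_from_key(key):
    """Extract the output-partition index from a barrier dependent's key.

    Assumes the key is a dask-style tuple key ("<name>-<token>", i).
    """
    if isinstance(key, tuple) and len(key) == 2 and isinstance(key[1], int):
        return key[1]
    raise ValueError(f"cannot parse partition index from key {key!r}")


def partitions_for_barrier(dependents):
    """Map each dependent's key to its output partition number."""
    return {key: partition_from_key(key) for key in dependents}
```

For example, `partition_from_key(("shuffle-unpack-abc123", 7))` yields `7`. The fragility discussed above shows up exactly when fusion rewrites keys into a shape this parsing does not expect.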
In an earlier offline conversation, I mentioned that I'm doubtful whether this should actually be considered a blockwise operation. My line of argumentation is mostly that the individual pieces are not independently calculable. The blockwise definition you propose also doesn't look healthy to me, since we're encoding the `BlockwiseDepDict` to define the output partitions even though we're, in fact, not dealing with an IO layer here. It seems we're using extremely complex machinery even though it is not necessarily required.

The most important reason why we'd want to use a blockwise layer here is to enable downstream task fusion. IMHO, this should not require us to define this layer as blockwise; rather, subsequent blockwise operations should be capable of fusing with it. We may need to teach them this first, and in the intermediate term we can use blockwise. My point is, I consider blockwise the shortcut, not the final solution.

Regardless of how the fusion is supposed to work, I'm doubtful whether it is even required. To my understanding, fusion is extremely important for the input/transfer layer, since it needs to be fused with whatever the input is. E.g. if we're starting off with a `read_parquet`, this should be fused with the `transfer` task. This is the important root-task-overproduction scenario, ensuring we do not read 100 files before we engage in any transfer.

However, on the output/unpack side, the situation is not as clear to me. At the very least it's not as urgent. Once we reach `unpack`, all data is on-cluster. As long as we do not implement data spilling, the data is even in memory, and as long as all data is in memory, this should be mostly irrelevant. Delaying this further down the road would be fine for me, since I can think of other ways of dealing with this problem than forcing fusion (e.g. the worker plugin/extension could enforce a limit on how many tasks are allowed to be processed/in memory at the same time).
That's fair. You're right that if the data is all in memory anyway, overproduction is irrelevant. And there are probably other hacks we can do to protect against it on the unpack side.
I think making a dedicated layer is the best approach here.