
Use a layer for p2p shuffle #7180

Merged: 1 commit merged into dask:main from p2p_layer on Oct 26, 2022
Conversation

@fjetter (Member) commented Oct 24, 2022

This moves the graph generation for p2p shuffling to a layer. The benefit is that we can generate a "new graph" when culling.
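For context, "culling" means trimming a task graph down to only the tasks needed for the requested outputs. A toy sketch of the idea (this is a simplified illustration, not dask's actual `Layer.cull` implementation):

```python
def cull(dsk, keys):
    # Keep only the tasks reachable from the requested output keys.
    # Tasks are (callable, *args) tuples; args naming other keys are
    # treated as dependencies. Simplified stand-in for dask's culling.
    def deps(task):
        if isinstance(task, tuple) and task and callable(task[0]):
            return [a for a in task[1:] if a in dsk]
        return []

    out, stack = {}, list(keys)
    while stack:
        k = stack.pop()
        if k in out:
            continue
        out[k] = dsk[k]
        stack.extend(deps(dsk[k]))
    return out

dsk = {"a": 1, "b": (str, "a"), "c": (str, "b"), "d": 2}
# Asking only for "c" drops the unrelated task "d".
assert set(cull(dsk, ["c"])) == {"a", "b", "c"}
```

With the p2p shuffle expressed as a layer, this trimming step can additionally regenerate the graph so the culled shuffle is a distinct operation.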

This specifically helps with problem cases like Case 2 in #6105

```python
df = ...
x = df.shuffle(on="x")
y = x.partitions[x.npartitions // 2].persist()
sleep(0.1)
z = x.persist()
```

as exhibited by test case test_add_some_results.

In the above example, y and z will be treated as unique shuffles, which makes releasing keys much more straightforward and decouples the problem significantly.

Of course, we may need to submit some data twice but I doubt this is a problem in practice.

FWIW, I only inherited from SimpleShuffleLayer because I didn't want to duplicate too much code. For now, it is sufficiently similar to what we need to justify subclassing.

@github-actions (Contributor) commented:

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

- 15 files ±0, 15 suites ±0, duration 6h 13m 13s (−21m 38s)
- 3 153 tests ±0: 3 061 passed (−2), 85 skipped (±0), 6 failed (+1), 1 errored (+1)
- 23 329 runs (+1): 22 396 passed (+2), 911 skipped (−3), 21 failed (+1), 1 errored (+1)

For more details on these failures and errors, see this check.

Results for commit dda2ad9. ± Comparison against base commit 8f25111.

@wence- (Contributor) left a comment:


On the code-movement aspects.

```python
def _construct_graph(self, deserializing=None):
    token = tokenize(self.name_input, self.column, self.npartitions, self.parts_out)
```
@fjetter (Member, Author):

FWIW, putting self.parts_out here is the actual functional change. It makes culled graphs unique, which matches how the shuffle service works very well.
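To illustrate why this works: once parts_out feeds into the token, two graphs that request different output partitions hash to different shuffle IDs. A minimal sketch using hashlib as a stand-in for dask.base.tokenize (layer_token is a hypothetical helper, not the PR's code):

```python
import hashlib

def layer_token(name_input, column, npartitions, parts_out):
    # Stand-in for dask.base.tokenize: hash every input that defines
    # the layer, *including* parts_out, so a culled layer gets a
    # token distinct from the full shuffle's token.
    payload = repr((name_input, column, npartitions, sorted(parts_out)))
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

full = layer_token("shuffle-input", "x", 8, range(8))
culled = layer_token("shuffle-input", "x", 8, [4])
assert full != culled  # the culled graph is a brand-new shuffle
```

Because the scheduler-side shuffle service keys its state on this token, the culled run (y in the example above) and the full run (z) never share state that would complicate key release.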

Comment on lines -73 to -81:

```python
transferred = df.map_partitions(
    shuffle_transfer,
    id=token,
    npartitions=npartitions,
    column=column,
    meta=df,
    enforce_metadata=False,
    transform_divisions=False,
)
```
@fjetter (Member, Author):

The one downside of this is that we'll no longer get input task fusion. The previous code here was a Blockwise layer, and we lose that. However, I consider consistency more important than input task fusion.

FWIW, "fixing"/enabling input (and output) task fusion for graphs of this type would benefit other types of graph as well, but I consider that out of scope for now.
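For readers unfamiliar with the term: task fusion collapses a chain of tasks into one, so the scheduler tracks fewer keys and no intermediate result is stored. A toy illustration of the transformation (not dask's optimizer; `fuse_pair` is a hypothetical helper):

```python
def fuse_pair(dsk, inner, outer):
    # Inline task `inner` into its sole consumer `outer`, removing
    # the intermediate key from the graph. Tasks are (fn, *args)
    # tuples in the dict-based graph convention used above.
    fused = dict(dsk)
    inner_task = fused.pop(inner)
    fused[outer] = tuple(
        inner_task if arg == inner else arg for arg in fused[outer]
    )
    return fused

inc = lambda x: x + 1
dbl = lambda x: 2 * x
dsk = {"a": (inc, 1), "b": (dbl, "a")}
# After fusion, "a" disappears and its task is nested inside "b".
assert fuse_pair(dsk, "a", "b") == {"b": (dbl, (inc, 1))}
```

Blockwise layers get this optimization for free; the new shuffle layer does not, which is the trade-off being accepted here.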

```python
dsk = {
    (name, i): (shuffle_unpack, token, i, barrier_key)
    for i in range(npartitions)
}
layer = MaterializedLayer(dsk, annotations={"shuffle": lambda key: key[1]})
```
@fjetter (Member, Author):

cherry on top: no MaterializedLayer
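For context on what that dict builds: there is one unpack task per output partition, and the "shuffle" annotation maps each key to its partition index. A stand-alone sketch (shuffle_unpack here is a stub; the real function fetches the partition's data from the shuffle service):

```python
def shuffle_unpack(token, i, barrier_key):
    # Stub standing in for the real unpack: would return partition i
    # of shuffle `token` once the barrier task has completed.
    return (token, i)

name, token, barrier_key, npartitions = "unpack", "tok", "barrier", 4
dsk = {
    (name, i): (shuffle_unpack, token, i, barrier_key)
    for i in range(npartitions)
}
# The annotation extracts the output partition index from each key.
annotation = lambda key: key[1]
assert sorted(annotation(k) for k in dsk) == [0, 1, 2, 3]
```

Replacing the MaterializedLayer wrapper around this dict with a proper layer would defer materialization, which is the "cherry on top" suggested above.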

Comment on lines +291 to +300:

```python
x = dd.shuffle.shuffle(df, "x", shuffle="p2p")
full = await x.persist()
ntasks_full = len(s.tasks)
del full
while s.tasks:
    await asyncio.sleep(0)
partial = await x.tail(compute=False).persist()  # Only ask for one key

assert len(s.tasks) < df.npartitions * 2
assert len(s.tasks) < ntasks_full
del partial
```
@fjetter (Member, Author):

Since we're no longer fusing, this test was breaking. The new test ensures that the culled graph has fewer tasks, instead of asserting on the ratio to input partitions.

@fjetter fjetter self-assigned this Oct 26, 2022
@fjetter fjetter merged commit c349f4f into dask:main Oct 26, 2022
@fjetter fjetter deleted the p2p_layer branch October 26, 2022 13:39
@fjetter fjetter mentioned this pull request Oct 26, 2022