P2P shuffle deduplicates data and can be run several times #7486
Conversation
It looks like performance degraded by roughly 20% for shuffle/merging, but not for shuffle-based aggregation. You can find the results of an A/B test here: https://github.com/coiled/coiled-runtime/actions/runs/4006635032
20% is OK-ish for now, but I'd still like us to roughly understand where this regression is coming from. I'm concerned that this affects event loop stability and not just runtime performance, and event loop stability is something I'm not eager to sacrifice.
In an offline conversation, we tracked some of this performance regression down to our use of a list of tuples instead of a list of bytes to submit data. Our serialization protocol does not handle this efficiently; it ends up calling serialize (distributed/distributed/protocol/serialize.py, line 317 in 0161991).
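As a rough illustration of the effect (using plain pickle as a stand-in for distributed's protocol, so the numbers are only indicative and not a benchmark of distributed itself):

import pickle
import timeit

rows = [(i, b"x" * 100) for i in range(10_000)]   # list of tuples: every element must be serialized
blobs = [pickle.dumps(row) for row in rows]       # list of bytes: frames can be passed through as-is

t_tuples = timeit.timeit(lambda: [pickle.dumps(row) for row in rows], number=10)
t_bytes = timeit.timeit(lambda: list(blobs), number=10)
print(f"per-tuple serialization: {t_tuples:.3f}s, pass-through bytes: {t_bytes:.3f}s")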
I still need to review the tests, but the changes so far look good.
await shuffle.close()
extension._runs.remove(shuffle)

self.worker._ongoing_background_tasks.call_soon(_, self, shuffle)
Why do we dispatch this to a background task? Why not make shuffle_fail async?
I can revisit whether we still need this. In some intermediate version, shuffle_fail needed to be synchronous to ensure ordering of operations. It might be that we solved this by propagating ShuffleRun.run_id.
shuffle_fail must be synchronous to ensure that a shuffle run is always invalidated before the task of a new run starts computing. For this, we rely on the message ordering of the stream. However, we must also guarantee that the messages are processed in the order in which they were submitted to the stream. Since handle_stream hands coroutines off to background tasks instead of awaiting them, shuffle_fail must be synchronous to ensure that the shuffle is invalidated before we process the next message.
distributed/distributed/core.py, lines 898 to 900 in fa86772:

if iscoroutinefunction(handler):
    self._ongoing_background_tasks.call_soon(
        handler, **merge(extra, msg)
Note that this differs from the behavior of handle_comm, which awaits coroutines (distributed/distributed/core.py, lines 819 to 820 in fa86772):

if inspect.iscoroutine(result):
    result = await result
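A minimal, self-contained sketch of the difference (generic asyncio, not distributed's actual code): handing each handler off to a background task does not preserve processing order, while awaiting each handler before the next message does.

import asyncio

async def handler(i: int, log: list) -> None:
    await asyncio.sleep(0.01 * (3 - i))  # later messages happen to finish sooner
    log.append(i)

async def main() -> None:
    dispatched, awaited = [], []
    # handle_stream-style: dispatch all handlers as concurrent tasks
    await asyncio.gather(*(handler(i, dispatched) for i in range(3)))
    # handle_comm-style: await each handler before processing the next message
    for i in range(3):
        await handler(i, awaited)
    print(dispatched)  # [2, 1, 0] here -- completion order, not submission order
    print(awaited)     # [0, 1, 2]

asyncio.run(main())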
The above is no longer true; handlers are always executed in order since #8059.
await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p"))

await clean_worker(a, timeout=2)
await clean_worker(b, timeout=2)
await clean_scheduler(s, timeout=2)
while s.tasks:
    await asyncio.sleep(0)

await c.compute(out.head(compute=False))
await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p"))
I think we should have another test that actually computes the identical graph, i.e.

graph = dd.shuffle.shuffle(df, ...).size
await c.compute(graph)
await asyncio.sleep(0.1)
await c.compute(graph)

If you are generating the graph from scratch, there is otherwise the possibility that some ID is regenerated. The fact that the ID is currently deterministic is more or less an implementation detail, but I wouldn't want to hold on to this, at least not for the shuffle layer.
Fair point, I'll add that.
See test_repeat_shuffle_operation and test_repeat_shuffle_instance.
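For reference, a hedged sketch of what such a test could look like (dataset parameters and the exact structure are assumptions, not the actual test code):

import asyncio
import dask.datasets
import dask.dataframe as dd
from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_repeat_shuffle_instance(c, s, a, b):
    df = dask.datasets.timeseries(start="2000-01-01", end="2000-01-10")
    graph = dd.shuffle.shuffle(df, "x", shuffle="p2p").size  # build the graph once
    await c.compute(graph)
    await asyncio.sleep(0.1)
    await c.compute(graph)  # identical graph, hence identical shuffle ID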
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
x, y = c.compute([df.x.size, out.x.size])
await get_shuffle_id(s)
What is this doing/asserting? There seems to be an implicit assumption about get_shuffle_id doing something. Ideally, we'd assert on some sort of state here. If that's not possible, I suggest adding a comment to explain what this is for.
I've renamed it. This waits until the shuffle is initialized on the scheduler and returns its ID.
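For context, the renamed helper behaves roughly like this (a sketch; the "shuffle" extension key and the .states attribute are assumptions, not the actual utility):

import asyncio

async def wait_until_new_shuffle_is_initialized(scheduler, interval=0.01):
    # Poll the scheduler's shuffle extension until a shuffle shows up,
    # then return its ID.
    ext = scheduler.extensions["shuffle"]
    while not ext.states:
        await asyncio.sleep(interval)
    [shuffle_id] = ext.states
    return shuffle_id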
shuffle_id = await get_shuffle_id(s)
shuffle_dict = scheduler_ext.get(shuffle_id, None, None, None, a.worker_address)
assert await worker_ext._get_shuffle_run(shuffle_id, shuffle_dict["run_id"])
with pytest.raises(RuntimeError, match="Invalid shuffle state"):
    await worker_ext._get_shuffle_run(shuffle_id, shuffle_dict["run_id"] + 1)
I'm not super excited about this. If this is just about getting the coverage report to look nice, let's add a pragma: no cover. Is this exception even reachable under ordinary circumstances?
I think this entire test covers something that is currently not possible. I'm OK with keeping it regardless, but I suggest adding an explanatory comment on why these are effectively dead code branches. IIUC, this is because of our scheduler run_id magic, isn't it?
If this is the intention of this test, I also suggest a different name. This is not necessarily about lifecycle; I consider lifecycle to be creating state, using it, cleaning it up, etc. This is more about staleness and internal consistency checks, isn't it?
Makes sense, I'll add some comments and adjust the naming. I am not aware of any case where this would happen, but it's nice to know that this case is caught should it ever arise.
Yes, I agree. This is just a bit of a code smell and suggests that our architecture is not great yet, since this is effectively a strong abstraction leak. Anyhow, if that's the worst of it, we're still in good shape :)
That is the only test case that should not happen. The test generally checks invariants that the implementation heavily relies upon, which is why I think they warrant a dedicated test.
I should make it clearer, though, that this test checks internals relevant only to developers working on the P2P shuffle implementation, not to its users.
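To spell out the invariant under test (a hypothetical standalone function mirroring the quoted snippet, not the real implementation):

def get_shuffle_run(known_run_ids: dict, shuffle_id: str, run_id: int) -> int:
    # A worker must never hand out a run newer than anything it knows about;
    # such a request points at inconsistent state, so it fails loudly.
    known = known_run_ids.get(shuffle_id)
    if known is None or run_id > known:
        raise RuntimeError(f"Invalid shuffle state: {shuffle_id}")
    return known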
I've added more documentation.
One or two test adjustments and this is in. Thanks for the hard work, @hendrikmakait!
I believe the following tickets can be closed with this as well, can't they, @hendrikmakait?
@fjetter: |
Oh, this is interesting (but unrelated): https://github.com/dask/distributed/actions/runs/4026033623/jobs/6920058827
@hendrikmakait, there is a possibly related failure in https://github.com/dask/distributed/actions/runs/4026033623/jobs/6920058644
Thanks, @fjetter. I could reproduce the flaky test locally, and it should be resolved by awaiting the result to avoid leaking threads.
FYI, #7505 for the linting issue. My bad.
{"shuffle": BlockedShuffleReceiveShuffleWorkerExtension}, | ||
) | ||
@gen_cluster(client=True, nthreads=[("", 1)] * 2) | ||
async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten): |
FYI, this is the issue we ran into earlier: dask/dask#9888
Blocked by: Do not allow for a worker to reject a drop replica request (#7490)

This PR removes the tombstone-based logic that ensured consistency for a shuffle but also prohibited the user from running the same shuffle several times. It introduces the concept of a ShuffleRun, which is a single execution of a shuffle and is used to allow tasks from multiple executions to run concurrently while guaranteeing consistency. For this purpose, we propagate the ShuffleRun used in the individual tasks to downstream ones.
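A minimal sketch of that concept (all names and details are illustrative assumptions, not the actual implementation): each execution of a shuffle gets a fresh ShuffleRun with an increasing run_id, which lets a worker reject data from stale runs and deduplicate repeated transfers within a run.

from __future__ import annotations

import itertools
from dataclasses import dataclass, field

@dataclass
class ShuffleRun:
    run_id: int
    received: set[str] = field(default_factory=set)

class ShuffleWorkerExtension:
    _run_ids = itertools.count(1)

    def __init__(self) -> None:
        self.active_run: ShuffleRun | None = None

    def start_new_run(self) -> ShuffleRun:
        # Every (re-)execution of the same shuffle gets a new run_id.
        self.active_run = ShuffleRun(run_id=next(self._run_ids))
        return self.active_run

    def receive(self, run_id: int, key: str, data: bytes) -> None:
        run = self.active_run
        if run is None or run_id != run.run_id:
            raise RuntimeError("Stale shuffle run")  # data from an old execution
        if key in run.received:
            return  # deduplicate repeated sends within the same run
        run.received.add(key)
        # ... buffer `data` for the downstream tasks of this run ...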