P2P shuffle deduplicates data and can be run several times #7486

Merged (54 commits, Jan 27, 2023)

Conversation

@hendrikmakait (Member) commented Jan 18, 2023:

This PR removes the tombstone-based logic that ensured consistency for a shuffle, but also prohibited the user from running the same shuffle several times. It introduces the concept of a ShuffleRun, which is a single execution of a shuffle and is used to allow tasks from multiple executions to run concurrently while guaranteeing consistency. For this purpose, we propagate the ShuffleRun used in the individual tasks to downstream ones.

  • Tests added / passed
  • Passes pre-commit run --all-files
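To make the run concept concrete, here is a rough, hypothetical sketch of the idea (illustrative only; this is not the actual ShuffleRun class added by this PR, and the names and fields are made up):

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

@dataclass
class ShuffleRun:
    """One execution of a shuffle (illustrative sketch only).

    The shuffle ``id`` stays stable across reruns, while ``run_id`` increases
    every time the shuffle is (re)started, so data belonging to a stale run
    can be told apart from data of the current run.
    """

    id: str
    run_id: int
    shards: dict[int, list[Any]] = field(default_factory=dict)

    def add_partition(self, partition_id: int, data: Any) -> int:
        self.shards.setdefault(partition_id, []).append(data)
        # Transfer tasks return the run_id so that downstream (barrier/unpack)
        # tasks can verify they are still talking to the same run.
        return self.run_id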

@hendrikmakait (Member Author):

It looks like performance degraded by roughly 20% for shuffle/merging, but not for shuffle-based aggregation:

test_join_big[1-p2p]

test_shuffle[p2p]

test_q8[5 GB (parquet+pyarrow)-p2p]

You can find the results of an A/B test here: https://github.com/coiled/coiled-runtime/actions/runs/4006635032

@fjetter (Member) commented Jan 25, 2023:

20% is OK-ish for now. I'd still like us to roughly understand where this regression is coming from. I'm concerned that this affects event loop stability and not just runtime performance; event loop stability is something I'm not eager to sacrifice.

@fjetter (Member) commented Jan 26, 2023:

In an offline conversation, we tracked some of this perf regression down to using a list of tuples instead of a list of bytes to submit data. Our serialization protocol does not handle this efficiently: it calls pickle.dumps on the tuples, which triggers a data copy, instead of iterating into the tuples and writing headers/frames.
Enabling iterate_collection in serialize fixes this but might have unwanted side effects. I suggest postponing a fix for this until later. A fix could either be to make serialize handle this case better, or even to use a custom class as a container with a registered serializer that bypasses the recursive walks in serialize.
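To make the second option concrete, a custom container with a registered serializer could look roughly like the sketch below, using distributed's dask_serialize/dask_deserialize dispatch. The ShardBatch class and function names are hypothetical and not part of this PR:

from distributed.protocol import dask_serialize, dask_deserialize

class ShardBatch:
    """Hypothetical container for shards that are already plain bytes."""

    def __init__(self, frames):
        self.frames = list(frames)

@dask_serialize.register(ShardBatch)
def _serialize_shard_batch(batch):
    # Hand the byte frames straight to the protocol instead of letting it
    # recurse into tuples and pickle them (which would copy the data).
    header = {"num-frames": len(batch.frames)}
    return header, batch.frames

@dask_deserialize.register(ShardBatch)
def _deserialize_shard_batch(header, frames):
    return ShardBatch(frames)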

@hendrikmakait (Member Author):

After serializing the pa.Table before handing it to the buffers, it looks like we have recovered from the regression:
test_shuffle[p2p]

test_join_big[p2p]
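As a rough illustration of what serializing the table up front can look like, here is a minimal sketch using pyarrow's IPC stream format. The helper names are made up and this is not the code from this PR:

import pyarrow as pa

def table_to_bytes(table: pa.Table) -> bytes:
    # Serialize the table up front so the shuffle buffers and the comm layer
    # only ever handle plain bytes.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()

def bytes_to_table(data: bytes) -> pa.Table:
    # Reconstruct the table on the receiving side.
    return pa.ipc.open_stream(pa.py_buffer(data)).read_all()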

@fjetter (Member) left a review comment:

I still need to review the tests, but the changes so far look good.

distributed/shuffle/_worker_extension.py (resolved)
distributed/shuffle/_worker_extension.py (outdated, resolved)
await shuffle.close()
extension._runs.remove(shuffle)

self.worker._ongoing_background_tasks.call_soon(_, self, shuffle)
@fjetter (Member):

Why do we dispatch this to a background task? Why not make shuffle_fail async?

@hendrikmakait (Member Author):

I can revisit whether we still need this. In some intermediate version, shuffle_fail needed to be synchronous to ensure ordering of operations. It might be that we solved this by propagating ShuffleRun.run_id.

@hendrikmakait (Member Author):

shuffle_fail must be synchronous to ensure that a shuffle run is always invalidated before the task of a new run starts computing. For this, we rely on the message ordering of the stream. However, we must also guarantee that the messages are processed in the order they were submitted to the stream. Since handle_stream hands coroutines off to background tasks instead of awaiting them, shuffle_fail must be synchronous to ensure that the shuffle is invalidated before we process the next message.

if iscoroutinefunction(handler):
    self._ongoing_background_tasks.call_soon(
        handler, **merge(extra, msg)
    )

Note that this differs from the behavior of handle_comm, which awaits coroutines:

if inspect.iscoroutine(result):
    result = await result
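The ordering argument above can be illustrated with a toy example independent of the distributed codebase: dispatching coroutine handlers to background tasks lets a later message overtake an earlier one, while awaiting each handler (or keeping it synchronous) preserves the order in which messages arrived. Names and timings below are invented for illustration:

import asyncio

async def handle(message: str, delay: float) -> None:
    await asyncio.sleep(delay)
    print("handled:", message)

async def fire_and_forget() -> None:
    # Handlers dispatched to background tasks complete in an order dictated
    # by their own timing, not by the order the messages arrived in.
    first = asyncio.create_task(handle("invalidate old shuffle run", 0.1))
    second = asyncio.create_task(handle("start new shuffle run", 0.0))
    await asyncio.gather(first, second)  # prints the *second* message first

async def in_order() -> None:
    # Awaiting each handler preserves the submission order on the stream.
    await handle("invalidate old shuffle run", 0.1)
    await handle("start new shuffle run", 0.0)

asyncio.run(fire_and_forget())
asyncio.run(in_order())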

@fjetter (Member):

The above is no longer true; handlers are always executed in order since #8059.

hendrikmakait and others added 2 commits January 26, 2023 17:57
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
distributed/shuffle/tests/test_shuffle_extension.py (outdated, resolved)
distributed/shuffle/tests/test_shuffle.py (resolved)
Comment on lines 814 to 819
await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p"))

await clean_worker(a, timeout=2)
await clean_worker(b, timeout=2)
await clean_scheduler(s, timeout=2)
while s.tasks:
    await asyncio.sleep(0)

await c.compute(out.head(compute=False))
await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p"))
@fjetter (Member):

I think we should have another test that actually computes the identical graph, i.e.

graph = dd.shuffle.shuffle(df, ...).size
await c.compute(graph)
await asyncio.sleep(0.1)
await c.compute(graph)

If you are generating the graph from scratch, there is otherwise the possibility that some ID is regenerated. The fact that the ID is currently deterministic is more or less an implementation detail, but I wouldn't want to hold on to this, at least not for the shuffle layer.

@hendrikmakait (Member Author):

Fair point, I'll add that.

@hendrikmakait (Member Author):

See test_repeat_shuffle_operation and test_repeat_shuffle_instance

distributed/shuffle/tests/test_shuffle.py (resolved)
distributed/shuffle/tests/test_shuffle.py (resolved)
distributed/shuffle/tests/test_shuffle.py (outdated, resolved)
distributed/shuffle/tests/test_shuffle.py (outdated, resolved)

out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
x, y = c.compute([df.x.size, out.x.size])
await get_shuffle_id(s)
@fjetter (Member):

What is this doing/asserting? It looks like there is an implicit assumption about get_shuffle_id doing something. Ideally, we'd assert on some sort of state here. If that's not possible, I suggest adding a comment to explain what this is for.

@hendrikmakait (Member Author):

I've renamed it. This waits until the shuffle is initialized on the scheduler and returns its ID.
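For context, such a helper could poll the scheduler until a shuffle shows up, roughly like the sketch below. This is a hypothetical illustration: the "shuffle" extension name and its states attribute are assumptions, not the actual test utility from this PR:

import asyncio

async def wait_for_shuffle_id(scheduler, timeout: float = 5.0):
    # Hypothetical helper: poll until the scheduler knows about a shuffle,
    # then return that shuffle's ID.
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        ext = scheduler.extensions.get("shuffle")
        states = getattr(ext, "states", None) or {}
        if states:
            return next(iter(states))
        if asyncio.get_running_loop().time() > deadline:
            raise TimeoutError("no shuffle was initialized on the scheduler")
        await asyncio.sleep(0.05)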

Comment on lines 1495 to 1499
shuffle_id = await get_shuffle_id(s)
shuffle_dict = scheduler_ext.get(shuffle_id, None, None, None, a.worker_address)
assert await worker_ext._get_shuffle_run(shuffle_id, shuffle_dict["run_id"])
with pytest.raises(RuntimeError, match="Invalid shuffle state"):
await worker_ext._get_shuffle_run(shuffle_id, shuffle_dict["run_id"] + 1)
@fjetter (Member):

Not super excited about this. If this is just about getting the coverage report to look nice, let's add a pragma: no cover
Is this exception even reachable under ordinary circumstances?

@fjetter (Member):

I think this entire test covers something that is currently not possible. I'm OK with keeping it regardless, but I suggest adding an explanatory comment on why these are effectively dead code branches.

IIUC this is because of our scheduler run_id magic, isn't it?

If this is the intention of this test, I also suggest a different name. This is not necessarily about lifecycle; I consider lifecycle to be creating state, using it, cleaning it up, etc.
This is more about staleness and internal consistency checks, isn't it?

@hendrikmakait (Member Author):

Makes sense, I'll add some comments and adjust the naming. I am not aware of any case where this would happen, but it's nice to know that this case is caught should it ever arise.

@fjetter (Member):

Yes, I agree. This is just a bit of a code smell and suggests that our architecture is not great yet, since this is effectively a strong abstraction leak. Anyhow, if that's the worst of it, we're still in good shape :)

@hendrikmakait (Member Author):

To be precise, https://github.com/dask/distributed/pull/7486/files/43e0df2220c29e1f23e878a14cb9acc462fba98a#diff-6702746f1047e0898ef00b7b3af565f9ce5eb7b4200ec521d8c0db92f3bafd93R1498-R1499

is the only test case that should not happen. The test generally checks invariants that the implementation heavily relies upon, which is why I think they warrant a dedicated test.

I should make it clearer, though, that this test checks internals that are only relevant to developers working on the P2P shuffle implementation, not to its users.

@hendrikmakait (Member Author):

I've added more documentation.

@fjetter (Member) commented Jan 27, 2023:

One or two test adjustments and this is in. Thanks for the hard work, @hendrikmakait!

@fjetter (Member) commented Jan 27, 2023:

I believe the following tickets can be closed with this as well, can't they, @hendrikmakait?

@hendrikmakait (Member Author):

@fjetter: test_delete_some_results, a test mentioned in #7352, failed for an unknown cause on CI. I had planned to test and unskip it in a dedicated PR, which would then close both of the tickets you mention above. I think this should be good; then again, I have no idea why test_delete_some_results failed in the first place.

@fjetter (Member) commented Jan 27, 2023:

Oh, this is interesting (but unrelated)

https://github.com/dask/distributed/actions/runs/4026033623/jobs/6920058827

  File "D:\a\distributed\distributed\distributed\scheduler.py", line 4617, in stimulus_queue_slots_maybe_opened

    assert qts.state == "queued", qts.state

AssertionError: forgotten

This is distributed/tests/test_client.py::test_threadsafe_get

#7504

@fjetter (Member) commented Jan 27, 2023:

@hendrikmakait there is a possibly related failure in distributed/dashboard/tests/test_scheduler_bokeh.py::test_shuffling

https://github.com/dask/distributed/actions/runs/4026033623/jobs/6920058644

Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\shuffle\_comms.py", line 70, in _process
    await self.send(address, shards)
  File "d:\a\distributed\distributed\distributed\shuffle\_worker_extension.py", line 179, in send
    return await self.rpc(address).shuffle_receive(
  File "d:\a\distributed\distributed\distributed\core.py", line 1221, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "d:\a\distributed\distributed\distributed\core.py", line 1011, in send_recv
    raise exc.with_traceback(tb)
  File "d:\a\distributed\distributed\distributed\core.py", line 820, in _handle_comm
    result = await result
  File "d:\a\distributed\distributed\distributed\shuffle\_worker_extension.py", line 384, in shuffle_receive
    shuffle = await self._get_shuffle_run(shuffle_id, run_id)
  File "d:\a\distributed\distributed\distributed\shuffle\_worker_extension.py", line 475, in _get_shuffle_run
    shuffle = await self._refresh_shuffle(
  File "d:\a\distributed\distributed\distributed\shuffle\_worker_extension.py", line 551, in _refresh_shuffle
    result = await self.worker.scheduler.shuffle_get(
  File "d:\a\distributed\distributed\distributed\core.py", line 1221, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "d:\a\distributed\distributed\distributed\core.py", line 1011, in send_recv
    raise exc.with_traceback(tb)
  File "d:\a\distributed\distributed\distributed\core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "d:\a\distributed\distributed\distributed\shuffle\_scheduler_extension.py", line 83, in get
    assert schema is not None

@hendrikmakait (Member Author):

@hendrikmakait there is a possibly related failure in distributed/dashboard/tests/test_scheduler_bokeh.py::test_shuffling

https://github.com/dask/distributed/actions/runs/4026033623/jobs/6920058644

Thanks, @fjetter. I could reproduce the flaky test locally, and it should be resolved by awaiting the result to avoid leaking threads.

@fjetter (Member) commented Jan 27, 2023:

FYI: #7505 for the linting issue. My bad.

{"shuffle": BlockedShuffleReceiveShuffleWorkerExtension},
)
@gen_cluster(client=True, nthreads=[("", 1)] * 2)
async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten):
@fjetter (Member):

FYI, this is the issue we ran into earlier: dask/dask#9888

@fjetter merged commit 1c6fb84 into dask:main on Jan 27, 2023

Successfully merging this pull request may close these issues.

P2P cannot execute a shuffle twice
Data duplication in P2P shuffling