P2P shuffle skeleton #5520

Merged: 25 commits, Jan 24, 2022

Commits
5a1f00e
Skeleton and stubs for ShuffleExtension
gjoseph92 Nov 2, 2021
a46246c
Start filling in skeleton
gjoseph92 Nov 2, 2021
3d4bbf3
Simplest possible shuffle done, ready to try out
gjoseph92 Nov 2, 2021
433ce17
Separate NewShuffleMetadata and ShuffleMetadata
gjoseph92 Nov 9, 2021
743215e
Beginning of tests, get imports working
gjoseph92 Nov 9, 2021
6f0b531
Switch to async interfaces for async tests
gjoseph92 Nov 10, 2021
4a4f430
test add_partition
gjoseph92 Nov 10, 2021
c39309c
test_get_partition
gjoseph92 Nov 10, 2021
432d149
rename to `p2p`
gjoseph92 Nov 11, 2021
cad4717
set worker restrictions
gjoseph92 Nov 11, 2021
74ddf00
Get output partition ranges/counts per worker
gjoseph92 Nov 11, 2021
a998141
Barrier for workers that hold no output partitions
gjoseph92 Nov 11, 2021
c480684
ensure consistent graph
gjoseph92 Nov 11, 2021
3fadb0a
Ignore race condition removing shuffle
gjoseph92 Nov 11, 2021
10c0c64
comments
gjoseph92 Nov 12, 2021
6de77db
[draft] Blockwise for unpack layer
gjoseph92 Nov 16, 2021
0e403ec
Basic end-to-end tests for the graph
gjoseph92 Nov 16, 2021
9b9a68b
Respond to comments
gjoseph92 Nov 19, 2021
20a2ce4
Remove `add_handler`
gjoseph92 Nov 19, 2021
506dda3
Better error when not running on worker
gjoseph92 Nov 19, 2021
c8a2f7a
Private `partition_range`
gjoseph92 Dec 7, 2021
20c9312
Use `scheduler.identity()` RPC directly
gjoseph92 Dec 7, 2021
9f4304d
Use `scheduler=client` from dask/dask#8559
gjoseph92 Jan 12, 2022
ab2c1d1
Merge remote-tracking branch 'upstream/main' into p2p-shuffle/skeleton
gjoseph92 Jan 12, 2022
c6fc157
Merge remote-tracking branch 'upstream/main' into p2p-shuffle/skeleton
gjoseph92 Jan 18, 2022
18 changes: 18 additions & 0 deletions distributed/shuffle/__init__.py
@@ -0,0 +1,18 @@
try:
    import pandas
except ImportError:
    SHUFFLE_AVAILABLE = False
else:
    del pandas
    SHUFFLE_AVAILABLE = True

    from .shuffle import rearrange_by_column_p2p
    from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension

__all__ = [
    "SHUFFLE_AVAILABLE",
    "rearrange_by_column_p2p",
    "ShuffleId",
    "ShuffleMetadata",
    "ShuffleWorkerExtension",
]
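
(Not part of the diff.) A minimal sketch of how caller code might use these exports; the `ddf` DataFrame and the "id" column are hypothetical placeholders:

from distributed.shuffle import SHUFFLE_AVAILABLE, rearrange_by_column_p2p

if SHUFFLE_AVAILABLE:  # False when pandas could not be imported
    # Build a lazy collection whose rows are repartitioned by the "id" column.
    shuffled = rearrange_by_column_p2p(ddf, "id")
else:
    raise RuntimeError("p2p shuffling requires pandas on the client and workers")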
111 changes: 111 additions & 0 deletions distributed/shuffle/shuffle.py
@@ -0,0 +1,111 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dask.base import tokenize
from dask.dataframe import DataFrame
from dask.delayed import Delayed, delayed
from dask.highlevelgraph import HighLevelGraph

from .shuffle_extension import NewShuffleMetadata, ShuffleId, ShuffleWorkerExtension

if TYPE_CHECKING:
import pandas as pd


def get_ext() -> ShuffleWorkerExtension:
    from distributed import get_worker

    try:
        worker = get_worker()
    except ValueError as e:
        raise RuntimeError(
            "`shuffle='p2p'` requires Dask's distributed scheduler. This task is not running on a Worker; "
            "please confirm that you've created a distributed Client and are submitting this computation through it."
        ) from e
    extension: ShuffleWorkerExtension | None = worker.extensions.get("shuffle")
    if not extension:
        raise RuntimeError(
            f"The worker {worker.address} does not have a ShuffleExtension. "
            "Is pandas installed on the worker?"
        )
    return extension


def shuffle_setup(metadata: NewShuffleMetadata) -> None:
    get_ext().create_shuffle(metadata)


def shuffle_transfer(input: pd.DataFrame, id: ShuffleId, setup=None) -> None:
    get_ext().add_partition(input, id)


def shuffle_unpack(id: ShuffleId, output_partition: int, barrier=None) -> pd.DataFrame:
    return get_ext().get_output_partition(id, output_partition)


def shuffle_barrier(id: ShuffleId, transfers: list[None]) -> None:
    get_ext().barrier(id)


def rearrange_by_column_p2p(
    df: DataFrame,
    column: str,
    npartitions: int | None = None,
):
    npartitions = npartitions or df.npartitions
    token = tokenize(df, column, npartitions)

    setup = delayed(shuffle_setup, pure=True)(
        NewShuffleMetadata(
            ShuffleId(token),
            df._meta,
            column,
            npartitions,
        )
    )

    transferred = df.map_partitions(
        shuffle_transfer,
        token,
        setup,
        meta=df,
        enforce_metadata=False,
        transform_divisions=False,
    )

    barrier_key = "shuffle-barrier-" + token
    barrier_dsk = {barrier_key: (shuffle_barrier, token, transferred.__dask_keys__())}
    barrier = Delayed(
        barrier_key,
        HighLevelGraph.from_collections(
            barrier_key, barrier_dsk, dependencies=[transferred]
        ),
    )

    name = "shuffle-unpack-" + token
    dsk = {
        (name, i): (shuffle_unpack, token, i, barrier_key) for i in range(npartitions)
    }
    # TODO use this blockwise (https://github.com/coiled/oss-engineering/issues/49)
    # Changes task names, so breaks setting worker restrictions at the moment.

Member:
IIUC, neither of these PRs solves this problem, do they? Do we have an idea how to solve this problem down the road? What would need to change to enable this?

Collaborator Author:
#5524 solves it (that's one of the main points). The scheduler plugin handles the unpredictable task names by traversing the graph from the barrier task and parsing the key names of its dependents. That's what the discussion in https://github.com/dask/distributed/pull/5524/files#r765203083 is about.

> Do we have an idea how to solve this problem down the road?

I think the idea we have right now is something like:

with dask.annotate(shuffle_id=token, output_partition={(name, i): i for i in range(npartitions)}, fuse="propagate"):
    dsk = blockwise(...)
# or
with dask.annotate(shuffle_id=token, resources={(name, i): f"shuffle-{token}-{i}" for i in range(npartitions)}, fuse="propagate"):
    dsk = blockwise(...)

where we add annotations to the graph explicitly marking which output partition number each task is. The fuse="propagate" would be respected by all fusion optimizations (dask/dask#7036 is a prerequisite), so it would carry annotations from this layer over to any final fused layer, including rewriting the keys as necessary.

And the only thing these annotations gain us is not having to parse the keys of tasks that depend on the barrier, to pull out that i component (instead, we'd look at the task's annotations). And as ugly as key-parsing feels, the only case where it'll actually break in practice is if you convert to a non-DataFrame collection (namely Delayed, but Array might cause problems too) and graph optimizations support fusion across that boundary (at the moment, not sure if this can happen).

Basically key-parsing would be fine for 95% of situations; the annotations would only solve some edge cases and make it feel more proper.
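
For illustration only (this is not the #5524 implementation), a minimal sketch of the key-parsing idea described above, assuming the barrier's dependents have keys of the form (name, i), or their stringified form as seen on the scheduler:

import ast


def output_partition_from_key(key) -> int:
    """Pull the output-partition index out of an unpack task's key.

    Handles both tuple keys, e.g. ("shuffle-unpack-abc", 3), and their
    stringified form, e.g. "('shuffle-unpack-abc', 3)".
    """
    if isinstance(key, str):
        key = ast.literal_eval(key)
    _name, i = key
    return int(i)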

    # Also maybe would be nice if the `DataFrameIOLayer` interface supported this?
    # dsk = blockwise(
    #     shuffle_unpack,
    #     name,
    #     "i",
    #     token,
    #     None,
    #     BlockwiseDepDict({(i,): i for i in range(npartitions)}),
    #     "i",
    #     barrier_key,
    #     None,
    #     numblocks={},
    # )

Comment on lines +92 to +104
Member:
In an earlier offline conversation, I mentioned that I am doubtful whether this should actually be considered a blockwise operation. My line of argumentation is mostly that the individual pieces are not independently calculable. The blockwise definition you propose also doesn't look healthy to me, since we're encoding the BlockwiseDepDict to define the output partitions even though we are, in fact, not dealing with an IO layer here. It seems we're using extremely complex machinery even though it is not necessarily required.

The most important reason to use a blockwise layer here is to enable downstream task fusion. IMHO, that should not require us to define this layer as blockwise; rather, subsequent blockwise operations should be aware of it and capable of fusing with it. We may need to teach them that first, and in the interim we can use blockwise. My point is, I consider blockwise a shortcut rather than the final solution.

Regardless of how the fusion is supposed to work, I am doubtful whether it is even required. To my understanding, fusion is extremely important for the input/transfer layer since it needs to be fused with whatever the input is. E.g. if we're starting off with a read_parquet, it should be fused with the transfer task. This is the important root-task-overproduction scenario: fusion ensures we do not read 100 files before we engage in any transfer.
However, on the output/unpack side the situation is not as clear to me; at the very least it's not as urgent. Once we reach unpack, all data is on-cluster. As long as we do not implement data spilling, the data is even in memory. As long as all data is in memory, this should be mostly irrelevant. Delaying this further down the road would be fine for me since I could think of other ways of dealing with this problem than forcing fusion (e.g. the worker plugin/extension could enforce a limit on how many tasks are allowed to be processed or be in memory at the same time).

Collaborator Author:
> As long as all data is in memory, this should be mostly irrelevant. Delaying this further down the road would be fine for me since I could think of other ways of dealing with this problem than forcing fusion

That's fair. You're right that if the data is all in memory anyway, overproduction is irrelevant. And there are probably other hacks we can do to protect against it on the unpack side.

I think making a dedicated layer is the best approach here.
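
As an aside (not from the PR), the input-to-transfer fusion discussed in this thread can be pictured at the low-level-graph level. `load_chunk`, `token`, and the key names below are hypothetical placeholders; `shuffle_transfer` is the function from the diff above:

import pandas as pd

token = "abc"  # placeholder shuffle id


def load_chunk(path: str) -> pd.DataFrame:
    # Hypothetical stand-in for an input task such as a parquet read.
    return pd.read_parquet(path)


# Unfused: the read result materializes under its own key, so a worker can keep
# producing reads (root task overproduction) before any transfer runs.
dsk = {
    ("read", 0): (load_chunk, "part-0.parquet"),
    ("transfer", 0): (shuffle_transfer, ("read", 0), token, None),
}

# Fused: the read happens inside the transfer task, so each partition is handed
# to the shuffle extension as soon as it is produced.
fused = {
    ("transfer", 0): (shuffle_transfer, (load_chunk, "part-0.parquet"), token, None),
}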


    return DataFrame(
        HighLevelGraph.from_collections(name, dsk, [barrier]),
        name,
        df._meta,
        [None] * (npartitions + 1),
    )
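
For orientation (not part of the diff), the graph built by `rearrange_by_column_p2p` has roughly this shape for two partitions. The barrier and unpack key names follow the code above; the setup and transfer names are illustrative, since `delayed` and `map_partitions` generate their own key names (token shortened to "abc"):

graph_shape = {
    "shuffle-setup-abc": (shuffle_setup, "NewShuffleMetadata(...)"),
    ("shuffle-transfer-abc", 0): (shuffle_transfer, "df partition 0", "abc", "shuffle-setup-abc"),
    ("shuffle-transfer-abc", 1): (shuffle_transfer, "df partition 1", "abc", "shuffle-setup-abc"),
    # The barrier depends on every transfer task...
    "shuffle-barrier-abc": (
        shuffle_barrier,
        "abc",
        [("shuffle-transfer-abc", 0), ("shuffle-transfer-abc", 1)],
    ),
    # ...and every unpack task depends on the single barrier.
    ("shuffle-unpack-abc", 0): (shuffle_unpack, "abc", 0, "shuffle-barrier-abc"),
    ("shuffle-unpack-abc", 1): (shuffle_unpack, "abc", 1, "shuffle-barrier-abc"),
}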