
P2P shuffle skeleton #5520

Merged (25 commits, Jan 24, 2022)

Conversation

gjoseph92 (Collaborator):

This partially implements step one of the design in #5435.

This skeleton is just intended to establish a framework for how the out-of-band shuffle will interact with dask (both with the workers and with the task graph). The shuffle itself is intentionally not performant at all, but it is correct.

Note that this does not yet implement the scheduler extension mentioned in the design doc. That requires a more substantial refactor than expected.

cc @fjetter

  • Tests added / passed
  • Passes pre-commit run --all-files

Note that many of these methods will probably become async as we add buffering, disk, etc.
This is a Go-style approach with struct composition and a gRPC-like "CreateThingRequest", but it's nice to note that they're separate things.
Having `sync` within functions called by an async test deadlocked. Rather than figure out how to make `sync` not do this, we just have fully async functions now, plus public APIs for calling from threads that simply wrap them in `sync`. This is not quite ideal, but the async/sync boundaries are going to change so much anyway that it's not worth investing much time in yet.
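
As a rough sketch of that pattern (the names here are illustrative, not the actual extension API; it assumes the extension holds a reference to the worker and its event loop):

from distributed.utils import sync

class ShuffleWorkerExtension:
    def __init__(self, worker):
        self.worker = worker

    async def _add_partition(self, data, shuffle_id):
        # fully async implementation; always runs on the worker's event loop
        ...

    def add_partition(self, data, shuffle_id):
        # public API, callable from worker threads: just wraps the coroutine in `sync`
        return sync(self.worker.loop, self._add_partition, data, shuffle_id)
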
I _think_ this is safe, though it feels odd not to be locking. Since bytecode-level ops on dicts are atomic, I don't think we need to lock `self.shuffles` yet. Most of the places where race conditions feel most likely are in comms handlers, which are already protected by being called serially from the event loop. There could be a race condition around a thread deleting a shuffle in `get_output_partition` while the event loop is recreating the same shuffle ID in `create_shuffle`. However, this _should_ be impossible so long as the `shuffle_setup` task only runs once. _Assuming a shuffle is always submitted as the full graph_, and clients don't persist any of the intermediate keys, then I think resubmitting the same shuffle while one was still running would make the scheduler just tack onto the already-existing keys.

Though if you were halfway through unpacks, and then resubmitted the same shuffle, it would have to run again since some unpack results might have already been released. So you could have `create` run again while that same shuffle is already running.

This is maybe an argument for impure graph keys and unstable shuffle IDs. But it's a messed-up case, so maybe we should just lock `shuffles`? What we're worried about is:
- thread 1 gets shuffle & requests partition
- thread 2 gets shuffle & requests partition
- thread 1 finishes, sees it's done, deletes it
- `create_shuffle` runs, shuffle doesn't exist, re-creates it
- thread 2 finishes, sees it's done, deletes it (the new shuffle just created)
So locking would prevent that. But so would reusing IDs.
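
For reference, locking would look roughly like this (a minimal sketch with illustrative names; a plain threading.Lock, since both worker threads and the event loop touch the dict, and it is only held for quick dict operations):

import threading

class ShuffleWorkerExtension:
    def __init__(self, worker):
        self.worker = worker
        self.shuffles = {}  # shuffle ID -> per-shuffle state (just a dict in this sketch)
        self._shuffles_lock = threading.Lock()

    def _get_or_create_shuffle(self, id, metadata):
        # called from the event loop in `create_shuffle`
        with self._shuffles_lock:
            return self.shuffles.setdefault(id, {"metadata": metadata, "done": False})

    def _remove_shuffle_if_done(self, id):
        # called from worker threads at the end of `get_output_partition`
        with self._shuffles_lock:
            if self.shuffles[id]["done"]:
                del self.shuffles[id]
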
This works, but changes task names, so setting worker restrictions fails. Will be easier once we have the scheduler plugin.
https://github.com/coiled/oss-engineering/issues/49
distributed/shuffle/shuffle_extension.py: two resolved (outdated) review threads
), f"Worker client is not using the worker's event loop: {client.loop} vs {self.worker.loop}"
# NOTE: `Client.scheduler_info()` doesn't actually do anything when the client is async,
# manually call into the method for now.
await client._update_scheduler_info()
Member:

Why is this necessary?

Collaborator Author:

See the implementation of Client.scheduler_info. It's a no-op if the client is asynchronous:

if not self.asynchronous:
    self.sync(self._update_scheduler_info)
return self._scheduler_identity

Maybe I'm not understanding why. I know scheduler info is updated automatically every second:

self._periodic_callbacks["scheduler-info"] = PeriodicCallback(
    self._update_scheduler_info, scheduler_info_interval * 1000
)

but I don't know why asking for the freshest possible info would be a no-op only with an async client.

Member:

OK, so you're updating the client's scheduler identity to get the workers? I think I would prefer us just calling the identity ourselves with await self.scheduler.identity().
The existing implementation wraps everything with an exception handler we don't want to have here, and wraps everything in an HTML repr class we also don't want. I don't mind one more call; it's not going to change a lot, but the code is less entangled and easier to read.
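
Something like this is what I have in mind (a sketch; it assumes the worker's scheduler RPC exposes identity, whose response includes a "workers" mapping of address to info):

ident = await self.worker.scheduler.identity()
worker_addresses = list(ident["workers"])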

i = self.workers.index(worker)
first = math.ceil(self.npartitions * i / len(self.workers))
last = math.ceil(self.npartitions * (i + 1) / len(self.workers)) - 1
return first, last
Member:

How is it possible that this is a sorted range?

Collaborator Author:

I'm not sure what you mean? Do you think this test of it is missing something?

counter = Counter(assignments)
assert len(counter) == min(npartitions, n_workers)
# Test `npartitions_for`
calculated_counter = {w: metadata.npartitions_for(w) for w in metadata.workers}
assert counter == {
    w: count for w, count in calculated_counter.items() if count != 0
}
assert calculated_counter.keys() == set(metadata.workers)
# ^ this also checks that workers receiving 0 output partitions were calculated properly

Member:

I'm mostly wondering why a worker has a continuous range of IDs. I guess this connects to how we calculate the hash and partition ID. Can you add that information? Is it documented somewhere how all of these things connect, which ID is which, how they are calculated, etc.?

Collaborator Author:

I'm mostly wondering why a worker has a continuous range of IDs.

That's an implementation detail of the binning arithmetic (// instead of %). I'll make this method private. It should not be required, nor relied on as part of the shuffle interface, that workers receive consecutive output partitions (though it is good for scheduling). This method just made it more readable and intuitive to implement npartitions_for, which is needed.
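
To spell out how these pieces fit together, here's a sketch (hypothetical method names, consistent with the snippet quoted above, but not necessarily the real metadata class):

import math

class ShuffleMetadata:
    def __init__(self, workers, npartitions):
        self.workers = workers          # ordered list of worker addresses
        self.npartitions = npartitions  # total number of output partitions

    def _partition_range(self, worker):
        # consecutive [first, last] output partition numbers assigned to `worker`
        i = self.workers.index(worker)
        first = math.ceil(self.npartitions * i / len(self.workers))
        last = math.ceil(self.npartitions * (i + 1) / len(self.workers)) - 1
        return first, last

    def npartitions_for(self, worker):
        # how many output partitions `worker` receives (may be 0 with more workers than partitions)
        first, last = self._partition_range(worker)
        return last - first + 1

    def worker_for(self, partition):
        # the `//`-style binning: which worker owns a given output partition
        return self.workers[partition * len(self.workers) // self.npartitions]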

gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 19, 2021
Whenever I forget to switch to dask#5520, the errors are confusing.
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 19, 2021
@fjetter fjetter self-requested a review December 10, 2021 10:28
@fjetter (Member) left a comment:

I'm fine with the extension implementation but am wondering about the graph. Let's discuss this and we can move on. Likely this should all be a follow-up anyhow.

distributed/shuffle/shuffle.py (resolved review thread)
(name, i): (shuffle_unpack, token, i, barrier_key) for i in range(npartitions)
}
# TODO use this blockwise (https://github.com/coiled/oss-engineering/issues/49)
# Changes task names, so breaks setting worker restrictions at the moment.
Member:

IIUC, neither of these PRs solves this problem, do they? Do we have an idea how to solve it down the road? What would need to change to enable this?

Collaborator Author:

#5524 solves it (that's one of the main points). The scheduler plugin handles the unpredictable task names by traversing the graph from the barrier task and parsing the key names of its dependents. That's what the discussion in https://github.com/dask/distributed/pull/5524/files#r765203083 is about.

Do we have an idea how to solve this problem down the road?

I think the idea we have right now is something like:

with dask.annotate(shuffle_id=token, output_partition={(name, i): i for i in range(npartitions)}, fuse="propagate"):
    dsk = blockwise(...)
# or
with dask.annotate(shuffle_id=token, resources={(name, i): f"shuffle-{token}-{i}" for i in range(npartitions)}, fuse="propagate"):
    dsk = blockwise(...)

where we add annotations to the graph explicitly marking which output partition number each task is. The fuse="propagate" would be respected by all fusion optimizations (dask/dask#7036 is a prerequisite), so it would carry annotations from this layer over to any final fused layer, including rewriting the keys as necessary.

And the only thing these annotations gain us is not having to parse the keys of tasks that depend on the barrier, to pull out that i component (instead, we'd look at the task's annotations). And as ugly as key-parsing feels, the only case where it'll actually break in practice is if you convert to a non-DataFrame collection (namely Delayed, but Array might cause problems too) and graph optimizations support fusion across that boundary (at the moment, not sure if this can happen).

Basically key-parsing would be fine for 95% of situations; the annotations would only solve some edge cases and make it feel more proper.

# workers will be flushing buffers to each other.
name = "shuffle-unpack-" + metadata.id # TODO single-source task name

# FIXME TODO XXX what about when culling means not all of the output tasks actually exist??!
Member:

I don't know what to do about the additional transfer, but the cleanup/output task pruning could be handled here by accessing:

from distributed import get_worker

worker = get_worker()
key = worker.get_current_task()  # key of the currently running task (the barrier)
ts = worker.tasks[key]
# assert ts == barrier
ts.dependents  # the barrier's dependents, i.e. the unpack tasks (if the worker knows about them)

Member:

I assume knowing which dependents belong to which ID boils down to the same problem we have on the scheduler plugin side, doesn't it? The only way to properly assign the subset of dependents would be to parse the key for its ID? Is there any other method to do this properly upfront during setup?

Member:

I don't have a very thorough understanding of the HLG and Layer classes, but just looking over the API/signatures I would assume we'd be able to wrap the entire shuffle graph into a single layer and handle the culling there. That would allow us to mutate the transfer and barrier tasks, solving both problems.

class Layer:
    def cull(...):
        ...

https://github.com/dask/dask/blob/ecddc8df5869679cef5280c869674cbf0d9f60d9/dask/highlevelgraph.py#L112-L156

Similarly, if we'd rather use resources, annotations, or other attributes, layer materialization would be an ideal place where we could set all of this, couldn't it? We don't need any further infrastructure for this, do we?

Collaborator Author:

the cleanup/output task pruning could be handled here by accessing worker.tasks[key].dependents

Maybe I don't know worker state well enough, but I didn't think the scheduler sent workers the full list of dependents for a task. Don't workers only fill in dependents based on the dependencies of other tasks they've been told about? And since none of the dependents of the barrier can run until the barrier completes, the scheduler won't have sent those tasks to any workers, so I'd assume dependents is empty at this moment.

If this isn't true, and workers have a wider view of the graph than just the tasks they've been told to run, then that would change things a lot.

The only way to properly assign the subset of dependents would be to parse the key for its ID?

You need to know which output partition number that task is creating. By definition for DataFrames, the output partition number is always the i part of the key (name, i). So that's what we parse out of the key.
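
Concretely, the parsing amounts to something like this (a sketch; it assumes scheduler-side keys are the stringified tuples, e.g. "('shuffle-unpack-<token>', 7)"):

from ast import literal_eval

def output_partition_of(key: str) -> int:
    # keys of the barrier's dependents are stringified `(name, i)` tuples;
    # the second element is the output partition number
    name, i = literal_eval(key)
    return i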

Is there any other method to do this properly upfront during setup?

You could annotate each task with the output partition number, as mentioned.

You could maybe, very sketchily, assume something about the ordering/priorities of the keys instead of parsing them. Obviously just sorting by key names would work in most cases, because the keys all have the i component, but that's really no different from parsing. I think maybe enumerate(sorted(output_tasks, key=lambda ts: ts.priority)) would happen to give you the output partition number correctly simply because dask.order uses the ordering of key names to break ties—so again, you're still using the information in the key names.

Going fully mad, you could deserialize and inspect the task itself, traversing through all the layers of SubgraphCallables until you found a call to unpack, and look at the i argument passed into it.

The bottom line is that in dask/dask, key names are not opaque. In fact, they're the place where this information (the partition number) is defined. So it's not surprising that most things come back to key names.

@gjoseph92 (Collaborator Author), Dec 10, 2021:

wrap the entire shuffle graph into a single layer and handle the culling there

This is a reasonable idea, and I think how HLGs were intended to work. There are already special layers for task-based shuffles; it makes sense there'd be one for this as well. And yeah, this would solve a lot of our problems around culling, annotations, etc. I avoided doing this initially because the whole thing is easily expressible just with Blockwise, and the Layer API is so ill-specified and a pain to work with.

The problem with this approach is that we'd lose fusion, because it wouldn't be a Blockwise layer anymore, so optimize_blockwise wouldn't know how to fuse it. And then that would expose us to root task overproduction for both the inputs to the shuffle and the outputs from it. We'd want to write our own optimization step for this layer type to fuse it to Blockwise layers (big pain), or extend optimize_blockwise to know about this layer (odd), or somehow make it a subclass of Blockwise (possibly odd and tricky, since Blockwise is very much just one logical layer, and this would be two Blockwises plus a materialized layer in between).


Also, I hope this illustrates just how much we have to bend ourselves in knots because the system can't schedule linear chains of tasks properly (#5555).

If we had STA (or any other solution to this problem), I don't think we'd even be considering this scheduler plugin approach. We'd just write a simple Layer here with a cull method, not worry about fusion, not worry about unstable task names or parsing keys, not worry about propagating annotations, not worry about triangle fusion, and do all the necessary logic within tasks, with a couple RPCs to the scheduler to get peers and set restrictions/resources.

@fjetter (Member) left a comment:

I think most of what we're still discussing revolves around how we define the graph and how it can be further modified down the line (culling and optimizing/fusing). My current best feeling is that we should deal with this as a dedicated layer implementation, which would be a nice follow-up item, wouldn't it?

If you agree with this approach, I'm fine with merging.

Comment on lines +92 to +104
# Also maybe would be nice if the `DataFrameIOLayer` interface supported this?
# dsk = blockwise(
# shuffle_unpack,
# name,
# "i",
# token,
# None,
# BlockwiseDepDict({(i,): i for i in range(npartitions)}),
# "i",
# barrier_key,
# None,
# numblocks={},
# )
Member:

In an earlier offline conversation, I mentioned that I'm doubtful whether this should actually be considered a blockwise operation. My argument is mostly that the individual pieces are not independently calculable. The blockwise definition you propose also doesn't really look healthy to me, since we're encoding a BlockwiseDepDict to define the output partitions even though we're, in fact, not dealing with an IO layer here. It seems we're using extremely complex machinery here even though it is not necessarily required.

The most important reason we'd want to use this blockwise layer here is to enable downstream task fusion. IMHO, this shouldn't require us to define this layer as blockwise; rather, subsequent blockwise operations should be capable of fusing with it. We may need to teach them this first, and in the interim we can use blockwise. My point is, I consider blockwise the shortcut rather than the final solution.

Regardless of how the fusion is supposed to work, I'm doubtful about whether this is even required. To my understanding, fusion is extremely important for the input/transfer layer, since it needs to be fused with whatever the input is. E.g. if we're starting off with a read_parquet, it should be fused with the transfer task. This is the important root task overproduction scenario, since we don't want to read 100 files before we engage in any transfer.
However, on the output/unpack side the situation is not as clear to me; at the very least it's not as urgent. Once we reach unpack, all data is on-cluster. As long as we don't implement data spilling, the data is even in memory, and as long as all data is in memory, this should be mostly irrelevant. Delaying this further down the road would be fine for me, since I can think of other ways of dealing with this problem than forcing fusion (e.g. the worker plugin/extension could enforce a limit on how many tasks are allowed to be processed / be in memory at the same time, as sketched below).
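
A purely hypothetical sketch of that kind of limit in the worker extension (illustrative names; a semaphore is just one way to express such a limit):

import asyncio

class ShuffleWorkerExtension:
    def __init__(self, worker, max_concurrent_unpacks=8):
        self.worker = worker
        # back-pressure knob: cap how many output partitions are materialized at once
        self._unpack_slots = asyncio.Semaphore(max_concurrent_unpacks)

    async def _get_output_partition(self, shuffle_id, i):
        async with self._unpack_slots:
            ...  # materialize output partition `i` of shuffle `shuffle_id`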

Collaborator Author:

As long as all data is in memory, this should be mostly irrelevant. Delaying this further down the road would be fine for me since I could think of other ways of dealing with this problem than forcing fusion

That's fair. You're right that if the data is all in memory anyway, overproduction is irrelevant. And there are probably other hacks we can do to protect against it on the unpack side.

I think making a dedicated layer is the best approach here.

@gjoseph92 (Collaborator Author):

Yeah, I think we should make a layer for this. And yes, it can be a follow-up task. So let's merge this one instead of #5524.

However, DO NOT MERGE this until dask/dask#8392 is merged! Otherwise, distributed CI will fail.

@gjoseph92 (Collaborator Author):

This now only requires dask/dask#8559 in order to merge.

@gjoseph92 (Collaborator Author):

This is now unblocked.

@gjoseph92 gjoseph92 marked this pull request as ready for review January 18, 2022 17:05
@gjoseph92 gjoseph92 changed the title from "[DNM] P2P shuffle skeleton" to "P2P shuffle skeleton" Jan 18, 2022
@fjetter fjetter merged commit 8032046 into dask:main Jan 24, 2022
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Jan 24, 2022
In dask#5688, we discovered that dask#5520 was increasing worker unmanaged memory usage by ~10MiB at startup (on my mac). I suspect that this came from importing pandas.

The shuffle extension already worked if pandas wasn't available: it just wouldn't install itself on the worker. However, to check if pandas was available, it was importing pandas—so if pandas _was_ available, every worker would have to spend the time and memory to import it at startup, even if it wasn't used at all.

With this PR, all use of pandas is deferred. There's also now an explicit test that importing the shuffle does not import pandas.
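
One way to do that kind of check without paying the import cost (an illustrative sketch, not necessarily the exact code in this PR): use importlib to test for availability, and test in a fresh interpreter that importing the shuffle module doesn't pull pandas in:

import importlib.util
import subprocess
import sys

def pandas_available() -> bool:
    # checks that pandas is installed without actually importing it
    return importlib.util.find_spec("pandas") is not None

def test_shuffle_does_not_import_pandas():
    # run in a fresh interpreter, so pandas isn't already in sys.modules from elsewhere
    code = "import sys, distributed.shuffle; assert 'pandas' not in sys.modules"
    subprocess.check_call([sys.executable, "-c", code])
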
@mrocklin mrocklin mentioned this pull request Mar 22, 2022