
[DNM] P2P shuffle skeleton - scheduler plugin #5524

Closed

Conversation


@gjoseph92 gjoseph92 commented Nov 18, 2021

Alternative to #5520: a peer-to-peer shuffle skeleton, based on a scheduler plugin to handle much of the synchronization.

There are some hacky things, but generally I think this shows more promise than #5520. Things this can do that the other PR cannot:

  • Wait to initialize a shuffle until it actually starts
  • Support blockwise fusion of the output tasks
  • Detect when input task culling has happened and raise an immediate error. There's also a path forward from here for actually supporting this culling

This also doesn't quite match the design in #5435—in practice, a few things had to change to accommodate the scheduler driving the process—but overall, I like this design more than #5520. It also feels a little easier to build off of.

This needs unit tests, especially for the more exciting logic around concurrent shuffles, waiting for the scheduler, etc. But in the basic tests, it seems to shuffle correctly, including sequential and concurrent shuffles, and also passes test_shuffle.py.

Since this design differs a little from #5435, here's a rough diagram of the flow:
[Diagram: scheduler-plugin-flow — rough flow of the scheduler-plugin-driven shuffle]

cc @fjetter

  • Tests added / passed
  • Passes pre-commit run --all-files

@gjoseph92 gjoseph92 marked this pull request as draft November 18, 2021 09:29
@gjoseph92 gjoseph92 force-pushed the p2p-shuffle/skeleton-scheduler-plugin branch 2 times, most recently from 1df7a61 to b56d15d on November 18, 2021 21:45
@gjoseph92 gjoseph92 left a comment (Collaborator, Author)

Beyond whether or not we want to use the scheduler plugin, I like the code in this more than #5520. Overall the style is a little simpler, which is a nicer starting point to build other things off of. #5520 feels slightly over-engineered in comparison, with the ShuffleExtension containing Shuffles containing ShuffleMetadatas.

The problem with sibling shuffles noted at the end of test_graph.py will probably also require more complex task-traversal logic on the scheduler to pick the right worker per shuffle. That makes me think we'll end up with a scheduler plugin regardless, whether it offers an RPC or sets the restrictions itself.

), f"Removed {id}, which still has data for output partitions {list(data)}"

async def get_shuffle(self, id: ShuffleId) -> ShuffleState:
    "Get the `ShuffleState`, blocking until it's been received from the scheduler."
@gjoseph92 (Collaborator, Author):

@fjetter mentioned that this whole method and waiting_for_metadata may be unnecessary. Since messages from scheduler to workers remain ordered in BatchedSend (and TCP preserves ordering), we can probably count on the shuffle_init hitting the worker before the add_partition does, so long as we trust the transition logic of our plugin.
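
For reference, a minimal sketch of the waiting pattern this refers to (the class and attribute names only loosely mirror the PR; `ShuffleId`/`ShuffleState` below are placeholders for the PR's actual types):

import asyncio

ShuffleId = str          # placeholder for the PR's ShuffleId alias
ShuffleState = object    # placeholder for the PR's ShuffleState class

class ShuffleWorkerExtension:
    def __init__(self):
        self.shuffles: dict[ShuffleId, ShuffleState] = {}
        self.waiting_for_metadata: dict[ShuffleId, asyncio.Event] = {}

    def shuffle_init(self, id: ShuffleId, state: ShuffleState) -> None:
        # Handler for the scheduler's broadcast: store the state and wake any waiters.
        self.shuffles[id] = state
        self.waiting_for_metadata.setdefault(id, asyncio.Event()).set()

    async def get_shuffle(self, id: ShuffleId) -> ShuffleState:
        "Get the `ShuffleState`, blocking until it's been received from the scheduler."
        if id not in self.shuffles:
            await self.waiting_for_metadata.setdefault(id, asyncio.Event()).wait()
        return self.shuffles[id]

If the BatchedSend ordering argument holds, `get_shuffle` collapses to a plain dict lookup and `waiting_for_metadata` disappears.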

del self.output_keys[k]

def transition(self, key: str, start: str, finish: str, *args, **kwargs):
    "Watch transitions for keys we care about"
@gjoseph92 (Collaborator, Author):

Having this run for every key is not ideal. I've tried to make it return as fast as possible for irrelevant keys.
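
For illustration, a sketch of the fast-rejection idea as a method of the plugin; the prefix constants here are placeholders, not the PR's actual names:

SHUFFLE_PREFIXES = ("shuffle-transfer-", "shuffle-barrier-")  # placeholder prefixes

def transition(self, key: str, start: str, finish: str, *args, **kwargs):
    # Cheap checks first, so the overwhelming majority of keys
    # (anything not part of a shuffle) bail out before any regex work.
    if not key.startswith(SHUFFLE_PREFIXES) and key not in self.output_keys:
        return
    ...  # only now do the more expensive parsing and bookkeeping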

# FIXME this feels very hacky/brittle.
# For example, after `df.set_index(...).to_delayed()`, you could create
# keys that don't have indices in them, and get fused (because they should!).
m = re.match(r"\(.+, (\d+)\)$", key)
@gjoseph92 (Collaborator, Author):

This is my least favorite part of the implementation. Keys are supposed to be opaque to the scheduler (as far as I understand); we're inferring a lot of meaning from them.

Parsing the IDs out for transfer/barrier is okay, because we control those key names when we generate the graph. The downstream tasks could theoretically be named anything though.

@fjetter (Member):

  1. Perf nitpick:
pattern = re.compile(r"\(.+, (\d+)\)$")  # compiled once, at module level

def foo(key: str):
    return pattern.match(key)
  2. I might want to see a unit test for this. I generally don't trust regular expressions, even though this one looks straightforward...
  3. I don't feel comfortable with using such logic at all. I don't think we should parse keys to infer logic. We do control them, but this is not a well-defined API, and it is very hard to control whether or not keys get mutated at some point. The fusing you mentioned is the best example: any optimization or reordering could easily break this contract, and I think such systems should be disentangled.

@gjoseph92 (Collaborator, Author):

  1. I figured `re` caching would handle it, but sure
  2. Agreed, a test would be good (rough sketch below)
  3. I don't like this logic at all. As noted though, I can't think of any other option right now that keeps shuffling working in the face of output-task fusion, since HLG annotations are somewhat broken.
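
A hedged sketch of what such a test could look like; the key strings are invented examples, not taken from the PR's test suite:

import re

import pytest

OUTPUT_KEY_RE = re.compile(r"\(.+, (\d+)\)$")  # the pattern from the diff above

@pytest.mark.parametrize(
    "key, expected",
    [
        ("('unpack-1a2b3c', 7)", 7),        # plain stringified key tuple
        ("('assign-d4e5f6', 0)", 0),        # partition 0
        ("('sub-unpack-9z8y7x', 12)", 12),  # renamed/fused layer, still tuple-like
        ("unpack-1a2b3c", None),            # no index in the key -> no match
        ("('unpack-1a2b3c', 7, 1)", 1),     # extra trailing index: matches the last int; is that intended?
    ],
)
def test_output_key_regex(key, expected):
    m = OUTPUT_KEY_RE.match(key)
    assert (int(m.group(1)) if m else None) == expected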

@gjoseph92 (Collaborator, Author):

It's also important to understand that this key-parsing isn't here because this PR uses a scheduler plugin and #5520 doesn't.

This parsing is here because this PR supports Blockwise fusion of output tasks, and the other doesn't. Whether or not we used a scheduler plugin, the only way I can come up with right now to support fusion of output tasks is to parse key names.

Annotations are the ideal, proper way to deal with this. But if you use annotations, fusion won't happen, which defeats the purpose. So as long as there's no way to attach extra information to the graph (and no way to embed information in the tasks themselves and have it transmitted to the scheduler at runtime, because by the time a task is running it's already too late), the only information we have is the key names.

prefix, group, id = parts

if prefix == TASK_PREFIX:
    if start == "waiting" and finish in ("processing", "memory"):
@gjoseph92 (Collaborator, Author):

Suggested change
if start == "waiting" and finish in ("processing", "memory"):
if start == "waiting" and finish == "processing":

It should be impossible for the transfer task to go to memory before we've taken some action, since that task will block on us (the plugin) broadcasting shuffle info to all workers.

And if the barrier task is going to memory, that's bad news, because the dependents are about to run and we need to set restrictions on them.

I was just concerned that `transition_waiting_memory` exists. I'm not sure what triggers that case.

@fjetter (Member):

If in doubt, I prefer raising in these situations. If it is a valid case, we should know about it. Either way, an explicit exception protects us from corrupted state
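
As a sketch, the raise-instead-of-accept variant inside the transition hook could look like this (condition and message wording are illustrative):

if start == "waiting" and finish == "memory":
    # Should be impossible for transfer/barrier tasks; fail loudly rather than
    # proceeding with possibly corrupted shuffle state.
    raise RuntimeError(
        f"Unexpected waiting->memory transition for shuffle task {key!r}; "
        "the shuffle plugin never got a chance to act on it"
    )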

@gjoseph92 gjoseph92 mentioned this pull request Nov 19, 2021
  • All pass with dask/dask#8392. Rather crude; needs unit testing.
  • Surprisingly, blockwise decides to merge the two output layers. This really throws things off. The test passes right now by disabling an aggressive assertion, but we need more robust validation here.
  • Whenever I forget to switch to dask#5520, the errors are confusing.
  • See dask#5524 (comment). Since messages from scheduler to workers remain ordered in `BatchedSend` (and TCP preserves ordering), we should be able to count on the `shuffle_init` always hitting the worker before the `add_partition` does, so long as we trust the transition logic of our plugin.
@fjetter fjetter left a comment (Member)

I think hooking into the transition engine is fine. I'm not overly concerned about that, but I'm not convinced about the other solutions/problems introduced here, most notably how the worker restrictions are computed.

One different approach that comes to mind: instead of worker restrictions, we could use resource restrictions. We could define every output partition as a unique resource. The shuffle init would then assign an output-partition resource to every worker, and the scheduler heuristics would take care of the rest.

class ShuffleScheduler:
    def transfer(self, id, key):
        # Shuffle init part: assign each output-partition resource to a worker,
        # round-robin, until every output partition has a home.
        shuffle_ix = 0
        while shuffle_ix < npartition_out:
            for w in self.scheduler.workers:
                self.scheduler.add_resources(
                    worker=w,
                    resources={f"shuffle-{shuffle_ix}": 1},
                )
                shuffle_ix += 1
                if shuffle_ix == npartition_out:
                    break

This would require us to encode this resource during graph construction, but the total number of output partitions is known at graph-construction time, so I don't see a conceptual problem, though maybe a technical one. I'm not sure whether we can assign resources in the blockwise layer. If not, adding this feature to blockwise or dropping blockwise might be an option (is this actually a blockwise operation, or are we just abusing it for some optimization hack??)

Either way, arguing about resources and arguing about out-of-band output partitions feels semantically well aligned. Even when it comes to task fusion, I would argue that whatever semantics hold for resources should also hold for the unpack tasks.

thoughts?

# TODO if these checks fail, we need to error the task!
# Otherwise it'll still run, and maybe even succeed, but just produce wrong data?

dts._worker_restrictions = restrictions
@fjetter (Member):

There is a scheduler API for this, isn't there?

@gjoseph92 (Collaborator, Author):

There is, but it's a little bit more overhead when we already have the TaskState here:

def set_restrictions(self, comm=None, worker=None):
    ts: TaskState
    for key, restrictions in worker.items():
        ts = self.tasks[key]
        if isinstance(restrictions, str):
            restrictions = {restrictions}
        ts._worker_restrictions = set(restrictions)
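
For comparison, roughly what the two options look like from inside the plugin; `assignments` (key to worker address) is a hypothetical local variable, and `set_restrictions` is the handler quoted above:

# Option 1: go through the scheduler's set_restrictions handler
self.scheduler.set_restrictions(worker={key: [addr] for key, addr in assignments.items()})

# Option 2: set the restriction directly on the TaskState we already hold
for key, addr in assignments.items():
    self.scheduler.tasks[key]._worker_restrictions = {addr}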


class ShuffleSchedulerPlugin(SchedulerPlugin):
    name: ClassVar[str] = "ShuffleSchedulerPlugin"
    output_keys: dict[str, ShuffleId]
@fjetter (Member):

Shouldn't this rather be something like dict[ShuffleId, Set[str]]?

@gjoseph92 (Collaborator, Author):

See:

# Task completed
if start in ("waiting", "processing") and finish in (
    "memory",
    "released",
    "erred",
):
    try:
        id = self.output_keys[key]
    except KeyError:
        return

This lets us check which shuffle (if any) a given key is a part of.

Also note that this whole thing isn't actually necessary for the proper operation of a shuffle:

# Check if all output keys are done
# NOTE: we don't actually need this `unpack` step or tracking output keys;
# we could just delete the state in `barrier`.
# But we do it so we can detect duplicate shuffles, where a `transfer` task
# tries to reuse a shuffle ID that we're unpacking.
# (It does also allow us to clean up worker restrictions on error)

I added it just for sanity checking and validation right now.

return addr


def parse_key(key: str) -> list[str] | None:
@fjetter (Member):

Maybe premature optimization, but I guess this should have at least an LRU cache.

@gjoseph92 (Collaborator, Author):

I like that idea, but this is going to run on every single transition, so I think that cache would get blown out. We could implement an internal LRU cache just for positives I guess.
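
A rough sketch of that positives-only cache, wrapping the PR's `parse_key`; the cap and helper name are made up:

_parse_hits: dict[str, list[str]] = {}   # only successful parses are remembered
_PARSE_HITS_MAX = 10_000                 # arbitrary cap to bound memory

def parse_key_cached(key: str) -> list[str] | None:
    try:
        return _parse_hits[key]
    except KeyError:
        pass
    parts = parse_key(key)
    if parts is not None:
        if len(_parse_hits) >= _PARSE_HITS_MAX:
            _parse_hits.clear()  # crude eviction; a real LRU would be nicer
        _parse_hits[key] = parts
    return parts

A plain functools.lru_cache on parse_key itself would also cache the misses, which is exactly what would blow the cache out.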


def transition(self, key: str, start: str, finish: str, *args, **kwargs):
    "Watch transitions for keys we care about"
    parts = parse_key(key)
@fjetter (Member):

I don't really like us parsing the key to do this logic. I would likely prefer us using a task attribute or similar

@fjetter (Member):

I would even consider adapting how tasks are submitted to the scheduler. I'm not too familiar with HLG, but shouldn't it be somehow "easy" to tell the scheduler which keys should be considered for this? Isn't there some way during unpacking to let the scheduler know that the keys are "special", so that we just keep a set of "shuffle keys" on board instead of parsing them?

@gjoseph92 (Collaborator, Author):

Annotations seem like the mechanism for this. Unfortunately, there are two blockers to using them right now: HLG annotations are somewhat broken (they can get dropped during graph manipulation), and annotating the output layer prevents the Blockwise fusion we want, as discussed above.

So yes, it should be easy, but it will require a bit of fixing and changing blockwise for it to actually be easy.

@gjoseph92 (Collaborator, Author):

I'm also thinking about getting rid of this transition-watching logic entirely, and instead having transfer tasks call an RPC on the scheduler to register themselves and get the list of peer workers (which would also cause the scheduler to broadcast a message to all workers telling them the shuffle has started); see the rough sketch below. This would eliminate the need for parse_key entirely. The only keys we'd still have to parse are the output keys (for the reasons mentioned in https://github.com/dask/distributed/pull/5524/files#r765203083). That would be more of a hybrid approach with #5520.
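
Roughly, the worker side of that hybrid could look like the sketch below; `shuffle_register` is a hypothetical handler the plugin would have to add to the scheduler:

from distributed import get_worker

async def transfer(partition, id):
    worker = get_worker()
    # Hypothetical handler installed by the scheduler plugin: registers this shuffle
    # (triggering the broadcast of `shuffle_init` to all workers on first call) and
    # returns the addresses of the participating peer workers.
    peers = await worker.scheduler.shuffle_register(id=id, key=worker.get_current_task())
    # ...split `partition` into shards and send them to `peers`,
    # as the worker extension already does today...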

return self.erred(ShuffleId(id), key)

# Task completed
if start in ("waiting", "processing") and finish in (
@fjetter (Member):

This condition feels brittle. What motivates the selective start states?

@gjoseph92 (Collaborator, Author):

Checking the start state is probably unnecessary. All we really care about is that the key is in a terminal state.

@gjoseph92 (Collaborator, Author):

One different way coming to mind is that instead of worker restrictions, we could use resource restrictions.

I like the spirit of this, because you're embedding some information at graph-generation time that lets you identify the partition number of an output task through something else than parsing its key. However, as noted in https://github.com/dask/distributed/pull/5524/files#r765203083 and #5524 (comment) this isn't possible right now:

Whether we use resource restriction annotations, or some custom shuffle annotation that then gets translated into worker restrictions, it's basically the same thing. (The scheduling mechanism for worker restrictions is slightly simpler than for resource restrictions, so I'd prefer sticking with that path.) It's all just a way of passing along auxiliary data so we have more information to go off of than just the key names.

is this actually a blockwise operation or are we just abusing this for some optimization hack??

This is definitely a proper blockwise optimization. And we really, really want subsequent blockwise optimizations to fuse onto it, otherwise we get root-task overproduction.

I would argue that whatever semantics should be true for resource should as well be true for the unpack tasks.

I don't think so. Consider

df = dd.read_parquet(...)
df_pre = df.map_partitions(preprocess)
with dask.annotate(resources={"GPU": 1}):
    inferred = df_pre.map_partitions(run_ml_model)
df_post = inferred.map_partitions(post_process)

Without the resource annotations, this whole thing would Blockwise-fuse into one layer (read_parquet-preprocess-run_ml_model-post_process). But with resource restrictions, you don't want that to happen—you want the CPU tasks to run on all workers, then send their data to the few GPU workers for inference, which then send their data back out.

But our case would use resources in basically the same way:

transfer = df.map_partitions(transfer)
barrier = delayed(barrier)(transfer)
with dask.annotate(resources={f"shuffle-{id}-{i}": 1 for i in range(transfer.npartitions)}):
    unpack = dd.map_partitions(unpack, BlockwiseRange(transfer.npartitions), barrier)

downstream = unpack.map_partitions(user_code)

However, we do want user_code to get fused with unpack. Both cases use resource restrictions, but in the GPU case above we want the tasks to remain separate, whereas in our case we want them to fuse, with the resource restrictions propagating to the fused task.

I think we'll end up needing something like dask.annotate(**annotations: dict, fuse: Literal["prohibit", "propagate", "drop"] = "prohibit"), so you can explicitly control how fusion should apply to that annotation.

@fjetter fjetter commented Dec 9, 2021

Thanks for your replies.

I don't think what we would like to do in this PR is actually very special. I could see similar applications down the road that require us to slightly change scheduling logic for specific keys. Therefore I think this PR is a nice example, and it should be considered a requirement for future iterations of the HLE/HLG/annotation engine.

I'll have another peek at the other PR shortly, but right now it feels like we should postpone the scheduler plugin until we have a more robust annotation / HLG / HLE engine.

@gjoseph92 gjoseph92 commented Dec 9, 2021

we should postpone the scheduler plugin until we have a more robust annotation / HLG / HLE engine

I think what you're really talking about is postponing handling of Blockwise fusion of the output tasks. Handling fused output tasks is what requires us to parse keys right now, and parsing keys is what makes a scheduler plugin more reasonable than RPCs to the scheduler from within tasks like the other PR (because the scheduler would have to send a bunch of key names to the task, which would then parse them and send them back to the scheduler; we might as well do all of that on the scheduler and save the communication).

I get this from a simplicity perspective, but I disagree for a few reasons:

  1. When we have fixed the annotation machinery, so we can stop parsing keys and instead check annotations, all those annotations will still live on the scheduler. We will still want worker restrictions to be computed on the scheduler—whether that happens via RPC or via transition-watching is not very important.

    Annotations are just an implementation detail. Specifically, switching to them only changes how ShuffleSchedulerPlugin.worker_for_key and maybe ShuffleSchedulerPlugin.parse_key work internally—everything else could look the same as this PR.

    The overall architecture, in terms of where the necessary data lives, is no different: the complete set of names of the tasks, and their annotations, only lives on the scheduler. So things which need that should run on the scheduler.

  2. The P2P shuffle is way more useful in practice if it supports output-task fusion. Root-task overproduction will affect any real-world workload using the shuffle, even just df.set_index("foo").to_parquet().

    Root task overproduction will make it harder for us to test and benchmark our improvements. It also may make it harder to get feedback from users, and less useful to users, because "it ran out of memory" will probably just be overproduction.

    If we should postpone the scheduler plugin until we have a more robust annotation engine, should we also postpone it until we have "Speculatively assign tasks to workers" (#3974), so root-task overproduction isn't a problem and we don't need to fuse? I would guess not. So to move forward without STA, we have to have output-task fusion. To have output-task fusion, we have to run logic on the scheduler to identify output task numbers, whether from annotations or from parsing keys.

  3. Dealing with the "triangle" blockwise fusion that occurs during a merge is probably the very first thing that has to happen after one of these PRs gets merged. I don't see a way to do that without logic that runs on the scheduler.

(Also, the logic in the other PR is even more egregious than parsing keys, because it just generates the key names it expects based on the hardcoded name of the unpack function. This is highly brittle—just setting optimization.fuse.active: True will break the other PR. And with the other PR, it's not a simple change to handle that: you'll move the restriction-setting logic to the scheduler, so that you can parse key names... and you'll end up halfway to this PR.)

Another option

To simplify this PR and bring it closer to the original design, we could drop the transition-watching logic and instead have tasks call RPCs on the scheduler. The transition-watching is really just an optimization to reduce communication, and probably an unnecessary one. I'd be happy to try that out; it would be a pretty quick change, though doing it that way honestly feels even more awkward to me.

@gjoseph92 (Collaborator, Author):

All this said, I could still see the rationale in merging the other PR first, then making this a new PR onto that (though the diff would be very hard to read) in order to have a place for more discussion about why we're moving logic to the scheduler. Or even for doing so incrementally. It would add a bit of extra work, but maybe that's worth it for the clearer process.

@gjoseph92 (Collaborator, Author):

We've decided to merge #5520 as the initial skeleton instead of this one.
