[WIP] Queue root tasks on scheduler, not workers [with co-assignment] #6598

Draft
wants to merge 16 commits into main

Conversation

gjoseph92
Collaborator

This is an approach for withholding root tasks from workers while preserving neighboring task co-assignment, as discussed in #6560.

This adds:

  • A queued state on the scheduler. queued is like processing (task is ready; all dependencies are in memory), but rather than being already sent to a worker, we just track the worker we plan to run it on when that worker has availability.
  • A HeapSet tracking queued tasks per worker in priority order (scheduler-side).
  • Transition logic where, each time a task completes on a worker, we try to schedule its next queued task if, after other recommendations have been processed, the worker would still have idle threads. This ensures higher-priority downstream tasks always take precedence over root tasks (a rough sketch of this check follows the list).
  • A `distributed.scheduler.worker-oversaturation` config option to allow some degree of oversaturation (or even undersaturation) if desired. This will be helpful for benchmarking. It could also serve as a feature flag: if we set the default to `inf`, current behavior would be unchanged.
  • Basic dashboard updates to show queued tasks (could do much more here)
  • An algorithm for load-balancing queued tasks when a new worker joins. This is just a draft of an idea; I'm not sure it would scale adequately or even if it's the right scheduling approach. It tries to preserve co-assignment ("runs" of subsequent tasks in priority order) even as new workers join.
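
Hedged sketch to make the "idle thread" check and the oversaturation knob concrete. Every name, signature, and the saturation formula here is an assumption for illustration, not the PR's actual code:

```python
import heapq
import math


def has_open_slot(n_processing: int, nthreads: int, oversaturation: float) -> bool:
    """Could this worker take another root task right now?

    ``oversaturation`` stands in for the proposed
    distributed.scheduler.worker-oversaturation value: 0 means one task per
    thread, positive values allow some backlog, and ``inf`` disables
    scheduler-side queuing entirely (current behavior).
    """
    if math.isinf(oversaturation):
        return True
    return n_processing < nthreads + int(nthreads * oversaturation)


def schedule_next_queued(worker_queue: list, n_processing: int,
                         nthreads: int, oversaturation: float):
    """After a task completes on a worker, pop its highest-priority queued
    task, but only if the worker would still have an idle thread once other
    recommendations have been processed."""
    if worker_queue and has_open_slot(n_processing, nthreads, oversaturation):
        priority, key = heapq.heappop(worker_queue)
        return key  # recommend queued -> processing for this key
    return None
```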

Due to an unintentional implementation detail, this also avoids #6597 in some very specific situations (see b2e7924 message), but is not in any way actually a fix.

I have not tested this much on real clusters yet. But I think it works. If people like @TomNicholas want to try it out, I'd be curious to hear feedback.

I didn't get around to pushing this up on Friday and I'll be off tomorrow, but I'm pushing it up now in case @fjetter wants to take a look.

Closes #6560

  • Tests added / passed
  • Passes `pre-commit run --all-files`

Idea was that if a `SortedSet` of unrunnable tasks is too expensive, then insertion order is probably _approximately_ priority order, since higher-priority (root) tasks will be scheduled first. This would give us O(1) for all necessary operations, instead of O(log n) for adding and removing.

Interestingly, the SortedSet implementation could be hacked to support O(1) `pop` and `popleft`, and O(1) insertion of a new min/max value. In the most common case (root tasks), we're always inserting a value that's greater than the max. Something like this might be the best tradeoff, since it gives us O(1) in the common case but still maintains the sorted guarantee, which is easier to reason about.
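
A rough sketch of that idea, assuming a plain Python list underneath (illustrative only, not this PR's actual data structure):

```python
import bisect


class MostlySortedList:
    """Sorted container optimized for the common case of inserting a key that
    is >= the current maximum (root tasks arrive roughly in priority order).
    Illustrative sketch only."""

    def __init__(self):
        self._data = []

    def add(self, key):
        if not self._data or key >= self._data[-1]:
            self._data.append(key)          # common case: O(1)
        else:
            bisect.insort(self._data, key)  # rare out-of-order insert

    def pop(self):
        return self._data.pop()     # largest key, O(1)

    def popleft(self):
        return self._data.pop(0)    # smallest key; O(n) shift in a plain list
```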
Now task states on the dashboard are listed in the logical order that tasks transition through.
Simpler, though I think basically just an int of 1 may be the most reasonable.
This is just a hack currently, but maybe it would actually be useful?
This reverts commit df11f719b59aad11f39a27ccae7b2fd4dfd9243a.
When there were multiple root task groups, we were just re-using the last worker for every batch because it had nothing processing on it.

Unintentionally this also fixes dask#6597 in some cases (because the first task goes to processing, but we measure queued, so we pick the same worker for both task groups)
@gjoseph92
Collaborator Author

The main weakness of this PR is that it all relies on the hacky co-assignment logic from decide_worker.

The fact that the current co-assignment logic completely relies on picking a worker for every task up front means we need these per-worker queues to keep track of that decision. But this makes dealing with workers joining/leaving difficult. If we're going to hold tasks back on the scheduler, it would be way easier if we could pick workers in real time, rather than pre-determining them.

We don't have "real" co-assignment logic. We don't currently have a cheap pure function to determine, given a task, what its neighboring tasks are (or how many there are), and which worker(s) they are assigned to. (This obviously can be computed by traversing the graph, but we were worried about the performance of that long ago #4864 (comment)).

We want to be able to identify "families" or "neighborhoods" of tasks that should be run together. Specifically, this refers to a set of tasks which all must be in memory at once, on the same worker, to compute a dependent task. (Yes, TaskState.dependencies is a "family".) Therefore, these tasks should all be scheduled on the same worker. The complex part of this definition is determining how to split up family lines when tasks have multiple dependents, when dependents of dependents need to run together, or when a task has so many dependents that it doesn't matter where it runs, so it shouldn't have a family at all.
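
As a point of reference only, the naive graph-traversal version might look something like this (assuming scheduler-style TaskState objects with `.dependents`/`.dependencies` sets; the `max_fanin` cutoff is an arbitrary stand-in for the "too many dependents" case, not anything implemented here):

```python
def family(ts, max_fanin=20):
    """Tasks that must be in memory alongside ``ts`` for one of its dependents
    to run. Returns None if every dependent has so many inputs that
    co-assignment is pointless (the "widely-shared dependency" case)."""
    members = set()
    for dependent in ts.dependents:
        if len(dependent.dependencies) > max_fanin:
            continue  # ts is a widely-shared dependency of this dependent
        members |= dependent.dependencies
    members.discard(ts)
    return members or None
```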

It turns out though that being able to refer to task families would simplify a number of places:

  1. We wouldn't need to pick a worker for every task up front, so we probably wouldn't need a per-worker task queue. We could have a global queue of task families, and just assign families to workers as spots open up (see the sketch after this list).
  2. Without per-worker queues, there's no need for complex load-balancing logic when a worker joins or leaves. You just pop off/push onto the queue as usual.
  3. Families don't care about task groups, so "Task co-assignment logic is worst-case for binary operations like a + b" (#6597) is no longer an issue.
  4. The criteria for a task not having a family are likely the same as for a task being a "widely-shared dependency", which is the same as the "ish" part of root-ish tasks ("Broadcast-like operations are poorly scheduled (widely-shared dependencies)", #6570).
  5. Most likely, the dependent(s) of a family should be assigned to the same worker as the family, setting us up for "Speculatively assign tasks to workers" (#3974).
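
A hypothetical sketch of point 1, with made-up names, just to show how small the scheduler-side bookkeeping could become:

```python
import heapq
import itertools


class FamilyQueue:
    """One global, scheduler-side priority queue of task families.
    Hypothetical sketch; names and API are made up for illustration."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so families never compare

    def add(self, priority, family):
        """Queue a family (a set of task keys) at the given priority."""
        heapq.heappush(self._heap, (priority, next(self._counter), family))

    def pop_for(self, worker):
        """A thread on ``worker`` opened up: hand it the next whole family."""
        if not self._heap:
            return None
        _, _, family = heapq.heappop(self._heap)
        return family  # caller sends every task in the family to ``worker``
```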

My point being that I think this PR works, but if anyone feels like "huh, this is starting to add a lot of complexity", I kind of agree. And I think the cause of that complexity is not having a robust way to do co-assignment at any moment, as opposed to only when we're iterating through all the root tasks in order.

I think we could take an approach like what I have here as a stopgap though.

@fjetter left a comment
Member

I'm not against adding a new state, but I would prefer not having to, since this typically introduces a lot of difficult-to-test complexity and opens us up to potential deadlocks. The queued state as you are proposing it is also a new type of state (for the scheduler). As you pointed out, the definition of a proper transition would require us to allow passing of (kw)args to transition functions. From my time with the worker, I perceive these kinds of transitions as a code smell that indicates we're trying to use the transitions for something they are not suited for. This is most apparent in the fetch->flight transition on the worker, which links to gather_dep and which is the most problematic of all the transitions. I would hate to replicate such a problem hotspot on the scheduler. That's not a deal breaker, though; I might be too cautious.

Preassigning workers feels like a vastly different approach than what I had in mind when we first discussed #6560. I understand the reason why you chose to go down that route, but I am concerned about the scope creep, particularly considering the rebalancing.
I'm wondering if the rebalancing would actually be stable. I wouldn't be surprised if work stealing destroyed all our efforts to be smart here. See also #6600.

The complexity introduced here, accounting for a couple of TODOs that still need to be implemented and a reasonably big uncertainty around stealing + rebalancing, is about what I expect is necessary to implement full STA. At least it's not far off. If this kind of complexity (particularly the rebalancing) is actually necessary to make this work, and we agree that STA is the ultimate solution, I would prefer investing this time in STA instead of this stopgap solution.
I'm wondering where we can compromise. How harmful would it be if we didn't have any rebalancing?

> (This obviously can be computed by traversing the graph, but we were worried about the performance of that long ago #4864 (comment)).

If this were something we actually needed, we could still do it, e.g. for cases with few dependents.
I also see some room to collect additional topological information once during dask.order, but that's of course a different beast (similar to order.graph_metrics). There are of course limitations to what we can actually collect in linear time. I just want to point out that we should not consider this door shut.
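
For example (purely illustrative, not part of dask.order or this PR), counting dependents per key is one such linear-time pass over the graph:

```python
def dependents_count(dependencies: dict[str, set[str]]) -> dict[str, int]:
    """Given key -> set of dependency keys, return key -> number of dependents."""
    counts = {key: 0 for key in dependencies}
    for deps in dependencies.values():
        for dep in deps:
            counts[dep] = counts.get(dep, 0) + 1
    return counts


# dependents_count({"a": set(), "b": {"a"}, "c": {"a"}}) == {"a": 2, "b": 0, "c": 0}
```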

```python
tasks = []
recommendations: dict[str, str] = {}

# Redistribute the tasks between all worker queues. We bubble tasks off the back of the most-queued
```
Member

I haven't studied this algorithm, but I'm hesitant to introduce this. This would be the third time we're implementing a bin packing algorithm of some sort (the other two are stealing and rebalance). All of these applications appear similar enough to me that I'm wondering why we cannot reuse anything (different metrics/weights to optimize, different ways to act on the decision of the algorithm, etc.).
If such a function could be factored out, it would make for a very nice optimization target.
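
To illustrate the kind of shared helper this suggests (a toy sketch, not distributed's stealing or rebalance code, and using item count as the only metric):

```python
def balance_counts(bins: dict[str, list]) -> list[tuple[object, str, str]]:
    """Return (item, source, target) moves that even out item counts per bin.

    Stealing, rebalance(), and queued-task rebalancing would differ only in
    the cost metric and in how the returned moves are acted upon.
    """
    moves = []
    if not bins:
        return moves
    while True:
        src = max(bins, key=lambda b: len(bins[b]))
        dst = min(bins, key=lambda b: len(bins[b]))
        if len(bins[src]) - len(bins[dst]) <= 1:
            break  # as even as it gets
        item = bins[src].pop()
        bins[dst].append(item)
        moves.append((item, src, dst))
    return moves


# balance_counts({"w1": ["a", "b", "c", "d"], "w2": []}) moves two tasks to w2
```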

Anyway, this feels like an awful lot of complexity just for the use case of queued-up root tasks in scaling situations.

Collaborator Author

Agreed that there's probably some repetition. I'm not sure how much that could be factored out. This is a pretty different approach from work stealing AFAIU.

Without this though, I think new workers would be mostly/entirely idle. Since work-stealing doesn't know about queued tasks, and co-assignment means the candidates for downstream dependencies will only be existing workers, I think it's quite likely new workers wouldn't get used at all. That didn't feel acceptable to me.

Anyway, I wrote this just to profile and experiment with the rebalancing approach on actual clusters. Maintaining good co-assignment while scaling up is an interesting problem (that we don't currently handle). It's not necessarily what would actually get merged.

But I do think we'd need something to rebalance queued tasks during scaling as long as we're pre-assigning all tasks to workers upfront. This is the main reason I'd like to stop doing that and make the co-assignment logic stateless: it makes handling scaling way, way easier.

@gjoseph92
Collaborator Author

> Preassigning workers feels like a vastly different approach than what I had in mind when we first discussed #6560

What did you have in mind then? Since the current co-assignment implementation only works by pre-assigning workers, there's no way to re-use it without pre-assignment. I would very much like to change the co-assignment logic to something stateless, but I considered redesigning it to be out of scope here.

I have an idea for how to do it that I think will overall simplify things; I'd be happy to try it, but first I wanted to try the approach of "keep the current co-assignment implementation", since I thought that's what we'd talked about in #6560.

> I'm not against adding a new state, but I would prefer not having to

I think there is very little chance we will withhold tasks on the scheduler without introducing a new state for them. Logically, it's clearly a state that doesn't yet exist for the scheduler. In #6584, I co-opted the rarely-used no-worker state and gave it new meaning, but effectively it was a new task state as well.

I personally don't feel worried about adding a new state on the scheduler. The scheduler is less prone to deadlocking, and much easier to debug when it does deadlock. In the process of writing this PR, I got something wrong and created a deadlock; it took about 5 minutes to find and fix.

> I would prefer investing this time in STA instead of this stopgap solution

Maybe this particular implementation is a stopgap, but I think the overall problem of withholding root tasks on the scheduler is not. Even with STA, I think we'd still want to do this. Otherwise, work-stealing will become even more of a nightmare. And as discussed in #6600, using work-stealing to load-balance during scale-up doesn't even work well. Holding back a queue of runnable tasks on the scheduler both massively simplifies the consistency problems of work-stealing and probably allows for a better load-balancing algorithm during scale-up anyway.

@fjetter
Member

fjetter commented Jun 22, 2022

> Even with STA, I think we'd still want to do this. Otherwise, work-stealing will become even more of a nightmare

I don't think that with STA we would withhold anything. Work stealing would not work with STA and would very likely need to be replaced with another system.

@gjoseph92 changed the title from "[WIP] Queue root tasks on scheduler, not workers" to "[WIP] Queue root tasks on scheduler, not workers [with co-assignment]" on Jun 23, 2022
Successfully merging this pull request may close these issues.

Design and prototype for root-ish task deprioritization by withholding tasks on the scheduler