
Speculatively assign tasks to workers #3974

Open
mrocklin opened this issue Jul 21, 2020 · 4 comments · May be fixed by #4264

Comments

@mrocklin
Member

Motivation

Currently, when a task becomes ready to run, the scheduler finds the best worker for that task and sends the task to that worker to be enqueued. The worker then handles whatever communication is necessary to collect the dependencies for that task and, once they are in place, puts the task in a queue to be run in a local ThreadPoolExecutor. When the task finishes, the worker informs the scheduler and moves on to its next task. When the scheduler receives news that the task has finished, it marks all of its dependents, each of which goes through this process again.

This is occasionally sub-optimal. Consider the following graph:

A1 -> B1
A2 -> B2

If we have one worker then both of the A's will be sent to that worker. The worker will finish A1, report to the scheduler that it is finished, and begin work on A2. Only after that will it learn from the scheduler that B1 is ready to compute, and it will work on B1 next. As a result, the order of execution looks like the following:

A1, A2, B1, B2

When really we would have preferred ...

A1, B1, A2, B2
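For concreteness, here is a minimal sketch of those two chains written with dask.delayed; the function names `load` and `process` are illustrative stand-ins for the A and B tasks.

```python
# A minimal sketch of the graph above using dask.delayed. The functions
# `load` and `process` are hypothetical stand-ins for the A and B tasks.
import dask
from dask.distributed import Client


@dask.delayed
def load(i):        # plays the role of A1 / A2
    return i


@dask.delayed
def process(x):     # plays the role of B1 / B2
    return x + 1


if __name__ == "__main__":
    # One single-threaded worker, matching the scenario described above
    client = Client(n_workers=1, threads_per_worker=1)
    b1 = process(load(1))   # chain A1 -> B1
    b2 = process(load(2))   # chain A2 -> B2
    print(dask.compute(b1, b2))
```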

Today we often avoid this situation by doing task fusion on the client side, so that we really only have two tasks:

AB1
AB2
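As an illustration, client-side fusion on a hand-written low-level graph looks roughly like this (a sketch using dask.optimization.fuse; `inc` is an illustrative placeholder function):

```python
# A small sketch of client-side, low-level task fusion on a raw graph using
# dask.optimization.fuse; `inc` is an illustrative placeholder function.
from dask.optimization import fuse


def inc(x):
    return x + 1


dsk = {
    "A1": (inc, 1), "B1": (inc, "A1"),   # chain A1 -> B1
    "A2": (inc, 2), "B2": (inc, "A2"),   # chain A2 -> B2
}

fused_dsk, dependencies = fuse(dsk, keys=["B1", "B2"])
# Each linear chain collapses into a single fused task, roughly the
# AB1 / AB2 tasks described above.
print(fused_dsk)
```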

If we remove low-level task fusion (which we may want to do for performance reasons) then it would be nice to capture this same A, B, A, B behavior some other way.

Send tasks to workers before dependencies are set

One way to capture this same behavior is to send more tasks down to the worker, even before they are ready to run. Today we only send a task to a worker once we have high confidence that that is where it should run, which typically we only know after we understand the data sizes of all of its inputs. We only know this once its dependencies are done, so we only send tasks to workers once all of their dependencies are complete.

However, we could send not-yet-ready-to-run tasks to a worker with high confidence if all of that task's dependencies are also present or running on that worker. This happens to fully consume the low-level task fusion case. If A1 is running on a worker then we believe with high probability that B1 will also run on that same worker.
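In pseudocode, the proposed rule might look roughly like the following; the names here (`task.dependencies`, `worker.has_what`, `worker.processing`) are hypothetical attributes for illustration, not existing scheduler API.

```python
# Purely illustrative pseudocode for the proposed rule; these names do not
# correspond to the actual scheduler state machine.
def can_speculatively_assign(task, worker):
    """Send `task` to `worker` before it is runnable if every one of its
    dependencies is already held by, or currently running on, that worker."""
    return all(
        dep in worker.has_what or dep in worker.processing
        for dep in task.dependencies
    )
```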

Changes

This would be a significant change to the worker. We would need to add states to track dependencies, much like how we do in the dask.local scheduler, or a very stripped down version of the dask.distributed scheduler.
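A very stripped-down sketch of the extra bookkeeping this might require on the worker, loosely modeled on dask.local; the state names and helper below are hypothetical, not existing Worker internals.

```python
# Hypothetical sketch of worker-side dependency tracking; not existing
# Worker internals.
WAITING, READY = "waiting", "ready"


class SpeculativeTask:
    def __init__(self, key, dependencies):
        self.key = key
        self.waiting_on = set(dependencies)   # keys not yet available locally
        self.state = WAITING if self.waiting_on else READY


def dependency_finished(tasks, finished_key):
    """When a dependency completes (or arrives) locally, promote any
    speculatively assigned dependents that are now runnable."""
    ready = []
    for ts in tasks.values():
        ts.waiting_on.discard(finished_key)
        if ts.state == WAITING and not ts.waiting_on:
            ts.state = READY          # eligible for the local executor queue
            ready.append(ts.key)
    return ready
```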

This will also require managing interactions with other parts of Dask like ...

  1. Work stealing / load balancing: What happens to a dependent task when its dependency is stolen? (My guess is that it probably goes back to the scheduler and we let the scheduler sort it out.)
  2. Resources / restrictions: We should make sure to only speculatively schedule a dependent on a worker if that worker is valid for that task.
  3. Exceptions / failures: Probably we just kick the task back up to the scheduler if anything bad happens.

Learning

I think that this task is probably a good learning task for someone who has some moderate exposure to the distributed scheduler and wants to level up a bit. Adding dependencies to the worker will, I think, force someone to think a lot about the systems that help make Dask run while mostly implementing minor versions of them. cc @quasiben @jacobtomlinson @madsbk

@mrocklin
Member Author

Some first steps:

  1. Remove the fuse call and see what breaks, if anything; maybe also measure performance (see the sketch after this list)
  2. Take a look at this and compare to extending blockwise to support IO operations (need to handle blockwise without any inputs)
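For step 1, one way to experiment (assuming a Dask version that exposes the `optimization.fuse.active` configuration key) is to switch low-level fusion off and time a simple workload against the default:

```python
# Rough sketch: toggle low-level task fusion via Dask configuration and
# compare wall-clock time. Assumes the `optimization.fuse.active` config
# key is available in the installed Dask version.
import time
import dask
import dask.array as da


def timed_sum(fuse_active):
    with dask.config.set({"optimization.fuse.active": fuse_active}):
        x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
        start = time.time()
        x.sum().compute()
        return time.time() - start


print("with fusion:   ", timed_sum(True))
print("without fusion:", timed_sum(False))
```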

mrocklin changed the title from "Populate workers with downstream tasks" to "Speculatively assign tasks to workers" on Aug 6, 2020
gforsyth linked a pull request on Nov 23, 2020 that will close this issue
@fjetter
Member

fjetter commented Nov 27, 2020

I'm wondering what problem this is solving exactly, because I see a few potential gains/pitfalls here:

A) Ensure there is no network traffic of intermediate results between the execution of e.g. A1 and B1
B) Reduce overhead by not involving the scheduler anymore for the assignment of task B1 once A1 is finished

If A) is the goal, I'm wondering if this approach is actually worth its complexity. After all, if our task2worker decision is accurate, we should not pay any network cost since B1 should be assigned to the same worker, shouldn't it?

If B) is the goal, I'm wondering what the state machine on the scheduler actually looks like. Can the scheduler still distinguish between processing on worker and speculatively assigned to worker? If the scheduler knows about this difference, how does it know about it? Is the worker pinging it or the other way round? If this exchange is synchronous/blocking/required for the worker speculative->executing transition, would it actually reduce overhead or rather increase it? (My gut feeling tells me the scheduler does not necessarily need to know, but I might be wrong, in particular w.r.t. work stealing.)

And actually, if one of the indirect goals is to remove the necessity of fusing, I would argue that one of the convenient things about fusing is that it also helps to reduce the overall number of tasks. The scheduler becomes pretty busy once we reach graphs of significant size (>1M tasks), and this approach would not help in dealing with that. I'm wondering here whether the cost of optimization outweighs the cost of task overhead.

@mrocklin
Member Author

And actually, if one of the indirect goals is to remove the necessity of fusing, I would argue that one of the convenient things about fusing is that it also helps to reduce the overall number of tasks. The scheduler becomes pretty busy once we reach graphs of significant size (>1M tasks), and this approach would not help in dealing with that. I'm wondering here whether the cost of optimization outweighs the cost of task overhead.

The main objective is to avoid task fusion on the client side. This allows us to transmit high-level graphs directly to the scheduler and avoid creating low-level graphs on the client entirely. I expect that this saving will significantly outweigh the cost of the added tasks on the scheduler.

Also, we're working to make the scheduler itself faster. Currently we process around 5000 tasks per second. I suspect/hope that we can improve on this significantly. It's a bit easier to optimize just the scheduler rather than both the scheduler and the client.

Can the scheduler still distinguish between processing on worker and speculatively assigned to worker?
My gut feeling tells me the scheduler does not necessarily need to know but I might be wrong, in particular w.r.t work stealing

This problem exists today. The scheduler assigns all tasks the "processing" state once they have been assigned to a worker, but they may still be waiting in a queue, or waiting on data to transfer. The scheduler does not know when a task has actually started running. This does come up in work stealing, and there is a complex handshake to handle it. I believe that the current behavior is that the scheduler suggests to a worker, "Hey Alice, maybe you should go steal task X from worker Bob." Alice then tries to do that, and Bob either agrees or says "nope, I'm working on X right now".
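Purely as illustration (this is not the actual distributed code or message format), the confirm/decline step of that handshake might be sketched like this:

```python
# Illustrative pseudocode only -- a sketch of the victim side of the
# handshake described above, not the real distributed implementation.
def handle_steal_request(victim_worker, key):
    if key in victim_worker.executing:
        # "Nope, I'm working on X right now" -- the steal is declined.
        return {"key": key, "response": "declined"}
    # Otherwise release the queued task so the thief (Alice) can run it.
    victim_worker.release_queued_task(key)   # hypothetical helper
    return {"key": key, "response": "confirmed"}
```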

@mrocklin
Member Author

For context, here is a link to the dataframe optimization code. The only thing below the ensure_dict call (which converts a HighLevelGraph to a dict) is optimization code.

https://github.com/dask/dask/blob/fbccc4ef3e1974da2b4a9cb61aa83c1e6d61efba/dask/dataframe/optimize.py#L12-L41

We get a lot of speedup if we're able to return the HighLevelGraph.
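For reference, the shape of that function is roughly the following; this is a simplified paraphrase using the public dask.utils.ensure_dict, dask.optimization.cull, and dask.optimization.fuse helpers, and the linked code has additional steps (e.g. Blockwise handling).

```python
# Simplified paraphrase of the linked dataframe optimize code, not a
# verbatim copy. Everything after ensure_dict operates on a low-level dict.
from dask.utils import ensure_dict
from dask.optimization import cull, fuse


def optimize(dsk, keys, **kwargs):
    # keys: flat list of output keys (the real code flattens nested key lists)
    dsk = ensure_dict(dsk)                    # materializes the HighLevelGraph
    dsk, dependencies = cull(dsk, keys)       # drop unnecessary tasks
    dsk, dependencies = fuse(dsk, keys, dependencies=dependencies)
    return dsk
```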
