
Speculative task assignment #4264 (Draft)

gforsyth wants to merge 23 commits into main
Conversation

@gforsyth (Contributor) commented Nov 23, 2020

This is a preliminary cut at introducing speculative task assignment to the distributed scheduler and closes #3974

As @mrocklin mentions in #3974, if tasks are speculatively assigned to a worker that contains all of their dependencies (which may still be processing or awaiting execution), this removes the need for task fusion.

The current approach I've started sketching out here is to use the existing recommendation system in the scheduler to examine a task's dependent(s) -- (I'm going to switch to parent/child terminology here to make this less confusing):
if a task has only one child, and all parents of that child task are on the same worker, speculatively assign the child to that same worker.
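
A rough sketch of that heuristic, in the spirit of the recommendation helpers already in distributed/scheduler.py (the function name matches the `_recommend_speculative_assignment` referenced later in this review, and the attribute names follow TaskState, but this is illustrative rather than the PR's actual code):

def _recommend_speculative_assignment(ts):
    # Only consider tasks with exactly one child (dependent).
    if len(ts.dependents) != 1:
        return None
    (child,) = ts.dependents
    # Find which worker(s) the child's parents are assigned to.
    workers = {dts.processing_on for dts in child.dependencies}
    if len(workers) == 1 and None not in workers:
        # Every parent is on (or headed to) the same worker, so recommend
        # moving the child into the "speculative" state on that worker.
        return {child.key: "speculative"}
    return None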

Still TODO:

  • so many more tests
  • figure out if we can disable fusion for an entire test run (to make sure tasks are being assigned as expected)
  • Update scheduling policy documentation to include possibility of speculative assignment
  • Figure out what to do with Actors: don't use speculative assignment with actors
  • Handle resource restrictions
  • Handle work stealing

@mrocklin (Member) left a comment:

Eight comments. Ah! Ah! Ah!


if self.validate:
    # All dependencies are on the same worker
    assert len({dts.processing_on for dts in ts.dependencies}) == 1
@mrocklin (Member):
We probably also want to check that at least one of the dependencies is in a "processing" state.
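
For example, the validation block quoted above might grow one more assertion along these lines (a sketch of this suggestion, not the PR's actual code):

if self.validate:
    # All dependencies are on the same worker
    assert len({dts.processing_on for dts in ts.dependencies}) == 1
    # ...and at least one of them is still being processed; otherwise
    # there is nothing left to speculate on.
    assert any(dts.state == "processing" for dts in ts.dependencies)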


self.send_task_to_worker(worker, key)

return _recommend_speculative_assignment(ts)
@mrocklin (Member):
Ah, so we would send long chains down to a worker all at once?

@gforsyth (Contributor, Author):
That's what currently happens, and I can see it leading to less-than-stellar performance, especially if a cluster is scaling up.
We could add a config item for the number of speculative tasks to assign to a given worker, a la max_height in the fuse options?
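
For illustration, such a knob could be read through dask.config; the key name below is hypothetical and not part of this PR:

import dask

# Hypothetical configuration key capping how many tasks the scheduler
# sends ahead to a single worker; not implemented in this PR.
dask.config.set({"distributed.scheduler.speculative-max-chain": 3})
max_chain = dask.config.get("distributed.scheduler.speculative-max-chain")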

@mrocklin (Member):
Oh, I think it's fine. Let's leave it like this until there is a problem.

@@ -3837,7 +3885,14 @@ def get_comm_cost(self, ts, ws):
        Get the estimated communication cost (in s.) to compute the task
        on the given worker.
        """
        return sum(dts.nbytes for dts in ts.dependencies - ws.has_what) / self.bandwidth
        # TODO: How is it possible for nbytes to be None when there's a getter
        # that is supposed to stop that from happening?
@mrocklin (Member):
This looks like an excellent "who-dunnit?" mystery to enjoy

@@ -1441,7 +1443,7 @@ def add_task(
         self.tasks[key] = ts = TaskState(
             key=key, runspec=SerializedTask(function, args, kwargs, task)
         )
-        ts.state = "waiting"
+        ts.state = "waiting" if not speculative else "speculative"
@mrocklin (Member):
Hrm, what if the dependency has just finished running? Maybe there is a way to trust the local state here more than what the scheduler said.

@gforsyth (Contributor, Author):
It shouldn't make a difference: when add_task finishes, we'll have noted that the dependency is in memory, and then the task will transition from speculative -> ready.
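
A minimal sketch of that worker-side promotion, with hypothetical method and attribute names (the real bookkeeping would live in Worker.add_task and its transition helpers):

def _promote_speculative_dependents(self, ts):
    # Called after ts's result has landed in self.data.
    for dependent in ts.dependents:
        if dependent.state == "speculative" and all(
            dep.key in self.data for dep in dependent.dependencies
        ):
            # All inputs are now local, so hand the task over to the
            # normal ready/executing path.
            self.transition(dependent, "ready")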

("waiting", "released"): self.transition_waiting_released,
("waiting", "processing"): self.transition_waiting_processing,
("waiting", "memory"): self.transition_waiting_memory,
("waiting", "speculative"): self.transition_waiting_speculative,
("speculative", "processing"): self.transition_speculative_processing,
@mrocklin (Member):
We'll eventually want to see what happens when a speculative task gets pushed back to released/waiting again, such as when a worker goes down. The catch-all behavior is, I think, to move things back to released. If you handle that, things should be OK, though we may also want a speculative->waiting transition. Same with errors: I suspect that when a processing task errs it will mark all of its dependents as errored, and so the scheduler will try to enact a speculative->erred transition.

In some of these it may be that the current self.transition_processing_* methods will work without modification, but we'll probably have to verify this.
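
Concretely, the transition table quoted above might eventually need entries along these lines (handler names are hypothetical, and some of them may be able to reuse the existing transition_processing_* methods):

("speculative", "released"): self.transition_speculative_released,
("speculative", "waiting"): self.transition_speculative_waiting,
("speculative", "erred"): self.transition_speculative_erred,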

gforsyth added a commit to gforsyth/distributed that referenced this pull request Nov 24, 2020
One less extra dictionary to track -- this was the source of
some tricky-to-debug issues over in dask#4264, and I thought I'd break it out
into a separate PR instead of jamming everything into one enormous PR.
mrocklin pushed a commit that referenced this pull request Nov 25, 2020
gforsyth force-pushed the speculative_task branch 2 times, most recently from 9565fa2 to 2e1b5e6 on December 9, 2020 at 16:02
gforsyth and others added 3 commits December 9, 2020 11:15
This was an initial attempted fix for a test failure in
`test_worker.py::test_clean_nbytes` that turned out to be unnecessary.
On top of that, it breaks actors, so it shouldn't be in here.

For reference, the value of actor tasks goes in a separate `self.actors`
dict, not `self.data`.
Base automatically changed from master to main March 8, 2021 19:04
Successfully merging this pull request may close these issues.

Speculatively assign tasks to workers