Speculative task assignment #4264
base: main
Conversation
and a fix for a weird nbytes issue
Eight comments. Ah! Ah! Ah!
if self.validate:
    # All dependencies are on the same worker
    assert len({dts.processing_on for dts in ts.dependencies}) == 1
We probably also want to assert that at least one of the dependencies is in a "processing" state.
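A minimal sketch of what that stronger validation might look like, reusing the attributes from the snippet above (a suggestion, not code from the PR):

```python
if self.validate:
    # All dependencies are on the same worker
    assert len({dts.processing_on for dts in ts.dependencies}) == 1
    # Suggested extra check: at least one dependency is still being processed,
    # otherwise there is nothing left to speculate on
    assert any(dts.state == "processing" for dts in ts.dependencies)
```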
self.send_task_to_worker(worker, key)

return _recommend_speculative_assignment(ts)
Ah, so we would send long chains down to a worker all at once?
That's what currently happens, and I can see it leading to less-than-stellar performance, especially if a cluster is scaling up -- we could add a config item for the number of speculative tasks to assign to a given worker, a la max_height in the fuse options?
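For reference, the fuse limits live under `optimization.fuse` in the dask config; a speculative-assignment cap could be exposed the same way. A small sketch, where the second key is hypothetical and does not exist in this PR:

```python
import dask

# Existing fuse option referenced above
dask.config.set({"optimization.fuse.max-height": 8})

# Hypothetical analogue for capping speculative tasks per worker (not implemented):
# dask.config.set({"distributed.scheduler.speculative.max-tasks-per-worker": 4})
```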
Oh, I think it's fine. Let's leave it like this until there is a problem.
@@ -3837,7 +3885,14 @@ def get_comm_cost(self, ts, ws):
    Get the estimated communication cost (in s.) to compute the task
    on the given worker.
    """
    return sum(dts.nbytes for dts in ts.dependencies - ws.has_what) / self.bandwidth
    # TODO: How is it possible for nbytes to be None when there's a getter that is supposed to
    # stop that from happening?
This looks like an excellent "who-dunnit?" mystery to enjoy
@@ -1441,7 +1443,7 @@ def add_task(
    self.tasks[key] = ts = TaskState(
        key=key, runspec=SerializedTask(function, args, kwargs, task)
    )
-   ts.state = "waiting"
+   ts.state = "waiting" if not speculative else "speculative"
Hrm, what if the dependency has just finished running? Maybe there is a way to trust the local state here more than what the scheduler said.
It shouldn't make a difference; when add_task finishes we'll have noted that the dependency is in memory, and then the task will transition from speculative -> ready.
("waiting", "released"): self.transition_waiting_released, | ||
("waiting", "processing"): self.transition_waiting_processing, | ||
("waiting", "memory"): self.transition_waiting_memory, | ||
("waiting", "speculative"): self.transition_waiting_speculative, | ||
("speculative", "processing"): self.transition_speculative_processing, |
We'll eventually want to see what happens when a speculative task gets pushed back to released/waiting again, such as if a worker goes down. The catch-all behavior is, I think, to move things back to released. If you handle that, things should be ok, though we may also want a speculative->waiting transition. Same with errors: I suspect that when a processing task errs it will mark all of its dependents as errored, and so the scheduler will try to enact a speculative->erred transition. In some of these cases the current self.transition_processing_* methods may work without modification, but we'll probably have to verify this.
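A sketch of the extra transition entries that might be needed; the handler names here are hypothetical, and as noted above some of them may be able to delegate to the existing transition_processing_* methods:

```python
# Hypothetical additions to the scheduler's transition table (not in this PR):
("speculative", "released"): self.transition_speculative_released,
("speculative", "waiting"): self.transition_speculative_waiting,
("speculative", "erred"): self.transition_speculative_erred,
```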
This removes one extra dictionary to track -- it was the source of some tricky-to-debug issues over in dask#4264, so I thought I'd break it out into a separate PR instead of jamming everything into one enormous PR.
This was an initial attempted fix for a test failure in `test_worker.py::test_clean_nbytes` that turned out to be unnecessary. On top of that, it breaks actors, so it shouldn't be in here. For reference, the values of actor tasks go in a separate `self.actors` dict, not `self.data`.
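A simplified sketch of that distinction on the worker, using the two attributes mentioned above (not the exact code in distributed):

```python
# Simplified sketch of storing a finished task's result on the worker:
if ts.key in self.actors:
    # Actor tasks keep their live object in the separate self.actors mapping
    self.actors[ts.key] = value
else:
    # Ordinary task results go into self.data
    self.data[ts.key] = value
```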
This is a preliminary cut at introducing speculative task assignment to the distributed scheduler and closes #3974
As @mrocklin mentions in #3974, if tasks are speculatively assigned to a worker that contains all of their dependencies (which may still be processing or awaiting execution), this removes the need for task fusion.
The current approach I've started sketching out here is to use the existing recommendation system in the scheduler to examine a task's dependent(s) -- (I'm going to switch to parent/child terminology here to make this less confusing): if a task has only one child and all parents of that child task are on the same worker, speculatively assign that child to the same worker.
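As a rough illustration of that rule (my paraphrase of the logic; not the actual body of `_recommend_speculative_assignment` in this PR):

```python
def _recommend_speculative_assignment(ts):
    """Sketch: if ts has exactly one dependent (child) and every parent of
    that child is processing on the same worker, recommend moving the child
    into the "speculative" state so it can be shipped to that worker early.
    """
    if len(ts.dependents) != 1:
        return {}
    (child,) = ts.dependents
    workers = {dts.processing_on for dts in child.dependencies}
    if len(workers) == 1 and None not in workers:
        return {child.key: "speculative"}
    return {}
```

The returned recommendations dict would then be fed through the scheduler's existing transition machinery, which performs the actual waiting -> speculative transition.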
Still TODO: