Adjust transfer costs in worker_objective #5326

Draft
wants to merge 8 commits into
base: main
12 changes: 8 additions & 4 deletions distributed/scheduler.py
@@ -3304,7 +3304,8 @@ def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> double:
         nbytes: Py_ssize_t = 0
         for dts in deps:
             nbytes += dts._nbytes
-        return nbytes / self._bandwidth
+        # Add a fixed 10ms penalty per transfer. See distributed#5324
+        return nbytes / self._bandwidth + 0.01 * len(deps)
crusaderky marked this conversation as resolved.

     @ccall
     def get_task_duration(self, ts: TaskState, default: double = -1) -> double:
@@ -3415,14 +3416,17 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple:
         """
         dts: TaskState
         nbytes: Py_ssize_t
-        comm_bytes: Py_ssize_t = 0
+        comm_bytes: double = 0
+        xfers: Py_ssize_t = 0
         for dts in ts._dependencies:
             if ws not in dts._who_has:
                 nbytes = dts.get_nbytes()
-                comm_bytes += nbytes
+                # amortize transfer cost over all waiters
+                comm_bytes += nbytes / len(dts._waiters)
Member

Can you add an in-code comment explaining how this division amortizes cost? I assume this is again a "local topology" argument related to the fan-out tasks (#5325 (comment)) where we try to "ignore" tasks which will likely end up everywhere anyhow?

Collaborator Author

Will do. It's related to that, but actually a simpler idea. Basically, if we transfer to this worker now, that opens up the potential for N other tasks to run on this worker without transferring the data. So you could look at it this way: rather than this task paying the whole cost up front and the others getting the benefit for free, all the sibling tasks split the cost of the transfer evenly between them. (That's an analogy, of course: once the data is transferred, the other tasks don't actually pay anything!)
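
To make the "split the cost" idea concrete, here is a minimal sketch of the accounting. It does not use the scheduler's real classes: `Task`, its fields, and `amortized_comm_bytes` are hypothetical stand-ins that only mirror the `_dependencies`, `_who_has`, and `_waiters` attributes seen in the diff.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps identity hashing, so tasks can live in sets
class Task:
    """Hypothetical stand-in for the scheduler's TaskState."""

    nbytes: int = 0
    who_has: set = field(default_factory=set)       # workers holding the result
    waiters: set = field(default_factory=set)       # tasks still waiting on it
    dependencies: set = field(default_factory=set)  # tasks this one needs


def amortized_comm_bytes(task: Task, worker: str) -> float:
    """Bytes charged to `task` for running on `worker`.

    Each missing dependency is charged at nbytes / number-of-waiters,
    i.e. the transfer is split evenly between all sibling tasks.
    """
    total = 0.0
    for dep in task.dependencies:
        if worker not in dep.who_has:
            total += dep.nbytes / len(dep.waiters)
    return total


# One 100-byte dependency held on worker "a", with four sibling tasks waiting.
dep = Task(nbytes=100, who_has={"a"})
siblings = [Task(dependencies={dep}) for _ in range(4)]
dep.waiters.update(siblings)

on_b = amortized_comm_bytes(siblings[0], "b")  # 100 bytes / 4 waiters
on_a = amortized_comm_bytes(siblings[0], "a")  # data already local
```

Under these assumptions, the first sibling is charged 25 bytes rather than 100 for moving to worker "b", and nothing for staying on "a". The sketch assumes every dependency has at least one waiter (the task being scheduled); an empty `waiters` set would raise `ZeroDivisionError`.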

+                xfers += 1

         stack_time: double = ws._occupancy / ws._nthreads
-        start_time: double = stack_time + comm_bytes / self._bandwidth
+        start_time: double = stack_time + comm_bytes / self._bandwidth + xfers * 0.01
crusaderky marked this conversation as resolved.

         if ts._actor:
             return (len(ws._actors), start_time, ws._nbytes)
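
As a rough illustration of how the new `start_time` expression can change a placement decision, the sketch below recomputes it outside the scheduler. The function name, the `penalty` parameter, and the 100 MB/s bandwidth figure are assumptions made for the example; only the formula itself comes from the diff.

```python
def estimated_start_time(
    occupancy: float,
    nthreads: int,
    comm_bytes: float,
    xfers: int,
    bandwidth: float,
    penalty: float = 0.01,  # fixed 10ms per transfer, as in the diff
) -> float:
    """Queued work per thread, plus transfer time, plus per-transfer penalty."""
    stack_time = occupancy / nthreads
    return stack_time + comm_bytes / bandwidth + xfers * penalty


BANDWIDTH = 100e6  # bytes per second; an assumed value for this example

# Worker A already holds all inputs but has 20ms of queued work on one thread.
# Worker B is idle but needs three tiny (1 kB) transfers.
busy_local = estimated_start_time(0.02, 1, 0, 0, BANDWIDTH)
idle_remote = estimated_start_time(0.0, 1, 3000, 3, BANDWIDTH)
idle_remote_no_penalty = estimated_start_time(0.0, 1, 3000, 3, BANDWIDTH, penalty=0.0)

# Lower start time wins.
prefers_local_with_penalty = busy_local < idle_remote
prefers_local_without_penalty = busy_local < idle_remote_no_penalty
```

With these made-up numbers, the bandwidth term alone makes the three small transfers look almost free, so the idle worker wins; with the fixed penalty added, the worker that already holds the data wins instead.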