
Workers run twice as many root tasks as they should, causing memory pressure #5223

Closed

gjoseph92 opened this issue Aug 17, 2021 · 1 comment

gjoseph92 (Collaborator) commented Aug 17, 2021

There is a race condition (sorta) between the worker and scheduler to decide which task will run next when a task gets completed. Because this is not currently a fair race—the worker will always win!—workers will always run more root tasks (or sibling tasks in general) than they should, over-producing data and leading to memory pressure. Specifically, I believe workers will hold twice as many root tasks in memory as they need to (and therefore downstream tasks as well, until there's a reduction).

Consider a graph like this (numbers are priority from dask.order). Imagine there's a single worker with 2 threads.

2  4  6  8
|  |  |  |
1  3  5  7
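
For concreteness, here's a hedged sketch of how a graph of this shape might be built with dask.delayed (the function names are illustrative, and dask.order may number things slightly differently):

```python
import dask
from dask import delayed

@delayed(pure=False)
def load():            # a root task: produces data from nothing
    return list(range(1_000_000))

@delayed
def process(chunk):    # a downstream task with a single dependency
    return sum(chunk)

roots = [load() for _ in range(4)]       # the 1, 3, 5, 7 row
children = [process(r) for r in roots]   # the 2, 4, 6, 8 row

# With a single two-threaded worker:
# from distributed import Client
# client = Client(n_workers=1, threads_per_worker=2)
# dask.compute(*children)
```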

Because 1, 3, 5, and 7 have no dependencies, the scheduler sees they're ready to run, and submits them all to the worker. The worker keeps track of all its tasks ordered by priority. From the worker's perspective, the task graph looks like:

1  3  5  7
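
That view is essentially a priority heap. A minimal illustration (the real worker state machine is more involved):

```python
import heapq

nthreads = 2
ready = [1, 3, 5, 7]  # everything the scheduler has sent, keyed by priority
heapq.heapify(ready)

# The worker fills its threads from the front of the queue.
executing = [heapq.heappop(ready) for _ in range(nthreads)]
print(executing)      # [1, 3]
print(sorted(ready))  # [5, 7]
```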

So the worker starts 1 and 3 (since it has 2 threads) using Worker.execute. When a task completes, Worker.execute transitions it to memory. This calls send_task_state_to_scheduler, but that just adds the message to the BatchedSend buffer; it doesn't actually block on sending it.
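
For intuition, here's a hedged sketch of that buffering pattern; it's not the real distributed.batched.BatchedSend, just the shape of it: send() only appends to a list, and a background coroutine does the actual writes, so nothing hits the network until the event loop gets control.

```python
import asyncio

class BatchedSendSketch:
    """Illustrative stand-in for distributed's BatchedSend, showing only
    the buffering behavior that matters here."""

    def __init__(self, comm, interval=0.005):
        self.comm = comm          # anything with an async write() method
        self.interval = interval  # batching window between flushes
        self.buffer = []

    def send(self, msg):
        # Synchronous and non-blocking: the caller returns immediately.
        self.buffer.append(msg)

    async def run(self):
        # A background coroutine performs the actual network writes.
        while True:
            await asyncio.sleep(self.interval)
            if self.buffer:
                payload, self.buffer = self.buffer, []
                await self.comm.write(payload)
```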

Then at the end of Worker.execute, we run ensure_computing. Because there is no await statement between running the task and calling ensure_computing, we've never given up control to another coroutine, so we can guarantee that the task-finished message has not been sent to the scheduler yet, and we haven't received any updates from the scheduler either, since the comm-handling coroutines are also blocked. (Even if we had awaited, it's unlikely the scheduler could have received the message, processed it, and gotten a message back over the network in time. But at least that would be a real race condition, instead of a race where one runner isn't allowed to start until the other has finished.)
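
Here's a self-contained simulation of that ordering (all names are illustrative stand-ins for worker internals):

```python
import asyncio

message_buffer = []  # stands in for the BatchedSend buffer
ready = [5, 7]       # tasks still queued on the worker, in priority order
started = []

def ensure_computing():
    while ready and len(started) < 2:  # 2 threads
        started.append(ready.pop(0))

async def execute(key):
    # ... the task has just finished running in the thread pool ...
    message_buffer.append(("task-finished", key))  # buffered, not sent
    # No await between buffering the message and picking new work, so the
    # scheduler cannot possibly have weighed in yet.
    ensure_computing()

asyncio.run(execute(1))
print(started)         # [5, 7]: chosen before the scheduler heard about 1
print(message_buffer)  # [('task-finished', 1)]: still unsent
```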

So when we call ensure_computing at the end of execute, there have been no external changes to Worker.ready, so it now looks like:

5  7

And the worker starts 5 and 7 immediately. Only after that is the event loop freed up to tell the scheduler everything that's happened.

When the scheduler hears these updates, it says "go run tasks 2 and 4 please; they're now ready and highest in priority." But at that point, the worker is already busy with 5 and 7. Once those complete, the worker holds 4 root keys in memory, twice as many as it should.

Here is a test demonstrating this: cb21e14.


What are solutions for this?

Quick simple fixes that don't work:

  • Have an await before ensure_computing? This doesn't actually work. It just turns it into an actual race condition that the worker will probably still win.
  • Don't ensure_computing here? This fixes this problem but breaks many other tests. Clearly it's not that simple.

General courses of action:

  • Allow the scheduler to intervene here before the ensure_computing.

  • Speculative task scheduling (#3974, "Speculatively assign tasks to workers"), so the worker knows 2 and 4 exist and can make better choices. This would probably work, but might be overkill for this particular problem, and might still have edge cases where it doesn't fully solve it.

  • The opposite of speculative scheduling: don't let the scheduler tell the worker about 5 and 7 in the first place. This seems tricky.

  • Prevent the worker from running too many sibling tasks in ensure_computing. This sounds tricky since we can't see the full graph, but we actually do have a glimpse into its structure via priorities.

    Within each worker, we can avoid creating too many gaps in the priority sequence. That is, only allow up to num_threads tasks to be running or in memory at once whose directly preceding key (in priority order) isn't already in memory. For example, we'd run 1 without knowing where task 0 is, creating 1 "priority gap". (It's irrelevant that 0 doesn't exist; that just makes the base case work out nicely.) Then we'd run 3 without knowing where 2 is, giving us 2 priority gaps. When we try to run task 5 in ensure_computing, we've used up our "priority gap budget", so instead we do nothing and wait for more information from the scheduler. Even though we haven't heard of them yet, we can infer that tasks 2 and 4 must exist, so we know there must be something more important than 5 to run next, and we should wait to do more work so we don't get too far ahead of the scheduler's grand plan. (A code sketch of this follows the list.)

    Of course, this also requires treating a part of priority not as an opaque, orderable thing but as a meaningful integer.
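
Here's a hedged, self-contained sketch of that budget, treating one priority component as a plain integer as just described; pick_next_tasks and its arguments are illustrative stand-ins, not distributed's API:

```python
def pick_next_tasks(ready, in_memory, running, nthreads):
    """Choose ready tasks (lowest priority number first) without letting
    the number of "priority gaps" exceed nthreads."""

    def is_gap(p):
        # A task opens a gap if the task directly before it in priority
        # order isn't already in memory on this worker.
        return (p - 1) not in in_memory

    gaps = sum(1 for p in set(running) | set(in_memory) if is_gap(p))
    to_start = []
    for p in sorted(ready):
        if is_gap(p) and gaps >= nthreads:
            break  # budget spent: something more important must exist, so wait
        to_start.append(p)
        if is_gap(p):
            gaps += 1
    return to_start

# The scenario above: 1 just finished, 3 still running, 5 and 7 ready.
print(pick_next_tasks(ready={5, 7}, in_memory={1}, running={3}, nthreads=2))
# -> []  (the worker waits for the scheduler instead of starting 5)
```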

Interestingly, this feels a lot like the need for memory backpressure: #2602.

cc @fjetter @mrocklin

mrocklin (Member) commented
Thank you for the comment @gjoseph92. As discussed, this is fairly similar to the solution proposed in #3974. I'm going to close this in order to keep issues consolidated.
