
O(1) rebalance #4774

Merged: 44 commits merged into dask:main on Jun 1, 2021

Conversation

@crusaderky (Collaborator) commented Apr 30, 2021

  • DONE implementation
  • DONE stress test
  • DONE unit tests

In scope

  • Make rebalance() O(1) with respect to the total number of tasks on the cluster.
  • Introduce hysteresis and thresholds to minimise the overall cost of rebalancing.
  • Consider unmanaged memory (leaks, fragmentation, etc.) while rebalancing.
  • Change the algorithm that picks which tasks to move from largest-first to least-recently-inserted-first.

Out of scope, left for future PRs

  • This PR does not make rebalance safe to run while computations are running (nothing changes on that front).
  • This PR does not harmonize / deduplicate the code that moves data around used by rebalance(), replicate(), and retire_workers().
  • High-level documentation with recommendations on the use of malloc_trim, jemalloc, etc. will follow in a later PR.

Demo

https://gist.github.com/crusaderky/c1ccf5fd0107b13c8d24bbed5197d5f6
(also works on master)
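
For readers who want a quick hand-rolled exercise instead of the gist, something like the following sketch would do (the scheduler address is a placeholder; the gist above is the authoritative demo):

```python
# Minimal sketch: pile data onto a single worker, then ask the scheduler to
# rebalance it across the cluster. Address and data sizes are placeholders.
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

# Pick one worker and scatter everything onto it, creating an imbalance.
first_worker = sorted(client.scheduler_info()["workers"])[0]
futures = client.scatter(list(range(1_000)), workers=[first_worker])

# Ask the scheduler to even out memory usage across the workers.
client.rebalance()
```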

@crusaderky (Collaborator, Author)

CC @jrbourbeau @mrocklin

@crusaderky (Collaborator, Author)

rebalance() runtime when there is nothing to do:

| # keys on cluster | # workers | master         | this PR      |
|-------------------|-----------|----------------|--------------|
| 10k               | 32        | 18.3 ~ 25.3 ms | 1.1 ~ 1.9 ms |
| 100k              | 32        | 232 ~ 260 ms   | 1.0 ~ 2.6 ms |

@crusaderky (Collaborator, Author) commented Apr 30, 2021

Important

I noticed that, once the size of a single minimal Python object (read: the buffer of a numpy array) drops below 2 MiB, CPython / Linux x64 no longer releases RAM promptly when the object is deallocated; instead it hogs it indefinitely. gc.collect() does nothing. malloc_trim() typically, but not always, fixes the problem. Creating new Python objects simply reuses said memory.

This is the behaviour already in master - nothing changes there - and it already impacts all logic that relies on measuring process memory (spilling, pausing, and restarting). What does change with this PR is that now this unused but allocated memory is considered by rebalance() too, which in turn means that keys may be evicted too aggressively from particularly heavy nodes, as there is currently no way to tell apart trimmable memory from memory leaks or genuine fragmentation.

The current workaround is that I let the user pick which measure rebalance() uses; by default it is the optimistic memory (managed by dask in RAM + unmanaged older than 30s), but it can be changed to managed memory only, thus reverting to the behaviour of master. Needless to say, this is not great, as it puts extra configuration burden on the user.
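
For illustration, the measure is selected through the dask config; the key name below is the one used by the tests in this PR, and the snippet is only a sketch:

```python
import dask
import distributed  # noqa: F401  # imported so the distributed config defaults are registered

# Revert rebalance() to master's behaviour by counting managed memory only;
# the default for this key in the PR is the "optimistic" measure described above.
with dask.config.set({"distributed.worker.memory.rebalance.measure": "managed"}):
    ...  # start the scheduler / call rebalance() inside this context
```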

An alternative, cleaner solution would be to run malloc_trim() periodically (e.g. 2~5s) on the workers, but problems will ensue if someone compiled the Python interpreter with an alternative alloc/free pair of primitives.
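
For illustration, a minimal sketch of what such a periodic trim could look like, assuming a glibc-based Linux and the stock CPython allocator (not part of this PR):

```python
# Hedged sketch: periodically ask glibc to return free heap pages to the OS.
# Assumes Linux with glibc; on non-glibc platforms malloc_trim is missing and
# the call raises, which is exactly the concern raised above.
import ctypes
import ctypes.util
import threading

libc = ctypes.CDLL(ctypes.util.find_library("c"))

def trim_periodically(interval: float = 5.0) -> None:
    """Release free heap memory back to the OS, then reschedule."""
    libc.malloc_trim(0)
    timer = threading.Timer(interval, trim_periodically, args=(interval,))
    timer.daemon = True
    timer.start()

trim_periodically()
```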

I have not tested the behaviour on macOS and Windows yet, but from what I saw in my previous PR I expect similar headaches there too.

@mrocklin (Member) commented May 3, 2021

> An alternative, cleaner solution would be to run malloc_trim() periodically (e.g. 2~5s) on the workers, but problems will ensue if someone compiled the Python interpreter with an alternative alloc/free pair of primitives.

Can I ask you to raise an issue about this and tag @pitrou and @jakirkham? I don't know much about this, but it seems like this might be helpful in the common case. "Leaking" memory is a very common pain point today.

@fjetter (Member) commented May 5, 2021

> An alternative, cleaner solution would be to run malloc_trim() periodically (e.g. 2~5s) on the workers, but problems will ensue if someone compiled the Python interpreter with an alternative alloc/free pair of primitives.

cc @xhochy

@xhochy commented May 5, 2021

> An alternative, cleaner solution would be to run malloc_trim() periodically (e.g. 2~5s) on the workers, but problems will ensue if someone compiled the Python interpreter with an alternative alloc/free pair of primitives.
>
> cc @xhochy

Thanks for the ping. I probably should comment on the (to-be-opened) issue instead of here?

@fjetter (Member) left a comment

I like the simplicity and elegance of the algorithm; in particular, there are no opaque heuristics in there. Other than configuring the different thresholds, I think we have a nice lever to control different policies via the heap sort key (or rather the who_has sorting / insertion order / etc.).

I am a bit concerned about the dynamic case where this decision might become much more complicated. I would like to avoid the complexity we currently have in dask.order.
Also, I'm wondering how we can manage "data ownership", i.e. is a worker actually supposed to hold a replica, or is it just a temporary copy needed for a dependency, etc.? I believe we do not have a data model for this yet.

These concerns should not stop us here but rather should be kept in mind once we move on to the next step(s).

Comment on lines 5641 to 5644 of distributed/scheduler.py:

```python
memory_by_worker = [
    (ws, getattr(ws.memory, MEMORY_REBALANCE_MEASURE)) for ws in workers
]
mean_memory = sum(m for _, m in memory_by_worker) // len(memory_by_worker)
```
Member:

If performance is super critical, this could be done in one loop; as written, we iterate over the workers three times. For the sake of simplicity, it's probably fine to keep as is for now.

Collaborator (Author):

I need to store the memory measure somewhere for later (calling WorkerState.memory is mildly expensive), and a list comprehension tends to be a lot faster than an explicit for loop with appends.
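
For comparison, a sketch of the single-pass variant being discussed, reusing the names from the snippet above (illustrative only; the PR keeps the comprehension):

```python
# Hypothetical single-loop variant: one pass over the workers, accumulating the
# total while collecting the per-worker measure. Not what the PR does.
memory_by_worker = []
total_memory = 0
for ws in workers:
    m = getattr(ws.memory, MEMORY_REBALANCE_MEASURE)
    memory_by_worker.append((ws, m))
    total_memory += m
mean_memory = total_memory // len(memory_by_worker)
```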

Comment on lines +106 to +110:

```
# If this is your problem on Linux, you should alternatively consider
# setting the MALLOC_TRIM_THRESHOLD_ environment variable (note the final
# underscore) to a low value; refer to the mallopt man page and to the
# comments about M_TRIM_THRESHOLD on
# https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c
```
@crusaderky (Collaborator, Author) commented May 10, 2021:

TODO: as part of a later PR, I'm going to move this advice to a Sphinx page for visibility and leave just a note here pointing to it.
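
For illustration only (values and setup below are placeholders, not advice from this PR): because the nanny spawns worker processes that inherit the parent's environment, the variable can be set before a local cluster is created.

```python
import os

from distributed import Client, LocalCluster

# Must be in the environment before the worker processes are spawned; glibc
# reads it at process startup. 64 KiB is an illustrative placeholder value.
os.environ["MALLOC_TRIM_THRESHOLD_"] = "65536"

cluster = LocalCluster(n_workers=2)  # worker subprocesses inherit the variable
client = Client(cluster)
```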

@crusaderky closed this May 17, 2021
@crusaderky reopened this May 17, 2021
@crusaderky closed this May 17, 2021
@crusaderky reopened this May 17, 2021
@crusaderky (Collaborator, Author)

All tests successful over 4 consecutive runs

@fjetter (Member) commented May 26, 2021

I left many comments, but overall the code itself looks great. I also like the clearly written tests; the expected behaviour is quite clear.

The two biggest issues I consider worth discussing are:

  • Global constants / config read at import time
  • Factoring out _rebalance_find_msgs into dedicated functions to allow for scheduler/worker-independent testing. This one is a bit of work, but it could pay off nicely in the long run.

@@ -161,14 +163,6 @@ def nogil(func):

```python
DEFAULT_DATA_SIZE = declare(
    Py_ssize_t, parse_bytes(dask.config.get("distributed.scheduler.default-data-size"))
)
```
@crusaderky (Collaborator, Author) commented May 27, 2021:

I could not move this, as it is required by TaskState, and IMHO it would be too expensive to reload it on every TaskState.__init__.

@crusaderky (Collaborator, Author)

@fjetter

> Global constants / config on import time

I redesigned this part. Now dask.config is read in Scheduler.__init__ and I think it looks a lot better.
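
An illustrative sketch of the pattern described, with invented names (the PR's actual attribute names may differ):

```python
import dask
import distributed  # noqa: F401  # so the key below exists in the config defaults

# Before (sketch): read once at import time; changing the config later has no effect.
# MEMORY_REBALANCE_MEASURE = dask.config.get("distributed.worker.memory.rebalance.measure")

# After (sketch): read when the scheduler is instantiated, so per-cluster config applies.
class SchedulerSketch:
    def __init__(self):
        self.memory_rebalance_measure = dask.config.get(
            "distributed.worker.memory.rebalance.measure"
        )
```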

> Factor out _rebalance_find_msgs into dedicated functions to allow for scheduler/worker independent testing. I think this one is a bit of work but this could pay off nicely in the long run

While it is certainly possible to move _rebalance_find_msgs to a top-level function, it would require as input a SchedulerState object and a list of WorkerState objects, neither of which is easy (and, most importantly, robust) to craft synthetically in a unit test. I'd much rather keep the tests black-box; otherwise it's all too easy for the real-life Scheduler and WorkerState objects to deviate from those crafted for the tests.

@fjetter (Member) commented May 27, 2021

> While it is certainly possible to move _rebalance_find_msgs to a top-level function, it would require in input a SchedulerState object and a list of WorkerState objects, neither of which are easy - and most importantly, robust - to craft synthetically in a unit test.

We wouldn't need to pass the entire scheduler state in there, would we? I agree that would be nuts. From what I can see, we're using four constants and the log_event method of the SchedulerState object. My point is that these constants can be function inputs, and we could simply not log this particular event; the event we're logging here is in any case the "empty/nothing to do" event. I don't see a reason why this should be part of the core algorithm.

Passing in a list of WorkerState objects is exactly what I am proposing here, and I believe it is more robust since we're not relying on futures, servers, comms, heartbeats, actual memory measurements or any timing-relevant pieces. The tests would not even need to be sync/async since they would only test the actual algorithm.
Most attributes of the WorkerState object are irrelevant and we can run with the default values. The only actual coupling is introduced by the attributes we're using in the algorithm anyhow. Below is an example of how the test could look:

```python
def test_rebalance_simple():
    A = WorkerState(name="A", memory_limit="1GB")
    B = WorkerState(name="B", memory_limit="1GB")

    def put_task_on_worker(ws, key, nbytes):
        ws._has_what[key] = None
        ws._nbytes += nbytes

    for ix in range(4):
        put_task_on_worker(A, f"k{ix}", 200)

    result = _rebalance_find_msgs(workers=[A, B])

    expected = [
        (A, B, "k0"),
        (A, B, "k1"),
    ]

    assert result == expected
```

@fjetter (Member) left a comment

Ooops, had a few comments in 'pending', sorry

"""keys exist but belong to unfinished futures"""
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await asyncio.sleep(0.1)
out = await s.rebalance(keys=[f.key for f in futures])
Member:

There is another test for the internal mechanics which waits for the future. Why is this different for the user API?

Collaborator (Author):

The tests in test_client.py test specifically the wrapper in client.py. Previously, all tests for rebalance were exclusively in test_client.py. However, very soon rebalance will be chiefly invoked internally by the scheduler, bypassing the client entirely. Hence, the tests should invoke Scheduler.rebalance and not rely on the client wrapper.

As to why Client.rebalance and Scheduler.rebalance behave differently: it would be non-trivial to move the logic upstream (Client uses Futures, Scheduler uses keys, for which I couldn't find a straightforward wait method) and I could not justify the effort. If in the future the Scheduler calls rebalance internally on unfinished keys we can revisit this, but I cannot see such a use case in the plan that we laid down.

Collaborator (Author):

Added a clarification in the test that Client.rebalance and Scheduler.rebalance behave differently here.

Member:

Thanks. The scheduler indeed does not have a way to wait for keys. The closest thing would be to allow registration of "transition callbacks", such that when a key transitions into a given state, a callback is invoked. However, let's not do that; the state machine is complex enough as it is.

```python
@set_config_and_reload({"distributed.worker.memory.rebalance.measure": "managed"})
@gen_cluster(client=True, worker_kwargs={"memory_limit": 0})
async def test_rebalance_skip_all_recipients(c, s, a, b):
    """All recipients are skipped because they already hold copies"""
```
Member:

More a warning than an actual comment about this test. As soon as we hit a dynamic scheduler, replicas become much, much more complex, since whether or not a task is actually replicated cannot be inferred from who_has. While who_has tracks copies of the data, this includes "temporary" copies on workers: the lifecycle of a task's data is different for data which was calculated on a worker and data which is only there because it is a dependency of another task. Simply "skipping" these tasks might no longer be possible, and the definition of the state (i.e. simply counting data) will become more complex.

See also
#4784
#4772


```python
# Fast exit in case no transfers are necessary or possible
if not senders or not recipients:
    self.log_event(
```
Member:

This is actually the only place in this method where we're using SchedulerState / self. Why not factor this out into an independent function? That would make testing possible without invoking any scheduler/worker interaction and would allow us to write tests by defining measurements (memory per worker) and asserting the result, without playing with actual memory measurements, scheduler<->worker heartbeats, comms, etc.

This would also further decouple the movement of the data from the decision making and might allow for easier testing of the latter.

Collaborator (Author):

Refactoring the function is straightforward, but I don't agree with doing it in the first place. See the discussion above.

@crusaderky (Collaborator, Author)

#4853 breaks this. Temporarily reverting the other PR to prove that tests pass.

@crusaderky (Collaborator, Author)

> Passing in a list of WorkerState objects is exactly what I am proposing here and I believe it is more robust

I strongly disagree. WorkerState objects are complex, and in your code snippet you made very strong assumptions about their internal behaviour. If these internals change in the future, I don't want a (potentially newbie) developer to have to hunt down unit tests that mock internal behaviour. What you're proposing carries a real risk that, in a future PR completely unrelated to rebalance, rebalance breaks without any of the unit tests noticing.

@mrocklin (Member)

> Passing in a list of WorkerState objects is exactly what I am proposing here and I believe it is more robust
>
> I strongly disagree. WorkerState objects are complex and in your code snippet you made very strong assumptions on their internal behaviour. If these internals changed in the future, I don't want a (potentially newbie) developer to have to hunt down unit tests that mock internal behaviour. What you're proposing has the big potential that, in a future PR completely unrelated to rebalance, rebalance will break without any of the unit tests noticing.

Just to jump in here with historical context. Historically, distributed had a lot of fine-grained unit tests like this. I've heard them called "white box" tests because they expose the internals, unlike "black box" tests. In hindsight this ended up being a bad idea, especially as the scheduler changed internally. Small changes to scheduler internals required changing dozens of tests. The tests became a liability and added inertia to development.

At one point I spent a week rewriting white-box tests so that we generally only touched user-level API. This was hard because we needed to craft very specific situations where the desired behavior would arise. In hindsight, though, this approach has provided a ton of value. We're able to make large changes to internal scheduling state and the test suite remains valid.

@fjetter (Member) commented May 28, 2021

> WorkerState objects are complex and in your code snippet you made very strong assumptions on their internal behaviour.

As you said, this is a snippet I haven't put much thought into. The only relevant bit is that the actual modifications are encapsulated in a dedicated function, so that there is one place to change things. I didn't want to argue about the very specific way to do this until we reached a conclusion about whether this is something we want to have at all. Please do not reject the idea based on that snippet.

> What you're proposing has the big potential that, in a future PR completely unrelated to rebalance, rebalance will break without any of the unit tests noticing.

This assumes that there are no tests which cover the entire system, which is not something I propose. A bunch of the tests you wrote are still very important, exactly the way you wrote them, to know that everything works when put together.
I'm proposing to implement tests covering the particular logic of the rebalancing algorithm as just that. The rebalancing algorithm is the solution to the problem "given X workers with keys Y and weights W and hardware measurements Z, calculate the transfers needed so that the variance of weights per worker is minimal". This is formally not 100% correct, but my point is that this is a purely mathematical problem, and as such already a hard problem worth thorough test coverage on its own.

How we get the system into the proposed state is a hard problem in itself, but less of a mathematical one. It is where we need to deal with all the problems distributed systems introduce, like latencies, comm failures, dead nodes, race conditions, etc. It is also a problem space where I think we should have enough edge-case tests for non-happy paths.

Then there is the "plugging both pieces together" problem, which doesn't require huge amounts of tests if both of the above sub-problems are well tested.

> At one point I spent a week rewriting white box tests so that we generally only touched user-level API. This was hard because we needed to craft very specific situations where the desired behavior would arise. In hindsight though this approach has provided a ton of value. We're able to make large changes to internal scheduling state and the test suite remains valid.

I get that. That's the classical test hierarchy problem, and both from my experience and from the literature I believe a healthy balance is good.
This flexibility may have given us the opportunity to rewrite a lot of code, but we also dropped the ball on a few things we haven't had the chance to clean up yet. Most of my time with the distributed code base I have been trying very hard to construct situations like you described; it took me much more than a week, and we're still having trouble actually reproducing some cases, in particular (but not exclusively) the various race conditions and deadlocks associated with the worker state machine.

The typical approach to this problem is to work with appropriate internal abstractions and (private) interfaces. If existing abstractions are not useful, let's create new ones or change existing ones. If WorkerState is not a suitable interface, let's not use it. Maybe WorkerState is indeed too complex and we should just provide raw data (e.g. a dict per worker with the necessary measurements) to the algorithm. After all, we definitely don't want the algorithm part to actually mutate the state. Ideally, we wouldn't want to touch the algorithm at all if we were to change WorkerState, so there is no reason why it needs to be exposed to the actual objects. If we wanted to stick with WorkerState for performance and complexity reasons, we might want to look into whether the operations I proposed above are universal enough that they could become a method of the WorkerState object, making them less fragile to use in tests since they would then be used all over the scheduler. Maybe an entirely different approach is better, but I'm very certain we can find a robust interface if we want to.

I'm actually more concerned with arguing that we want more of these thin internal interfaces than with how exactly they will look. In general I believe we should aim for having more of them, since this allows for a less monolithic software architecture and lets us, if need be, prepare a test system more easily for non-happy edge cases. It also helps with test runtime and flakiness, which are growing, but that's only a secondary point.
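
To make the "raw data" idea concrete, here is a hypothetical shape such a thin interface could take; every name below is invented for illustration and nothing like it exists in this PR:

```python
# Hypothetical sketch of a thin "raw data" interface for the decision step:
# the algorithm sees plain measurements and returns proposed moves, never
# touching SchedulerState or WorkerState directly. All names are invented.
from typing import Dict, List, NamedTuple, Tuple

class WorkerMeasurement(NamedTuple):
    address: str
    memory: int            # bytes, according to the chosen measure
    memory_limit: int      # bytes
    keys: Dict[str, int]   # key -> nbytes, in insertion order

def find_rebalance_moves(
    workers: List[WorkerMeasurement],
) -> List[Tuple[str, str, str]]:
    """Return (sender_address, recipient_address, key) tuples.

    A pure function of its inputs, so it could be unit-tested without futures,
    comms, heartbeats or real memory measurements.
    """
    raise NotImplementedError  # decision logic would go here
```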

@crusaderky (Collaborator, Author)

Merged again from main after #4853 was fixed.
All tests pass.
All review comments, with the exception of the refactoring, have been addressed; @fjetter please let me know if anything else is outstanding after my last few commits.

@fjetter (Member) commented May 28, 2021

Current test failures are (for reference):

  • #4859
  • #4839
  • #4862 (new but very likely unrelated)

I'll pass over the code once more, but I don't expect anything major to pop up. Let's have a chat about the possible refactoring this afternoon.

@fjetter (Member) left a comment

I had another look over the code and everything seems in order. Great job.

Regarding the refactoring, we couldn't reach a conclusion yet, but I'm fine with merging as is. If we decide in favour of a refactoring, it will be additional work on top of this anyhow.

@fjetter merged commit 9d4f0bf into dask:main on Jun 1, 2021
@crusaderky deleted the rebalance branch on Jun 1, 2021 at 14:21
@mrocklin (Member) commented Jun 1, 2021 via email
