Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

O(1) rebalance #4774

Merged
merged 44 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
a756022
partial prototype
crusaderky Apr 26, 2021
36589b1
incomplete poc
crusaderky Apr 27, 2021
9ac044a
poc (incomplete)
crusaderky Apr 27, 2021
6e1ef79
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky Apr 29, 2021
2a5b5cd
complete POC
crusaderky Apr 29, 2021
60fe5a4
polish
crusaderky Apr 29, 2021
a7e46b3
polish
crusaderky Apr 29, 2021
5d267d3
bugfix
crusaderky Apr 29, 2021
b275d88
fixes
crusaderky Apr 29, 2021
c3fd176
fix
crusaderky Apr 29, 2021
46aaea7
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky Apr 29, 2021
c7e8ed6
Use arbitrary measure in rebalance
crusaderky Apr 30, 2021
3454291
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky Apr 30, 2021
ae27798
Merge branch 'main' into rebalance
crusaderky May 7, 2021
428fd8f
Code review
crusaderky May 7, 2021
f73ace8
renames
crusaderky May 7, 2021
1ad35ea
suggest tweaking malloc_trim
crusaderky May 10, 2021
c230c89
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky May 10, 2021
cada411
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky May 11, 2021
1ad9d51
self-review
crusaderky May 11, 2021
32a1f32
test_tls_functional
crusaderky May 11, 2021
6557c52
test_memory to use gen_cluster
crusaderky May 12, 2021
5a5a775
test_memory to use gen_cluster
crusaderky May 12, 2021
af5adfc
Merge branch 'test_memory'
crusaderky May 12, 2021
fbeda36
half memory
crusaderky May 12, 2021
d5708d4
tests
crusaderky May 13, 2021
79aeab4
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky May 13, 2021
535342f
tests
crusaderky May 13, 2021
efc96f6
tests
crusaderky May 13, 2021
8818f99
tests
crusaderky May 13, 2021
b99e220
make Cython happy
crusaderky May 13, 2021
ed5336d
Merge remote-tracking branch 'upstream/main' into rebalance
crusaderky May 16, 2021
4b1b16e
test_rebalance_managed_memory
crusaderky May 16, 2021
bc9294a
tests
crusaderky May 16, 2021
37de01f
robustness
crusaderky May 16, 2021
fee3ff8
improve test stability
crusaderky May 17, 2021
71d0861
tests stability
crusaderky May 17, 2021
be188dc
trivial
crusaderky May 17, 2021
6e743a0
Merge branch 'main'
crusaderky May 17, 2021
9704a34
Merge branch 'main' into rebalance
crusaderky May 27, 2021
3f29a81
reload dask.config on Scheduler.__init__
crusaderky May 27, 2021
cfc4590
Merge branch 'main' into rebalance
crusaderky May 27, 2021
03e376e
code review
crusaderky May 27, 2021
f08185b
Merge branch 'main' into rebalance
crusaderky May 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3051,8 +3051,11 @@ def upload_file(self, filename, **kwargs):
)

async def _rebalance(self, futures=None, workers=None):
await _wait(futures)
keys = list({stringify(f.key) for f in self.futures_of(futures)})
if futures is not None:
await _wait(futures)
keys = list({stringify(f.key) for f in self.futures_of(futures)})
else:
keys = None
result = await self.scheduler.rebalance(keys=keys, workers=workers)
if result["status"] == "missing-data":
raise ValueError(
Expand Down
43 changes: 43 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,49 @@ properties:
non-time-sensitive heuristics. This should be set to be longer than
the duration of most dask tasks.

rebalance:
type: object
description: >-
Settings for data rebalance operations
properties:
measure:
enum:
- process
- optimistic
- managed
- managed_in_memory
description: >-
Which of the properties of distributed.scheduler.MemoryState
should be used for measuring worker memory usage
sender_min:
crusaderky marked this conversation as resolved.
Show resolved Hide resolved
type: number
minimum: 0
maximum: 1
description: >-
Fraction of worker process memory at which we stop potentially
receiving data from other workers. Ignored when max_memory is not
set.

recipient_max:
type: number
minimum: 0
maximum: 1
description: >-
Fraction of worker process memory at which we start potentially
transferring data to other workers.

sender_recipient_gap:
type: number
minimum: 0
maximum: 1
description: >-
Fraction of worker process memory, around the cluster mean, where
a worker is neither a sender nor a recipient of data during a
rebalance operation. E.g. if the mean cluster occupation is 50%,
no_rebalance_gap=0.1 means that only nodes above 55% will donate
data and only nodes below 45% will receive them. This helps avoid
data from bouncing around the cluster repeatedly.

target:
oneOf:
- {type: number, minimum: 0, maximum: 1}
Expand Down
30 changes: 30 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,36 @@ distributed:
# This should be set to be longer than the duration of most dask tasks.
recent_to_old_time: 30s

rebalance:
# Memory measure to rebalance upon. Possible choices are:
# process
# Total process memory, as measured by the OS.
# optimistic
# Managed by dask (instantaneous) + unmanaged (without any increases
# happened in the last <distributed.worker.memory.recent_to_old_time>).
# Recommended for use on CPython with large (2MiB+) numpy-based data chunks.
# managed_in_memory
# Only consider the data allocated by dask in RAM. Recommended if RAM is not
# released in a timely fashion back to the OS after the Python objects are
# dereferenced, but remains available for reuse by PyMalloc.
# managed
# Only consider data allocated by dask, including that spilled to disk.
# Recommended if disk occupation of the spill file is an issue.
measure: optimistic
# Fraction of worker process memory at which we start potentially sending
# data to other workers. Ignored when max_memory is not set.
sender_min: 0.30
# Fraction of worker process memory at which we stop potentially accepting
# data from other workers. Ignored when max_memory is not set.
recipient_max: 0.60
# Fraction of worker process memory, around the cluster mean, where a worker is
# neither a sender nor a recipient of data during a rebalance operation. E.g.
# if the mean cluster occupation is 50%, no_rebalance_gap=0.10 means that only
# nodes above 55% will donate data and only nodes below 45% will receive them.
# This helps avoid data from bouncing around the cluster repeatedly.#
# Ignored when max_memory is not set.
sender_recipient_gap: 0.10

# Fractions of worker process memory at which we take action to avoid memory
# blowup. Set any of the values to False to turn off the behavior entirely.
target: 0.60 # target fraction to stay below
Expand Down
Loading