Skip to content

Commit

Permalink
Change memory bars color on spilling/paused status (#6959)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Aug 26, 2022
1 parent 4cf9baf commit c083790
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 11 deletions.
87 changes: 76 additions & 11 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
parse_timedelta,
)

from distributed.core import Status
from distributed.dashboard.components import add_periodic_callback
from distributed.dashboard.components.shared import (
DashboardComponent,
Expand Down Expand Up @@ -239,20 +240,63 @@ def update(self):
self.source.data.update({"left": x[:-1], "right": x[1:], "top": counts})


def _memory_color(current: int, limit: int) -> str:
"""Dynamic color used by WorkersMemory and ClusterMemory"""
if limit and current > limit:
return "red"
if limit and current > limit / 2:
return "orange"
return "blue"
class MemoryColor:
"""Change the color of the memory bars from blue to orange when process memory goes
above the ``target`` threshold and to red when the worker pauses.
Workers in ``closing_gracefully`` state will also be orange.
If ``target`` is disabled, change to orange on ``spill`` instead.
If spilling is completely disabled, never turn orange.
class ClusterMemory(DashboardComponent):
If pausing is disabled, change to red when passing the ``terminate`` threshold
instead. If both pause and terminate are disabled, turn red when passing
``memory_limit``.
Note
----
A worker will start spilling when managed memory alone passes the target threshold.
However, here we're switching to orange when the process memory goes beyond target,
which is usually earlier.
This is deliberate for the sake of simplicity and also because, when the process
memory passes the spill threshold, it will keep spilling until it falls below the
target threshold - so it's not completely wrong. Again, we don't want to track
the hysteresis cycle of the spill system here for the sake of simplicity.
In short, orange should be treated as "the worker *may* be spilling".
"""

orange: float
red: float

def __init__(self):
target = dask.config.get("distributed.worker.memory.target")
spill = dask.config.get("distributed.worker.memory.spill")
terminate = dask.config.get("distributed.worker.memory.terminate")
# These values can be False. It's also common to configure them to impossibly
# high values to achieve the same effect.
self.orange = min(target or math.inf, spill or math.inf)
self.red = min(terminate or math.inf, 1.0)

def _memory_color(self, current: int, limit: int, status: Status) -> str:
if status != Status.running:
return "red"
if not limit:
return "blue"
if current >= limit * self.red:
return "red"
if current >= limit * self.orange:
return "orange"
return "blue"


class ClusterMemory(DashboardComponent, MemoryColor):
"""Total memory usage on the cluster"""

@log_errors
def __init__(self, scheduler, width=600, **kwargs):
DashboardComponent.__init__(self)
MemoryColor.__init__(self)

self.scheduler = scheduler
self.source = ColumnDataSource(
{
Expand Down Expand Up @@ -327,12 +371,30 @@ def __init__(self, scheduler, width=600, **kwargs):
)
self.root.add_tools(hover)

def _cluster_memory_color(self) -> str:
colors = {
self._memory_color(
current=ws.memory.process,
limit=getattr(ws, "memory_limit", 0),
status=ws.status,
)
for ws in self.scheduler.workers.values()
}

assert colors.issubset({"red", "orange", "blue"})
if "red" in colors:
return "red"
elif "orange" in colors:
return "orange"
else:
return "blue"

@without_property_validation
@log_errors
def update(self):
limit = sum(ws.memory_limit for ws in self.scheduler.workers.values())
meminfo = self.scheduler.memory
color = _memory_color(meminfo.process, limit)
color = self._cluster_memory_color()

width = [
meminfo.managed_in_memory,
Expand Down Expand Up @@ -363,11 +425,14 @@ def update(self):
update(self.source, result)


class WorkersMemory(DashboardComponent):
class WorkersMemory(DashboardComponent, MemoryColor):
"""Memory usage for single workers"""

@log_errors
def __init__(self, scheduler, width=600, **kwargs):
DashboardComponent.__init__(self)
MemoryColor.__init__(self)

self.scheduler = scheduler
self.source = ColumnDataSource(
{
Expand Down Expand Up @@ -477,7 +542,7 @@ def quadlist(i: Iterable[T]) -> list[T]:
meminfo = ws.memory
limit = getattr(ws, "memory_limit", 0)
max_limit = max(max_limit, limit, meminfo.process + meminfo.managed_spilled)
color_i = _memory_color(meminfo.process, limit)
color_i = self._memory_color(meminfo.process, limit, ws.status)

width += [
meminfo.managed_in_memory,
Expand Down
44 changes: 44 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dask.utils import stringify

from distributed.client import wait
from distributed.core import Status
from distributed.dashboard import scheduler
from distributed.dashboard.components.scheduler import (
AggregateAction,
Expand All @@ -29,6 +30,7 @@
Events,
Hardware,
MemoryByKey,
MemoryColor,
Occupancy,
ProcessingHistogram,
ProfileServer,
Expand Down Expand Up @@ -323,6 +325,48 @@ async def test_ClusterMemory(c, s, a, b):
assert not all(d["width"])


def test_memory_color():
def config(**kwargs):
return dask.config.set(
{f"distributed.worker.memory.{k}": v for k, v in kwargs.items()}
)

with config(target=0.6, spill=0.7, pause=0.8, terminate=0.95):
c = MemoryColor()
assert c._memory_color(50, 100, Status.running) == "blue"
assert c._memory_color(60, 100, Status.running) == "orange"
assert c._memory_color(75, 100, Status.running) == "orange"
# Pause is not impacted by the paused threshold, but by the worker status
assert c._memory_color(85, 100, Status.running) == "orange"
assert c._memory_color(0, 100, Status.paused) == "red"
assert c._memory_color(0, 100, Status.closing_gracefully) == "red"
# Passing the terminate threshold will turn the bar red, regardless of pause
assert c._memory_color(95, 100, Status.running) == "red"
# Disabling memory limit disables all threshold-related color changes
assert c._memory_color(100, 0, Status.running) == "blue"
assert c._memory_color(100, 0, Status.paused) == "red"

# target disabled
with config(target=False, spill=0.7):
c = MemoryColor()
assert c._memory_color(60, 100, Status.running) == "blue"
assert c._memory_color(75, 100, Status.running) == "orange"

# spilling disabled
with config(target=False, spill=False, pause=0.8, terminate=0.95):
c = MemoryColor()
assert c._memory_color(94, 100, Status.running) == "blue"
assert c._memory_color(0, 100, Status.closing_gracefully) == "red"
assert c._memory_color(95, 100, Status.running) == "red"

# terminate disabled; fall back to 100%
with config(target=False, spill=False, terminate=False):
c = MemoryColor()
assert c._memory_color(99, 100, Status.running) == "blue"
assert c._memory_color(100, 100, Status.running) == "red"
assert c._memory_color(110, 100, Status.running) == "red"


@gen_cluster(client=True)
async def test_WorkersMemoryHistogram(c, s, a, b):
nh = WorkersMemoryHistogram(s)
Expand Down
11 changes: 11 additions & 0 deletions docs/source/worker-memory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ spilled
The sum of managed + unmanaged + unmanaged recent is equal by definition to the process
memory.

The color of the bars will change as a function of memory usage too:

blue
The worker is operating as normal
orange
The worker may be spilling data to disk
red
The worker is paused or retiring
grey
Data that has already been spilled to disk; this is in addition to process memory


.. _memtrim:

Expand Down

0 comments on commit c083790

Please sign in to comment.