agent/core: Implement LFC-aware scaling
Part of #872.
This builds on the metrics that will be exposed by neondatabase/neon#8298.

For now, we only look at the working set size metrics over various time
windows.

The algorithm is fairly straightforward to implement (see wss.go), but
unfortunately it seems difficult to understand *why* it's expected to
work; a rough, illustrative sketch is included below.

See also: https://www.notion.so/neondatabase/874ef1cc942a4e6592434dbe9e609350
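For intuition: each bucket of the new metric estimates the working set size over a trailing window (1m, 2m, ..., n minutes), so the series is non-decreasing in window length, and the "true" working set shows up as a plateau in that curve. Below is a rough, illustrative sketch of an estimator in that spirit. It is NOT the actual wss.go (that file is part of this commit but not shown on this page); the type and function names match the call sites in state.go below, but the bodies, signatures, and field semantics here are assumptions. The heuristic: walk the curve and stop just before growth re-accelerates sharply (suggesting pages past that point come from scans or colder data, not the hot working set); if it never does, the whole observed set counts.

package core

import "math"

// WssEstimatorConfig mirrors the config consumed in state.go below; the
// comments describe assumed semantics, not confirmed ones.
type WssEstimatorConfig struct {
	// MaxAllowedIncreaseFactor bounds how much faster the working set may grow
	// over the next window (vs. the previous one) before we treat the current
	// point as the end of the plateau.
	MaxAllowedIncreaseFactor float64
	// InitialOffset is the first bucket index considered (buckets are 1 minute apart).
	InitialOffset int
	// WindowSize is the number of buckets in each growth-comparison window.
	WindowSize int
}

// EstimateTrueWorkingSetSize walks the non-decreasing series of working set
// sizes and returns the value at the first point where growth is about to
// re-accelerate sharply; if growth never re-accelerates, the largest observed
// value is used. Assumes InitialOffset >= WindowSize and a non-empty series.
func EstimateTrueWorkingSetSize(series []float64, cfg WssEstimatorConfig) float64 {
	for i := cfg.InitialOffset; i < len(series)-cfg.WindowSize; i++ {
		previous := series[i] - series[i-cfg.WindowSize]
		upcoming := series[i+cfg.WindowSize] - series[i]
		if upcoming > previous*cfg.MaxAllowedIncreaseFactor {
			return series[i]
		}
	}
	return series[len(series)-1]
}

// ProjectNextHighest linearly extrapolates each point projectLen steps ahead
// using its local slope and returns the highest projection, never below floor.
func ProjectNextHighest(series []float64, projectLen int, floor float64) float64 {
	highest := floor
	for i := 1; i < len(series); i++ {
		projected := series[i] + (series[i]-series[i-1])*float64(projectLen)
		highest = math.Max(highest, projected)
	}
	return highest
}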
sharnoff committed Jul 7, 2024
1 parent 9795693 commit 4244add
Showing 8 changed files with 339 additions and 21 deletions.
5 changes: 4 additions & 1 deletion deploy/agent/config_map.yaml
@@ -12,7 +12,10 @@ data:
"defaultConfig": {
"loadAverageFractionTarget": 0.9,
"memoryUsageFractionTarget": 0.75,
"enableLFCMetrics": false
"enableLFCMetrics": false,
"lfcSizePerCU": 0.75,
"lfcWindowSizeMinutes": 5,
"lfcMinWaitBeforeDownscaleMinutes": 15
}
},
"billing": {
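The three new knobs feed the LFC branch of desiredResourcesFromMetricsOrRequestedUpscaling in state.go (below). A quick worked example of how these defaults combine there — variable names taken from that function, arithmetic mine:

	offsetIndex := 15 - 1                  // lfcMinWaitBeforeDownscaleMinutes - 1, since bucket values start at 1m
	windowSize := 5                        // lfcWindowSizeMinutes
	minBuckets := offsetIndex + windowSize // 19 one-minute buckets required before LFC data is used

And lfcSizePerCU = 0.75 means only three quarters of each compute unit's memory is assumed available to the LFC, so the compute-unit requirement derived from the predicted working set is divided by 0.75 at the end.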
15 changes: 8 additions & 7 deletions pkg/agent/core/dumpstate.go
@@ -33,13 +33,14 @@ func (d StateDump) MarshalJSON() ([]byte, error) {
func (s *State) Dump() StateDump {
return StateDump{
internal: state{
Debug: s.internal.Debug,
Config: s.internal.Config,
VM: s.internal.VM,
Plugin: s.internal.Plugin.deepCopy(),
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
Debug: s.internal.Debug,
Config: s.internal.Config,
VM: s.internal.VM,
Plugin: s.internal.Plugin.deepCopy(),
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
LFCMetrics: shallowCopy[LFCMetrics](s.internal.LFCMetrics),
},
}
}
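shallowCopy is defined elsewhere in the package and not shown in this diff. Assuming it is the obvious generic pointer-copying helper — a hypothetical sketch, not the actual definition — the new LFCMetrics line behaves sensibly for dumps:

	// Hypothetical sketch of shallowCopy (the real definition is not shown here).
	// It copies the pointed-to value; for LFCMetrics this duplicates the struct
	// but shares the backing array of ApproximateworkingSetSizeBuckets, which is
	// acceptable for a read-only state dump.
	func shallowCopy[T any](ptr *T) *T {
		if ptr == nil {
			return nil
		}
		copied := *ptr
		return &copied
	}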
81 changes: 79 additions & 2 deletions pkg/agent/core/metrics.go
@@ -3,8 +3,12 @@ package core
// Definition of the Metrics type, plus reading it from vector.dev's prometheus format host metrics

import (
"cmp"
"fmt"
"io"
"slices"
"strconv"
"time"

promtypes "github.com/prometheus/client_model/go"
promfmt "github.com/prometheus/common/expfmt"
@@ -31,7 +35,8 @@ type LFCMetrics struct {
CacheMissesTotal float64
CacheWritesTotal float64

ApproximateWorkingSetSizeTotal float64 // approximate_working_set_size
// lfc_approximate_working_set_size_seconds
ApproximateworkingSetSizeBuckets []float64
}

// FromPrometheus represents metric types that can be parsed from prometheus output.
@@ -118,12 +123,15 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) error
}
}

wssBuckets, err := extractWorkingSetSizeBuckets(mfs)
ec.Add(err)

tmp := LFCMetrics{
CacheHitsTotal: getFloat("lfc_hits"),
CacheMissesTotal: getFloat("lfc_misses"),
CacheWritesTotal: getFloat("lfc_writes"),

ApproximateWorkingSetSizeTotal: getFloat("lfc_approximate_working_set_size"),
ApproximateworkingSetSizeBuckets: wssBuckets,
}

if err := ec.Resolve(); err != nil {
@@ -133,3 +141,72 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) error
*m = tmp
return nil
}

func extractWorkingSetSizeBuckets(mfs map[string]*promtypes.MetricFamily) ([]float64, error) {
metricName := "lfc_approximate_working_set_size_seconds"
mf := mfs[metricName]
if mf == nil {
return nil, missingMetric(metricName)
}

if mf.GetType() != promtypes.MetricType_GAUGE {
return nil, fmt.Errorf("wrong metric type: expected %s, but got %s", promtypes.MetricType_GAUGE, mf.GetType())
} else if len(mf.Metric) < 1 {
return nil, fmt.Errorf("expected >= metric, found %d", len(mf.Metric))
}

type pair struct {
duration time.Duration
value float64
}

var pairs []pair
for _, m := range mf.Metric {
// Find the duration label
durationLabel := "duration_minutes"
durationIndex := slices.IndexFunc(m.Label, func(l *promtypes.LabelPair) bool {
return l.GetName() == durationLabel
})
if durationIndex == -1 {
return nil, fmt.Errorf("metric missing label %q", durationLabel)
}

durationMinutes, err := strconv.Atoi(m.Label[durationIndex].GetValue())
if err != nil {
return nil, fmt.Errorf("couldn't parse metric's %q label as int: %w", durationLabel, err)
}

pairs = append(pairs, pair{
duration: time.Minute * time.Duration(durationMinutes),
value: m.GetGauge().GetValue(),
})
}

slices.SortFunc(pairs, func(x, y pair) int {
return cmp.Compare(x.duration, y.duration)
})

// Check that the values are as expected: they should all be 1 minute apart, starting
// at 1 minute.
// NOTE: this assumption is relied on elsewhere for scaling on ApproximateworkingSetSizeBuckets.
// Please search for usages before changing this behavior.
if pairs[0].duration != time.Minute {
return nil, fmt.Errorf("expected smallest duration to be %v, got %v", time.Minute, pairs[0].duration)
}
for i := range pairs {
expected := time.Minute * time.Duration(i+1)
if pairs[i].duration != expected {
return nil, fmt.Errorf(
"expected duration values to be exactly 1m apart, got unexpected value %v instead of %v",
pairs[i].duration,
expected,
)
}
}

var values []float64
for _, p := range pairs {
values = append(values, p.value)
}
return values, nil
}
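For reference, the input extractWorkingSetSizeBuckets expects is Prometheus exposition text shaped like the following (sample values are made up; per the scaling code in state.go below, the gauge values are in units of 8 KiB pages). The function sorts the samples by their duration_minutes label, verifies they run 1m, 2m, 3m, ... with no gaps, and returns just the values as []float64:

	# TYPE lfc_approximate_working_set_size_seconds gauge
	lfc_approximate_working_set_size_seconds{duration_minutes="1"} 81920
	lfc_approximate_working_set_size_seconds{duration_minutes="2"} 98304
	lfc_approximate_working_set_size_seconds{duration_minutes="3"} 106496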
71 changes: 66 additions & 5 deletions pkg/agent/core/state.go
@@ -29,6 +29,7 @@ import (

"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util"
@@ -114,6 +115,8 @@ type state struct {
NeonVM neonvmState

Metrics *SystemMetrics

LFCMetrics *LFCMetrics
}

type pluginState struct {
@@ -221,7 +224,8 @@ func NewState(vm api.VmInfo, config Config) *State {
OngoingRequested: nil,
RequestFailedAt: nil,
},
Metrics: nil,
Metrics: nil,
LFCMetrics: nil,
},
}
}
@@ -667,8 +671,11 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
// 2. Cap the goal CU by min/max, etc
// 3. that's it!

hasMetrics := s.Metrics != nil && (!*s.scalingConfig().EnableLFCMetrics || s.LFCMetrics != nil)

var goalCU uint32
if s.Metrics != nil {
var lfcLogFields func(zapcore.ObjectEncoder) error
if hasMetrics {
// For CPU:
// Goal compute unit is at the point where (CPUs) × (LoadAverageFractionTarget) == (load
// average),
@@ -686,6 +693,44 @@
memGoalCU := uint32(memGoalBytes / s.Config.ComputeUnit.Mem)

goalCU = util.Max(cpuGoalCU, memGoalCU)

// For LFC metrics, if enabled:
if *s.scalingConfig().EnableLFCMetrics {
cfg := s.scalingConfig()
wssValues := s.LFCMetrics.ApproximateworkingSetSizeBuckets
// At this point, we can assume that the values are equally spaced at 1 minute apart,
// starting at 1 minute.
offsetIndex := *cfg.LFCMinWaitBeforeDownscaleMinutes - 1 // -1 because values start at 1m
windowSize := *cfg.LFCWindowSizeMinutes
// Handle invalid metrics:
if len(wssValues) < offsetIndex+windowSize {
s.warn("not enough working set size values to make scaling determination")
} else {
estimateWss := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{
MaxAllowedIncreaseFactor: 4.0, // hard-code this for now.
InitialOffset: offsetIndex,
WindowSize: windowSize,
})
projectLen := 1 // hard-code this for now.
predictedHighestNextMinute := ProjectNextHighest(wssValues, projectLen, estimateWss)

// predictedHighestNextMinute is still in units of 8KiB pages. Convert that into
// bytes, then into CU, and then undo the discount from only some of each CU's
// memory going towards the LFC, to get the actual CU required to fit the
// predicted working set size.
requiredCU := predictedHighestNextMinute * 8192 / s.Config.ComputeUnit.Mem.AsFloat64() / *cfg.LFCSizePerCU
lfcGoalCU := uint32(math.Ceil(requiredCU))
goalCU = util.Max(goalCU, lfcGoalCU)

lfcLogFields = func(obj zapcore.ObjectEncoder) error {
obj.AddFloat64("estimateWssPages", estimateWss)
obj.AddFloat64("predictedNextWssPages", predictedHighestNextMinute)
obj.AddFloat64("requiredCU", requiredCU)
return nil
}
}
}

}

// Copy the initial value of the goal CU so that we can accurately track whether either
@@ -727,7 +772,7 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (

// If there's no constraints and s.metrics is nil, then we'll end up with goalCU = 0.
// But if we have no metrics, we'd prefer to keep things as-is, rather than scaling down.
if s.Metrics == nil && goalCU == 0 {
if !hasMetrics && goalCU == 0 {
goalResources = s.VM.Using()
} else {
goalResources = s.Config.ComputeUnit.Mul(uint16(goalCU))
@@ -792,7 +837,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
}
}

s.info("Calculated desired resources", zap.Object("current", s.VM.Using()), zap.Object("target", result))
logFields := []zap.Field{
zap.Object("current", s.VM.Using()),
zap.Object("target", result),
}
if lfcLogFields != nil {
logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFields)))
}
s.info("Calculated desired resources", logFields...)

return result, calculateWaitTime
}
@@ -922,6 +974,8 @@ func (s *State) Debug(enabled bool) {
}

func (s *State) UpdatedVM(vm api.VmInfo) {
hadLFCMetrics := *s.internal.scalingConfig().EnableLFCMetrics

// FIXME: overriding this is required right now because we trust that a successful request to
// NeonVM means the VM was already updated, which... isn't true, and otherwise we could run into
// sync issues.
@@ -932,14 +986,21 @@ func (s *State) UpdatedVM(vm api.VmInfo) {
// - https://github.com/neondatabase/autoscaling/issues/462
vm.SetUsing(s.internal.VM.Using())
s.internal.VM = vm

// If the VM *only now* has LFC-aware scaling enabled, then remove any existing LFC metrics we
// might have. If we didn't do this, then disabling & re-enabling LFC scaling could cause us to
// make scaling decisions based on stale data.
if !hadLFCMetrics && *s.internal.scalingConfig().EnableLFCMetrics {
s.internal.LFCMetrics = nil
}
}

func (s *State) UpdateSystemMetrics(metrics SystemMetrics) {
s.internal.Metrics = &metrics
}

func (s *State) UpdateLFCMetrics(metrics LFCMetrics) {
// stub implementation, intentionally does nothing yet.
s.internal.LFCMetrics = &metrics
}

// PluginHandle provides write access to the scheduler plugin pieces of an UpdateState
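To make the page-to-CU conversion in the new LFC branch concrete, a worked example — assuming 4 GiB of memory per compute unit (the real value comes from s.Config.ComputeUnit.Mem) and the default lfcSizePerCU of 0.75:

	pages := 500_000.0                     // predictedHighestNextMinute, in 8 KiB pages
	cuMemBytes := 4.0 * 1024 * 1024 * 1024 // assumed ComputeUnit.Mem: 4 GiB
	lfcSizePerCU := 0.75                   // share of each CU's memory given to the LFC
	requiredCU := pages * 8192 / cuMemBytes / lfcSizePerCU // ≈ 1.27
	lfcGoalCU := uint32(math.Ceil(requiredCU))             // = 2 compute units

So a predicted working set of ~3.8 GiB needs 2 CU rather than 1: it would fit in one CU's memory outright, but not in the 75% of that memory the LFC is assumed to get.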
18 changes: 12 additions & 6 deletions pkg/agent/core/state_test.go
@@ -108,9 +108,12 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
core.Config{
ComputeUnit: api.Resources{VCPU: 250, Mem: 1 * slotSize},
DefaultScalingConfig: api.ScalingConfig{
LoadAverageFractionTarget: lo.ToPtr(0.5),
MemoryUsageFractionTarget: lo.ToPtr(0.5),
EnableLFCMetrics: nil,
LoadAverageFractionTarget: lo.ToPtr(0.5),
MemoryUsageFractionTarget: lo.ToPtr(0.5),
EnableLFCMetrics: nil,
LFCSizePerCU: lo.ToPtr(0.75),
LFCWindowSizeMinutes: lo.ToPtr(5),
LFCMinWaitBeforeDownscaleMinutes: lo.ToPtr(15),
},
// these don't really matter, because we're not using (*State).NextActions()
NeonVMRetryWait: time.Second,
@@ -179,9 +182,12 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{
Core: core.Config{
ComputeUnit: DefaultComputeUnit,
DefaultScalingConfig: api.ScalingConfig{
LoadAverageFractionTarget: lo.ToPtr(0.5),
MemoryUsageFractionTarget: lo.ToPtr(0.5),
EnableLFCMetrics: nil,
LoadAverageFractionTarget: lo.ToPtr(0.5),
MemoryUsageFractionTarget: lo.ToPtr(0.5),
EnableLFCMetrics: nil,
LFCSizePerCU: lo.ToPtr(0.75),
LFCWindowSizeMinutes: lo.ToPtr(5),
LFCMinWaitBeforeDownscaleMinutes: lo.ToPtr(15),
},
NeonVMRetryWait: 5 * time.Second,
PluginRequestTick: 5 * time.Second,
(Diffs for the 3 remaining changed files, including wss.go, were not loaded.)
