
agent/core: Implement LFC-aware scaling #1003

Merged: 10 commits, Jul 23, 2024
7 changes: 5 additions & 2 deletions deploy/agent/config_map.yaml
@@ -12,7 +12,10 @@ data:
"defaultConfig": {
"loadAverageFractionTarget": 0.9,
"memoryUsageFractionTarget": 0.75,
"enableLFCMetrics": false
"enableLFCMetrics": false,
"lfcSizePerCU": 0.75,
"lfcWindowSizeMinutes": 5,
"lfcMinWaitBeforeDownscaleMinutes": 5
}
},
"billing": {
@@ -45,7 +48,7 @@ data:
"secondsBetweenRequests": 5
},
"lfc": {
"port": 9399,
"port": 9499,
"requestTimeoutSeconds": 5,
"secondsBetweenRequests": 15
}
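The three new keys feed the agent's per-VM scaling config. The config struct itself is not part of this diff, but judging from the field names dereferenced later in state.go (LFCSizePerCU, LFCWindowSizeMinutes, LFCMinWaitBeforeDownscaleMinutes), it plausibly looks something like the sketch below; the json tags, pointer types, and comments are assumptions inferred from the YAML keys, not the repository's actual definition.

// Hypothetical sketch of the scaling config fields behind the YAML keys above.
type ScalingConfig struct {
	LoadAverageFractionTarget *float64 `json:"loadAverageFractionTarget,omitempty"`
	MemoryUsageFractionTarget *float64 `json:"memoryUsageFractionTarget,omitempty"`
	EnableLFCMetrics          *bool    `json:"enableLFCMetrics,omitempty"`
	// Assumed: fraction of each compute unit's memory expected to be usable by the LFC.
	LFCSizePerCU *float64 `json:"lfcSizePerCU,omitempty"`
	// Assumed: width, in minutes, of the working set size window considered for scaling.
	LFCWindowSizeMinutes *int `json:"lfcWindowSizeMinutes,omitempty"`
	// Assumed: minutes of history required before LFC-driven downscaling is allowed.
	LFCMinWaitBeforeDownscaleMinutes *int `json:"lfcMinWaitBeforeDownscaleMinutes,omitempty"`
}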
15 changes: 8 additions & 7 deletions pkg/agent/core/dumpstate.go
@@ -33,13 +33,14 @@ func (d StateDump) MarshalJSON() ([]byte, error) {
func (s *State) Dump() StateDump {
return StateDump{
internal: state{
Debug: s.internal.Debug,
Config: s.internal.Config,
VM: s.internal.VM,
Plugin: s.internal.Plugin.deepCopy(),
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
Debug: s.internal.Debug,
Config: s.internal.Config,
VM: s.internal.VM,
Plugin: s.internal.Plugin.deepCopy(),
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
LFCMetrics: shallowCopy[LFCMetrics](s.internal.LFCMetrics),
},
}
}
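The new LFCMetrics field reuses the same shallowCopy helper as Metrics. That helper is not shown in this diff; presumably it is a small generic along the lines of this sketch (an assumption for illustration, not the repository's actual code):

// Assumed sketch: copy the pointed-to value (if any) so the dump can't alias live state.
func shallowCopy[T any](ptr *T) *T {
	if ptr == nil {
		return nil
	}
	copied := *ptr
	return &copied
}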
82 changes: 80 additions & 2 deletions pkg/agent/core/metrics.go
@@ -3,8 +3,12 @@ package core
// Definition of the Metrics type, plus reading it from vector.dev's prometheus format host metrics

import (
"cmp"
"fmt"
"io"
"slices"
"strconv"
"time"

promtypes "github.com/prometheus/client_model/go"
promfmt "github.com/prometheus/common/expfmt"
@@ -31,7 +35,9 @@ type LFCMetrics struct {
CacheMissesTotal float64
CacheWritesTotal float64

ApproximateWorkingSetSizeTotal float64 // approximate_working_set_size
// lfc_approximate_working_set_size_windows; currently requires that the values are spaced
// exactly one minute apart, starting at one minute
ApproximateworkingSetSizeBuckets []float64
}

// FromPrometheus represents metric types that can be parsed from prometheus output.
@@ -118,12 +124,15 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) erro
}
}

wssBuckets, err := extractWorkingSetSizeWindows(mfs)
ec.Add(err)

tmp := LFCMetrics{
CacheHitsTotal: getFloat("lfc_hits"),
CacheMissesTotal: getFloat("lfc_misses"),
CacheWritesTotal: getFloat("lfc_writes"),

ApproximateWorkingSetSizeTotal: getFloat("lfc_approximate_working_set_size"),
ApproximateworkingSetSizeBuckets: wssBuckets,
}

if err := ec.Resolve(); err != nil {
@@ -133,3 +142,72 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) erro
*m = tmp
return nil
}

func extractWorkingSetSizeWindows(mfs map[string]*promtypes.MetricFamily) ([]float64, error) {
metricName := "lfc_approximate_working_set_size_windows"
mf := mfs[metricName]
if mf == nil {
return nil, missingMetric(metricName)
}

if mf.GetType() != promtypes.MetricType_GAUGE {
return nil, fmt.Errorf("wrong metric type: expected %s, but got %s", promtypes.MetricType_GAUGE, mf.GetType())
} else if len(mf.Metric) < 1 {
return nil, fmt.Errorf("expected >= metric, found %d", len(mf.Metric))
}

type pair struct {
duration time.Duration
value float64
}

var pairs []pair
for _, m := range mf.Metric {
// Find the duration label
durationLabel := "duration_seconds"
durationIndex := slices.IndexFunc(m.Label, func(l *promtypes.LabelPair) bool {
return l.GetName() == durationLabel
})
if durationIndex == -1 {
return nil, fmt.Errorf("metric missing label %q", durationLabel)
}

durationSeconds, err := strconv.Atoi(m.Label[durationIndex].GetValue())
if err != nil {
return nil, fmt.Errorf("couldn't parse metric's %q label as int: %w", durationLabel, err)
}

pairs = append(pairs, pair{
duration: time.Second * time.Duration(durationSeconds),
value: m.GetGauge().GetValue(),
})
}

slices.SortFunc(pairs, func(x, y pair) int {
return cmp.Compare(x.duration, y.duration)
})

// Check that the values are as expected: they should all be 1 minute apart, starting
// at 1 minute.
// NOTE: this assumption is relied on elsewhere for scaling on ApproximateworkingSetSizeBuckets.
// Please search for usages before changing this behavior.
if pairs[0].duration != time.Minute {
return nil, fmt.Errorf("expected smallest duration to be %v, got %v", time.Minute, pairs[0].duration)
}
for i := range pairs {
expected := time.Minute * time.Duration(i+1)
if pairs[i].duration != expected {
return nil, fmt.Errorf(
"expected duration values to be exactly 1m apart, got unexpected value %v instead of %v",
pairs[i].duration,
expected,
)
}
}

var values []float64
for _, p := range pairs {
values = append(values, p.value)
}
return values, nil
}
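For reference, the input that extractWorkingSetSizeWindows expects is a gauge family with one sample per duration_seconds label. Below is a minimal sketch that exercises the same parser on hand-written exposition text; the metric values and the standalone main package are made up for illustration.

package main

import (
	"fmt"
	"strings"

	promfmt "github.com/prometheus/common/expfmt"
)

func main() {
	// Hand-written sample in the Prometheus text exposition format; values are made up.
	sample := `# TYPE lfc_approximate_working_set_size_windows gauge
lfc_approximate_working_set_size_windows{duration_seconds="60"} 1000
lfc_approximate_working_set_size_windows{duration_seconds="120"} 1500
lfc_approximate_working_set_size_windows{duration_seconds="180"} 1700
`
	var parser promfmt.TextParser
	mfs, err := parser.TextToMetricFamilies(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	// extractWorkingSetSizeWindows(mfs) would return [1000, 1500, 1700] for this input.
	fmt.Println(len(mfs["lfc_approximate_working_set_size_windows"].Metric)) // 3
}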
89 changes: 78 additions & 11 deletions pkg/agent/core/state.go
@@ -29,6 +29,7 @@ import (

"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util"
@@ -114,6 +115,8 @@ type state struct {
NeonVM neonvmState

Metrics *SystemMetrics

LFCMetrics *LFCMetrics
}

type pluginState struct {
@@ -221,7 +224,8 @@ func NewState(vm api.VmInfo, config Config) *State {
OngoingRequested: nil,
RequestFailedAt: nil,
},
Metrics: nil,
Metrics: nil,
LFCMetrics: nil,
},
}
}
@@ -667,6 +671,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
// 2. Cap the goal CU by min/max, etc
// 3. that's it!

// Record whether we have all the metrics we'll need.
// If not, we'll prevent downscaling later on, so that (for example) an autoscaler-agent
// restart where we briefly have SystemMetrics but not LFCMetrics doesn't flush the VM's cache.
hasAllMetrics := s.Metrics != nil && (!*s.scalingConfig().EnableLFCMetrics || s.LFCMetrics != nil)
if !hasAllMetrics {
s.warn("Making scaling decision without all required metrics available")
}

var goalCU uint32
if s.Metrics != nil {
// For CPU:
@@ -686,6 +698,48 @@
memGoalCU := uint32(memGoalBytes / s.Config.ComputeUnit.Mem)

goalCU = util.Max(cpuGoalCU, memGoalCU)

}

// For LFC metrics, if enabled:
var lfcLogFields func(zapcore.ObjectEncoder) error
if s.LFCMetrics != nil {
cfg := s.scalingConfig()
wssValues := s.LFCMetrics.ApproximateworkingSetSizeBuckets
// At this point, we can assume that the values are equally spaced at 1 minute apart,
// starting at 1 minute.
offsetIndex := *cfg.LFCMinWaitBeforeDownscaleMinutes - 1 // -1 because values start at 1m
windowSize := *cfg.LFCWindowSizeMinutes
// Handle invalid metrics:
if len(wssValues) < offsetIndex+windowSize {
s.warn("not enough working set size values to make scaling determination")
} else {
estimateWss := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{
MaxAllowedIncreaseFactor: 3.0, // hard-code this for now.
InitialOffset: offsetIndex,
WindowSize: windowSize,
})
projectSliceEnd := offsetIndex // start at offsetIndex to avoid panics if not monotonically non-decreasing
for ; projectSliceEnd < len(wssValues) && wssValues[projectSliceEnd] <= estimateWss; projectSliceEnd++ {
}
projectLen := 0.5 // hard-code this for now.
predictedHighestNextMinute := ProjectNextHighest(wssValues[:projectSliceEnd], projectLen)

// predictedHighestNextMinute is still in units of 8KiB pages. Convert that into
// bytes, then into CU, and then invert the discount from only some of the memory
// going towards LFC to get the actual CU required to fit the predicted working
// set size.
requiredCU := predictedHighestNextMinute * 8192 / s.Config.ComputeUnit.Mem.AsFloat64() / *cfg.LFCSizePerCU
lfcGoalCU := uint32(math.Ceil(requiredCU))
goalCU = util.Max(goalCU, lfcGoalCU)

lfcLogFields = func(obj zapcore.ObjectEncoder) error {
obj.AddFloat64("estimateWssPages", estimateWss)
obj.AddFloat64("predictedNextWssPages", predictedHighestNextMinute)
obj.AddFloat64("requiredCU", requiredCU)
return nil
}
}
}

// Copy the initial value of the goal CU so that we can accurately track whether either
@@ -723,14 +777,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
}

// resources for the desired "goal" compute units
var goalResources api.Resources

// If there's no constraints and s.metrics is nil, then we'll end up with goalCU = 0.
// But if we have no metrics, we'd prefer to keep things as-is, rather than scaling down.
if s.Metrics == nil && goalCU == 0 {
goalResources = s.VM.Using()
} else {
goalResources = s.Config.ComputeUnit.Mul(uint16(goalCU))
goalResources := s.Config.ComputeUnit.Mul(uint16(goalCU))

// If we don't have all the metrics we need to make a proper decision, make sure that we aren't
// going to scale down below the current resources.
// Otherwise, we can make an under-informed decision that has undesirable impacts (e.g., scaling
// down because we don't have LFC metrics and flushing the cache because of it).
if !hasAllMetrics {
goalResources = goalResources.Max(s.VM.Using())
}

// bound goalResources by the minimum and maximum resource amounts for the VM
@@ -792,7 +846,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
}
}

s.info("Calculated desired resources", zap.Object("current", s.VM.Using()), zap.Object("target", result))
logFields := []zap.Field{
zap.Object("current", s.VM.Using()),
zap.Object("target", result),
}
if lfcLogFields != nil {
logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFields)))
}
s.info("Calculated desired resources", logFields...)

return result, calculateWaitTime
}
@@ -932,14 +993,20 @@ func (s *State) UpdatedVM(vm api.VmInfo) {
// - https://github.com/neondatabase/autoscaling/issues/462
vm.SetUsing(s.internal.VM.Using())
s.internal.VM = vm

// Make sure that if LFC metrics are disabled & later enabled, we don't make decisions based on
// stale data.
if !*s.internal.scalingConfig().EnableLFCMetrics {
s.internal.LFCMetrics = nil
}
}

func (s *State) UpdateSystemMetrics(metrics SystemMetrics) {
s.internal.Metrics = &metrics
}

func (s *State) UpdateLFCMetrics(metrics LFCMetrics) {
// stub implementation, intentionally does nothing yet.
s.internal.LFCMetrics = &metrics
}

// PluginHandle provides write access to the scheduler plugin pieces of an UpdateState
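To make the pages-to-CU conversion in the LFC branch of state.go concrete, here is a worked example with made-up numbers: a 4 GiB compute unit, the default lfcSizePerCU of 0.75 from config_map.yaml above, and an arbitrary predicted working set size.

package main

import (
	"fmt"
	"math"
)

func main() {
	predictedPages := 262144.0              // predicted working set size in 8 KiB pages (made-up value)
	computeUnitMemBytes := float64(4 << 30) // assume a 4 GiB compute unit
	lfcSizePerCU := 0.75                    // default from config_map.yaml above

	// Mirrors the requiredCU computation in desiredResourcesFromMetricsOrRequestedUpscaling.
	requiredCU := predictedPages * 8192 / computeUnitMemBytes / lfcSizePerCU
	// 262144 pages * 8 KiB = 2 GiB; 2 GiB / 4 GiB = 0.5 CU; 0.5 / 0.75 ≈ 0.67
	fmt.Println(math.Ceil(requiredCU)) // 1
}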