Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autoscaler v2.0 #1250

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# ORG is the image repository prefix used when tagging the images built below.
ORG := keyval
# TAG is the image tag; unless overridden by the caller it is taken from the
# output of `odigos version --short`.
TAG ?= $(shell odigos version --short)

# build-odiglet builds the $(ORG)/odigos-odiglet:$(TAG) image from
# odiglet/Dockerfile, using the current directory as the build context.
.PHONY: build-odiglet
build-odiglet:
docker build -t $(ORG)/odigos-odiglet:$(TAG) . -f odiglet/Dockerfile
Expand All @@ -7,6 +9,11 @@ build-odiglet:
# build-autoscaler builds the $(ORG)/odigos-autoscaler:$(TAG) image from the
# current directory, passing SERVICE_NAME=autoscaler as a build argument.
build-autoscaler:
docker build -t $(ORG)/odigos-autoscaler:$(TAG) . --build-arg SERVICE_NAME=autoscaler

# reload-autoscaler rebuilds the autoscaler image (via build-autoscaler), loads
# it into the kind cluster and restarts the odigos-autoscaler deployment in the
# odigos-system namespace so the new image is picked up.
.PHONY: reload-autoscaler
reload-autoscaler: build-autoscaler
kind load docker-image $(ORG)/odigos-autoscaler:$(TAG)
kubectl rollout restart deployment odigos-autoscaler -n odigos-system

# build-instrumentor builds the $(ORG)/odigos-instrumentor:$(TAG) image from the
# current directory, passing SERVICE_NAME=instrumentor as a build argument.
.PHONY: build-instrumentor
build-instrumentor:
docker build -t $(ORG)/odigos-instrumentor:$(TAG) . --build-arg SERVICE_NAME=instrumentor
Expand Down
98 changes: 98 additions & 0 deletions autoscaler/collectormetrics/algorithm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package collectormetrics

import (
"context"
"flag"

"github.com/odigos-io/odigos/autoscaler/controllers/gateway"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"

"sigs.k8s.io/controller-runtime/pkg/log"
)

// Collector metric names read by the scaling algorithm, plus the command-line
// flag names (and their default values) for the memory thresholds.
const (
// processMemory is the metric family read as a pod's memory usage; Decide
// divides its value by 1024*1024 before comparing it to MiB thresholds.
processMemory = "otelcol_process_memory_rss"
// exporterQueueSizeMetricName is the metric family whose gauges are checked
// for a value greater than zero to detect exporters with queued data.
exporterQueueSizeMetricName = "otelcol_exporter_queue_size"
// goMemLimitPercentageMax is the flag name for the scale-up threshold.
goMemLimitPercentageMax = "algo-gomemlimit-percentage-max"
// defaultGoMemLimitPercentageMax is the default scale-up threshold, in percent.
defaultGoMemLimitPercentageMax = 80.0
// goMemLimitPercentageMin is the flag name for the scale-down threshold.
goMemLimitPercentageMin = "algo-gomemlimit-percentage-min"
// defaultGoMemLimitPercentageMin is the default scale-down threshold, in percent.
defaultGoMemLimitPercentageMin = 55.0
)

// AutoscalerDecision represents the decision made by the autoscaler algorithm.
// Positive values indicate that the autoscaler should scale up, negative values
// indicate that the autoscaler should scale down, and zero indicates that the
// autoscaler should not scale.
//
// NOTE(review): memoryAndExporterRetries.Decide in this file returns the
// resulting replica count (the number of pods adjusted by at most one), not a
// signed delta as described above. Confirm which contract callers rely on.
type AutoscalerDecision int

// AutoscalerAlgorithm is the strategy used to turn scraped collector metrics
// into a scaling decision.
type AutoscalerAlgorithm interface {
// Decide computes a decision from the per-pod fetch results in metrics and
// the given Odigos configuration.
Decide(ctx context.Context, metrics []MetricFetchResult, config *odigosv1.OdigosConfiguration) AutoscalerDecision
}

// memoryAndExporterRetries is an AutoscalerAlgorithm that scales on average pod
// memory and holds scaling while any exporter queue is non-empty.
type memoryAndExporterRetries struct {
// goMemLimitPercentageMax is the percentage of the configured GOMEMLIMIT
// above which the average pod memory triggers a scale up.
goMemLimitPercentageMax float64
// goMemLimitPercentageMin is the percentage of the configured GOMEMLIMIT
// below which the average pod memory triggers a scale down.
goMemLimitPercentageMin float64
}

// ScaleBasedOnMemoryAndExporterRetries is the package-level instance of the
// memory/exporter-queue algorithm. Its thresholds are zero until RegisterFlags
// is called on it.
var ScaleBasedOnMemoryAndExporterRetries = &memoryAndExporterRetries{}

// RegisterFlags binds the scale-down and scale-up memory thresholds to flags on
// the process-wide default flag set, seeding each with its default percentage.
func (e *memoryAndExporterRetries) RegisterFlags() {
	flag.CommandLine.Float64Var(
		&e.goMemLimitPercentageMin,
		goMemLimitPercentageMin,
		defaultGoMemLimitPercentageMin,
		"Percentage of the memory limit to consider for scaling down",
	)
	flag.CommandLine.Float64Var(
		&e.goMemLimitPercentageMax,
		goMemLimitPercentageMax,
		defaultGoMemLimitPercentageMax,
		"Percentage of the memory limit to consider for scaling up",
	)
}

// Decide returns the replica count the collectors group should run with, based
// on average pod memory and on the state of the exporter queues.
//
// Every element of metrics is counted as one current replica. Pods whose fetch
// returned an error are skipped. If any exporter queue-size gauge is greater
// than zero, the current replica count is returned unchanged so that scaling is
// paused while exporters still have queued data.
//
// Otherwise the average otelcol_process_memory_rss (treated as bytes and
// converted to MiB) is compared with goMemLimitPercentageMax and
// goMemLimitPercentageMin percent of the configured GOMEMLIMIT:
//   - above the max threshold: current replicas + 1
//   - below the min threshold with more than one replica: current replicas - 1
//   - anything else: current replicas
//
// The average is taken only over pods that actually reported a memory sample.
// Pods that failed to report would otherwise count as using zero memory and
// could trigger a spurious scale down. When no pod reported memory the current
// replica count is returned unchanged.
func (e *memoryAndExporterRetries) Decide(ctx context.Context, metrics []MetricFetchResult, config *odigosv1.OdigosConfiguration) AutoscalerDecision {
	logger := log.FromContext(ctx)
	memCfg := gateway.GetMemoryConfigurations(config)
	maxMemory := float64(memCfg.GomemlimitMiB) * e.goMemLimitPercentageMax / 100.0
	minMemory := float64(memCfg.GomemlimitMiB) * e.goMemLimitPercentageMin / 100.0
	currentReplicas := len(metrics)

	numberOfRetryingExporters := 0
	podsWithMemory := 0
	totalMemory := 0.0
	for _, podMetrics := range metrics {
		if podMetrics.Error != nil {
			continue
		}

		for _, metricFamily := range podMetrics.Metrics {
			if metricFamily.Name == nil {
				continue
			}

			switch *metricFamily.Name {
			case processMemory:
				// Guard against an empty family or a non-gauge sample instead of
				// indexing blindly and panicking.
				if len(metricFamily.Metric) == 0 || metricFamily.Metric[0].Gauge == nil {
					continue
				}

				podMemory := metricFamily.Metric[0].Gauge.GetValue()
				logger.V(5).Info("memory", "value", podMemory, "pod", podMetrics.PodName)
				totalMemory += podMemory
				podsWithMemory++
			case exporterQueueSizeMetricName:
				for _, metric := range metricFamily.Metric {
					if metric.Gauge == nil {
						continue
					}

					queueSize := metric.Gauge.GetValue()
					if queueSize > 0 {
						numberOfRetryingExporters++
					}

					logger.V(5).Info("exporter queue size", "value", queueSize, "pod", podMetrics.PodName)
				}
			}
		}
	}

	if numberOfRetryingExporters > 0 {
		logger.V(0).Info("Exporting are retrying, skipping autoscaling until backend is healthy", "number of exporters", numberOfRetryingExporters)
		return AutoscalerDecision(currentReplicas)
	}

	// Without a single memory sample there is nothing to base a decision on;
	// this also avoids dividing by zero below.
	if podsWithMemory == 0 {
		logger.V(0).Info("No memory metrics available, skipping autoscaling", "replicas", currentReplicas)
		return AutoscalerDecision(currentReplicas)
	}

	avgMemory := totalMemory / float64(podsWithMemory)
	avgMemoryMb := avgMemory / 1024 / 1024
	logger.V(5).Info("avg memory", "value", avgMemoryMb, "max memory", maxMemory, "min memory", minMemory)

	switch {
	case avgMemoryMb > maxMemory:
		currentReplicas++
	case avgMemoryMb < minMemory && currentReplicas > 1:
		currentReplicas--
	}

	return AutoscalerDecision(currentReplicas)
}
155 changes: 155 additions & 0 deletions autoscaler/collectormetrics/autoscaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package collectormetrics

import (
"time"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/odigos-io/odigos/k8sutils/pkg/env"

"github.com/odigos-io/odigos/autoscaler/controllers/datacollection"
"github.com/odigos-io/odigos/autoscaler/controllers/gateway"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/event"

"sigs.k8s.io/controller-runtime/pkg/predicate"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
)

// Defaults applied by NewAutoscaler when the corresponding option is not given.
const (
// defaultInterval is the default period of the autoscaler's ticker.
defaultInterval = 15 * time.Second
// defaultMinReplicas is the default lower bound of the scale range.
defaultMinReplicas = 1
// defaultMaxReplicas is the default upper bound of the scale range.
defaultMaxReplicas = 5
// notificationChanSize is the buffer capacity of the notifications channel.
notificationChanSize = 10
)

// AutoscalerOptions holds the configuration of an Autoscaler. Its fields are
// unexported and are set through AutoscalerOption functions.
type AutoscalerOptions struct {
// interval is the period of the autoscaler's ticker.
interval time.Duration
// collectorsGroup selects which collectors group the autoscaler handles; it
// determines the pod label key used by the event predicate.
collectorsGroup odigosv1.CollectorsGroupRole
// minReplicas and maxReplicas describe the allowed scale range.
// NOTE(review): where the range is enforced is not visible in this file.
minReplicas int
maxReplicas int
// algorithm is the decision strategy; it has no default in NewAutoscaler.
algorithm AutoscalerAlgorithm
}

// AutoscalerOption mutates an AutoscalerOptions value; NewAutoscaler applies
// the given options in order on top of the defaults.
type AutoscalerOption func(*AutoscalerOptions)

// WithInterval returns an option that sets the autoscaler's ticker period.
func WithInterval(interval time.Duration) AutoscalerOption {
	setInterval := func(opts *AutoscalerOptions) {
		opts.interval = interval
	}
	return setInterval
}

// WithScaleRange returns an option that sets both ends of the replica range.
func WithScaleRange(minReplicas, maxReplicas int) AutoscalerOption {
	return func(target *AutoscalerOptions) {
		target.minReplicas, target.maxReplicas = minReplicas, maxReplicas
	}
}

// WithCollectorsGroup returns an option that selects the collectors group the
// autoscaler is responsible for.
func WithCollectorsGroup(collectorsGroup odigosv1.CollectorsGroupRole) AutoscalerOption {
	return AutoscalerOption(func(cfg *AutoscalerOptions) {
		cfg.collectorsGroup = collectorsGroup
	})
}

// WithAlgorithm returns an option that installs the decision strategy used by
// the autoscaler.
func WithAlgorithm(algorithm AutoscalerAlgorithm) AutoscalerOption {
	install := func(settings *AutoscalerOptions) {
		settings.algorithm = algorithm
	}
	return install
}

type Autoscaler struct {
kubeClient client.Client
options AutoscalerOptions
ticker *time.Ticker
notifications chan Notification
edeNFed marked this conversation as resolved.
Show resolved Hide resolved
podIPs map[string]string
edeNFed marked this conversation as resolved.
Show resolved Hide resolved
odigosConfig *odigosv1.OdigosConfiguration
}

// NewAutoscaler builds an Autoscaler that uses kubeClient, starting from the
// default interval and scale range and then applying opts in order.
//
// A non-positive interval (for example from WithInterval(0)) is replaced with
// defaultInterval, because time.NewTicker panics on such a value.
//
// The returned Autoscaler owns a running ticker and a notifications channel
// buffered to notificationChanSize. No default algorithm is set.
func NewAutoscaler(kubeClient client.Client, opts ...AutoscalerOption) *Autoscaler {
	// Set default options
	options := AutoscalerOptions{
		interval:    defaultInterval,
		minReplicas: defaultMinReplicas,
		maxReplicas: defaultMaxReplicas,
	}

	for _, opt := range opts {
		opt(&options)
	}

	// Fall back to the default rather than crashing at construction time.
	if options.interval <= 0 {
		options.interval = defaultInterval
	}

	return &Autoscaler{
		kubeClient:    kubeClient,
		options:       options,
		ticker:        time.NewTicker(options.interval),
		notifications: make(chan Notification, notificationChanSize),
		podIPs:        make(map[string]string),
	}
}

// Predicate returns the event filter for the pods of this autoscaler's
// collectors group.
//
// An object is relevant when it lives in the current namespace and carries the
// collectors group label with the value "true". The filter behaves as follows:
//   - Create and Delete events pass for every relevant object.
//   - Update events pass only when both the old and new objects are relevant
//     pods and either the pod IP changed or the pod just entered the Running
//     phase.
//   - Generic events are always dropped.
//
// Update events whose objects are not *corev1.Pod are dropped rather than
// causing a panic.
func (a *Autoscaler) Predicate() predicate.Predicate {
	ns := env.GetCurrentNamespace()

	// relevant reports whether an object with the given namespace and labels
	// belongs to this autoscaler's collectors group. A missing label reads as
	// the empty string and therefore does not match.
	relevant := func(namespace string, labels map[string]string) bool {
		return namespace == ns && labels[a.collectorsGroupLabelKey()] == "true"
	}

	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			oldPod, ok := e.ObjectOld.(*corev1.Pod)
			if !ok {
				return false
			}
			newPod, ok := e.ObjectNew.(*corev1.Pod)
			if !ok {
				return false
			}

			// If pods are not related to this collectors group, ignore them
			if !relevant(oldPod.Namespace, oldPod.Labels) || !relevant(newPod.Namespace, newPod.Labels) {
				return false
			}

			// Filter updates if IP changes or phase becomes running
			return oldPod.Status.PodIP != newPod.Status.PodIP ||
				(newPod.Status.Phase == corev1.PodRunning && oldPod.Status.Phase != corev1.PodRunning)
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			return relevant(e.Object.GetNamespace(), e.Object.GetLabels())
		},
		CreateFunc: func(e event.CreateEvent) bool {
			return relevant(e.Object.GetNamespace(), e.Object.GetLabels())
		},
		GenericFunc: func(e event.GenericEvent) bool {
			return false
		},
	}
}

// collectorsGroupLabelKey returns the pod label key that marks membership in
// the configured collectors group: the gateway label for the cluster gateway
// role and the data-collection label for every other role.
func (a *Autoscaler) collectorsGroupLabelKey() string {
	switch a.options.collectorsGroup {
	case odigosv1.CollectorsGroupRoleClusterGateway:
		return gateway.CollectorLabel
	default:
		return datacollection.CollectorLabel
	}
}

// Notify exposes the send-only side of the autoscaler's notifications channel.
func (a *Autoscaler) Notify() chan<- Notification {
	var sendSide chan<- Notification = a.notifications
	return sendSide
}
15 changes: 15 additions & 0 deletions autoscaler/collectormetrics/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package collectormetrics

// NotificationReason identifies why a Notification was sent.
type NotificationReason string

// Known notification reasons.
// NOTE(review): the meanings below are inferred from the names; the code that
// sends and handles them is not visible here. Confirm against the senders.
const (
// NewIPDiscovered presumably signals that a collector pod IP became known.
NewIPDiscovered NotificationReason = "NewIPDiscovered"
// IPRemoved presumably signals that a collector pod IP is no longer valid.
IPRemoved NotificationReason = "IPRemoved"
// OdigosConfigUpdated presumably signals that the Odigos configuration changed.
OdigosConfigUpdated NotificationReason = "OdigosConfigUpdated"
)

// Notification is the message type carried by the autoscaler's notifications
// channel.
type Notification struct {
// Reason says why the notification was sent.
Reason NotificationReason
// PodName and IP identify the pod concerned.
// NOTE(review): presumably left empty for OdigosConfigUpdated; confirm.
PodName string
IP string
}
Loading
Loading