diff --git a/Makefile b/Makefile index 43f6859b6..29556c30e 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ ORG := keyval +TAG ?= $(shell odigos version --short) + .PHONY: build-odiglet build-odiglet: docker build -t $(ORG)/odigos-odiglet:$(TAG) . -f odiglet/Dockerfile @@ -7,6 +9,11 @@ build-odiglet: build-autoscaler: docker build -t $(ORG)/odigos-autoscaler:$(TAG) . --build-arg SERVICE_NAME=autoscaler +.PHONY: reload-autoscaler +reload-autoscaler: build-autoscaler + kind load docker-image $(ORG)/odigos-autoscaler:$(TAG) + kubectl rollout restart deployment odigos-autoscaler -n odigos-system + .PHONY: build-instrumentor build-instrumentor: docker build -t $(ORG)/odigos-instrumentor:$(TAG) . --build-arg SERVICE_NAME=instrumentor diff --git a/autoscaler/collectormetrics/algorithm.go b/autoscaler/collectormetrics/algorithm.go new file mode 100644 index 000000000..0d08b476f --- /dev/null +++ b/autoscaler/collectormetrics/algorithm.go @@ -0,0 +1,98 @@ +package collectormetrics + +import ( + "context" + "flag" + + "github.com/odigos-io/odigos/autoscaler/controllers/gateway" + + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + processMemory = "otelcol_process_memory_rss" + exporterQueueSizeMetricName = "otelcol_exporter_queue_size" + goMemLimitPercentageMax = "algo-gomemlimit-percentage-max" + defaultGoMemLimitPercentageMax = 80.0 + goMemLimitPercentageMin = "algo-gomemlimit-percentage-min" + defaultGoMemLimitPercentageMin = 55.0 +) + +// AutoscalerDecision is the replica count the autoscaler algorithm asks for. +// It is an absolute target, not a delta: the algorithm in this package returns +// the current replica count, or that count plus or minus one, and the autoscaler +// compares it with the current count before writing it to the scale subresource. 
+type AutoscalerDecision int + +type AutoscalerAlgorithm interface { + Decide(ctx context.Context, metrics []MetricFetchResult, config *odigosv1.OdigosConfiguration) AutoscalerDecision +} + +type memoryAndExporterRetries struct { + goMemLimitPercentageMax float64 + goMemLimitPercentageMin float64 +} + +var ScaleBasedOnMemoryAndExporterRetries = &memoryAndExporterRetries{} + +func (e *memoryAndExporterRetries) RegisterFlags() { + flag.Float64Var(&e.goMemLimitPercentageMax, goMemLimitPercentageMax, defaultGoMemLimitPercentageMax, "Percentage of the memory limit to consider for scaling up") + flag.Float64Var(&e.goMemLimitPercentageMin, goMemLimitPercentageMin, defaultGoMemLimitPercentageMin, "Percentage of the memory limit to consider for scaling down") +} + +// Decide returns the desired replica count from collector memory (RSS) and exporter queue sizes. +// A non-empty exporter queue keeps the current count; otherwise average RSS above the max or below the min threshold adds or removes one replica. +func (e *memoryAndExporterRetries) Decide(ctx context.Context, metrics []MetricFetchResult, config *odigosv1.OdigosConfiguration) AutoscalerDecision { + memCfg := gateway.GetMemoryConfigurations(config) + maxMemory := float64(memCfg.GomemlimitMiB) * e.goMemLimitPercentageMax / 100.0 + minMemory := float64(memCfg.GomemlimitMiB) * e.goMemLimitPercentageMin / 100.0 + logger := log.FromContext(ctx) + currentReplicas := len(metrics) + + numberOfRetryingExporters := 0 + totalMemory := 0.0 + for _, podMetrics := range metrics { + if podMetrics.Error != nil { + continue + } + + var podMemory float64 + for _, metricFamily := range podMetrics.Metrics { + if metricFamily.Name != nil && *metricFamily.Name == processMemory { + podMemory = metricFamily.Metric[0].Gauge.GetValue() + logger.V(5).Info("memory", "value", podMemory, "pod", podMetrics.PodName) + totalMemory += podMemory + } else if metricFamily.Name != nil && *metricFamily.Name == exporterQueueSizeMetricName { + for _, metric := range metricFamily.Metric { + if metric.Gauge != nil { + queueSize := metric.Gauge.GetValue() + if queueSize > 0 { 
numberOfRetryingExporters++ + } + + logger.V(5).Info("exporter queue size", "value", queueSize, "pod", podMetrics.PodName) + } + } + } + } + } + + if numberOfRetryingExporters > 0 { + logger.V(0).Info("Exporting are retrying, skipping autoscaling until backend is healthy", "number of exporters", numberOfRetryingExporters) + return AutoscalerDecision(currentReplicas) + } + + avgMemory := totalMemory / float64(len(metrics)) + avgMemoryMb := avgMemory / 1024 / 1024 + logger.V(5).Info("avg memory", "value", avgMemoryMb, "max memory", maxMemory, "min memory", minMemory) + + if avgMemoryMb > maxMemory { + currentReplicas++ + } else if avgMemoryMb < minMemory && currentReplicas > 1 { + currentReplicas-- + } + + return AutoscalerDecision(currentReplicas) +} diff --git a/autoscaler/collectormetrics/autoscaler.go b/autoscaler/collectormetrics/autoscaler.go new file mode 100644 index 000000000..60dce4c2b --- /dev/null +++ b/autoscaler/collectormetrics/autoscaler.go @@ -0,0 +1,155 @@ +package collectormetrics + +import ( + "time" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/odigos-io/odigos/k8sutils/pkg/env" + + "github.com/odigos-io/odigos/autoscaler/controllers/datacollection" + "github.com/odigos-io/odigos/autoscaler/controllers/gateway" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + + "sigs.k8s.io/controller-runtime/pkg/predicate" + + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" +) + +const ( + defaultInterval = 15 * time.Second + defaultMinReplicas = 1 + defaultMaxReplicas = 5 + notificationChanSize = 10 +) + +type AutoscalerOptions struct { + interval time.Duration + collectorsGroup odigosv1.CollectorsGroupRole + minReplicas int + maxReplicas int + algorithm AutoscalerAlgorithm +} + +type AutoscalerOption func(*AutoscalerOptions) + +func WithInterval(interval time.Duration) AutoscalerOption { + return func(o *AutoscalerOptions) { + o.interval = interval + } +} + +func WithScaleRange(minReplicas, maxReplicas 
int) AutoscalerOption { + return func(o *AutoscalerOptions) { + o.minReplicas = minReplicas + o.maxReplicas = maxReplicas + } +} + +func WithCollectorsGroup(collectorsGroup odigosv1.CollectorsGroupRole) AutoscalerOption { + return func(o *AutoscalerOptions) { + o.collectorsGroup = collectorsGroup + } +} + +func WithAlgorithm(algorithm AutoscalerAlgorithm) AutoscalerOption { + return func(o *AutoscalerOptions) { + o.algorithm = algorithm + } +} + +type Autoscaler struct { + kubeClient client.Client + options AutoscalerOptions + ticker *time.Ticker + notifications chan Notification + podIPs map[string]string + odigosConfig *odigosv1.OdigosConfiguration +} + +func NewAutoscaler(kubeClient client.Client, opts ...AutoscalerOption) *Autoscaler { + // Set default options + options := AutoscalerOptions{ + interval: defaultInterval, + minReplicas: defaultMinReplicas, + maxReplicas: defaultMaxReplicas, + } + + for _, opt := range opts { + opt(&options) + } + + return &Autoscaler{ + kubeClient: kubeClient, + options: options, + ticker: time.NewTicker(options.interval), + notifications: make(chan Notification, notificationChanSize), + podIPs: make(map[string]string), + } +} + +func (a *Autoscaler) Predicate() predicate.Predicate { + ns := env.GetCurrentNamespace() + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldPod := e.ObjectOld.(*corev1.Pod) + newPod := e.ObjectNew.(*corev1.Pod) + if oldPod.Namespace != ns || newPod.Namespace != ns { + return false + } + + // If pods are not related to this collectors group, ignore them + if val, ok := newPod.Labels[a.collectorsGroupLabelKey()]; !ok || val != "true" { + return false + } + if val, ok := oldPod.Labels[a.collectorsGroupLabelKey()]; !ok || val != "true" { + return false + } + + // Filter updates if IP changes or phase becomes running + return oldPod.Status.PodIP != newPod.Status.PodIP || + (newPod.Status.Phase == corev1.PodRunning && oldPod.Status.Phase != corev1.PodRunning) + }, + DeleteFunc: 
func(e event.DeleteEvent) bool { + if e.Object.GetNamespace() != ns { + return false + } + + // If pods are not related to this collectors group, ignore them + if val, ok := e.Object.GetLabels()[a.collectorsGroupLabelKey()]; !ok || val != "true" { + return false + } + + return true + }, + CreateFunc: func(e event.CreateEvent) bool { + if e.Object.GetNamespace() != ns { + return false + } + + // If pods are not related to this collectors group, ignore them + if val, ok := e.Object.GetLabels()[a.collectorsGroupLabelKey()]; !ok || val != "true" { + return false + } + + return true + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + } +} + +func (a *Autoscaler) collectorsGroupLabelKey() string { + if a.options.collectorsGroup == odigosv1.CollectorsGroupRoleClusterGateway { + return gateway.CollectorLabel + } + + return datacollection.CollectorLabel +} + +func (a *Autoscaler) Notify() chan<- Notification { + return a.notifications +} diff --git a/autoscaler/collectormetrics/notify.go b/autoscaler/collectormetrics/notify.go new file mode 100644 index 000000000..046c6d84a --- /dev/null +++ b/autoscaler/collectormetrics/notify.go @@ -0,0 +1,15 @@ +package collectormetrics + +type NotificationReason string + +const ( + NewIPDiscovered NotificationReason = "NewIPDiscovered" + IPRemoved NotificationReason = "IPRemoved" + OdigosConfigUpdated NotificationReason = "OdigosConfigUpdated" +) + +type Notification struct { + Reason NotificationReason + PodName string + IP string +} diff --git a/autoscaler/collectormetrics/run.go b/autoscaler/collectormetrics/run.go new file mode 100644 index 000000000..37a58a92f --- /dev/null +++ b/autoscaler/collectormetrics/run.go @@ -0,0 +1,224 @@ +package collectormetrics + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/odigos-io/odigos/common/consts" + "k8s.io/apimachinery/pkg/types" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + + "github.com/odigos-io/odigos/autoscaler/controllers/gateway" + 
"github.com/odigos-io/odigos/k8sutils/pkg/env" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + prometheus "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + metricsUrlPattern = "http://%s:8888/metrics" + maxTimeout = 1 * time.Second + timeoutIntervalFraction = 10 +) + +var ( + errObjectNotFoundForCollectorsGroup = fmt.Errorf("object not found for collectors group") + errDecisionOutOfRange = fmt.Errorf("decision out of range") +) + +type MetricFetchResult struct { + PodName string + Error error + Metrics map[string]*prometheus.MetricFamily +} + +// Run is the autoscaler's event loop. It applies pod-IP and config notifications, +// and on every tick fetches collector metrics, asks the algorithm for a decision +// and executes it. It stops the ticker and returns when ctx is done. +func (a *Autoscaler) Run(ctx context.Context) { + logger := log.FromContext(ctx) + logger = logger.WithName("autoscaler") + + for { + select { + case notification := <-a.notifications: + if notification.Reason == OdigosConfigUpdated { + if err := a.updateOdigosConfig(ctx); err != nil { + logger.Error(err, "Failed to update odigos config") + } + } else { + logger.V(5).Info("Got ip change notification", "notification", notification) + a.updateIPsMap(notification) + } + case <-ctx.Done(): + logger.V(0).Info("Shutting down autoscaler", "collectorsGroup", a.options.collectorsGroup) + a.ticker.Stop() + // Leave a.notifications open: callers of Notify() hold the send side, and a send on a closed channel panics + return + case <-a.ticker.C: + if len(a.podIPs) == 0 { + logger.V(0).Info("No collectors found, skipping autoscaling") + continue + } + + if a.odigosConfig == nil { + // This should not happen, we should get first config update before first tick but just in case + err := a.updateOdigosConfig(ctx) + if err != nil { + logger.Error(err, "Failed to get odigos config") + continue + } + } + + results := a.getCollectorsMetrics(ctx) + if len(results) == 0 { + continue + } + decision := a.options.algorithm.Decide(ctx, results, a.odigosConfig) + a.executeDecision(ctx, decision, 
len(results)) + } + } +} + +func (a *Autoscaler) adjustDecisionToBounds(decision AutoscalerDecision) AutoscalerDecision { + current := int(decision) + if current < a.options.minReplicas { + return AutoscalerDecision(a.options.minReplicas) + } else if current > a.options.maxReplicas { + return AutoscalerDecision(a.options.maxReplicas) + } + + return decision +} + +func (a *Autoscaler) executeDecision(ctx context.Context, decision AutoscalerDecision, currentReplicas int) bool { + logger := log.FromContext(ctx) + newDecision := a.adjustDecisionToBounds(decision) + if newDecision != decision { + logger.V(0).Info("Decision out of bounds, adjusting", "old", decision, "new", newDecision) + } + + if newDecision == AutoscalerDecision(currentReplicas) { + logger.V(5).Info("No need to scale", "current", currentReplicas) + return false + } + + obj := a.getTargetKubernetesObject() + if obj == nil { + logger.Error(errObjectNotFoundForCollectorsGroup, "No target object found", "group", a.options.collectorsGroup) + return false + } + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: int32(newDecision), // send the bounds-adjusted value so minReplicas/maxReplicas are actually enforced + }, + } + + err := a.kubeClient.SubResource("scale").Update(ctx, obj, client.WithSubResourceBody(scale)) + if err != nil { + logger.Error(err, "Failed to scale object", "object", obj) + return false + } + + return true +} + +func (a *Autoscaler) getTargetKubernetesObject() client.Object { + if a.options.collectorsGroup == odigosv1.CollectorsGroupRoleClusterGateway { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: gateway.KubeObjectName, + Namespace: env.GetCurrentNamespace(), + }, + } + } + + return nil +} + +func (a *Autoscaler) updateIPsMap(notification Notification) { + if notification.Reason == NewIPDiscovered { + a.podIPs[notification.PodName] = notification.IP + } else if notification.Reason == IPRemoved { + delete(a.podIPs, notification.PodName) + } +} + +func (a *Autoscaler) getCollectorsMetrics(ctx context.Context) 
[]MetricFetchResult { + logger := log.FromContext(ctx) + results := make(chan MetricFetchResult, len(a.podIPs)) + timeout := min(maxTimeout, a.options.interval/timeoutIntervalFraction) + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + for podName, podIP := range a.podIPs { + go func(podName, podIP string, results chan MetricFetchResult) { + result := MetricFetchResult{ + PodName: podName, + } + + // Get metrics from the collector pod + urlStr := fmt.Sprintf(metricsUrlPattern, podIP) + req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, urlStr, nil) + if err != nil { + result.Error = err + results <- result + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + result.Error = err + results <- result + return + } + + defer resp.Body.Close() + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(resp.Body) + if err != nil { + result.Error = err + results <- result + return + } + + result.Metrics = metricFamilies + results <- result + }(podName, podIP, results) + } + + // Fetch all results from channel + var successfulResults []MetricFetchResult + for i := 0; i < len(a.podIPs); i++ { + result := <-results + if result.Error != nil { + logger.Error(result.Error, "Failed to get metrics from pod", "pod", result.PodName) + } else { + successfulResults = append(successfulResults, result) + } + } + + return successfulResults +} + +func (a *Autoscaler) updateOdigosConfig(ctx context.Context) error { + odigosSystemNamespaceName := env.GetCurrentNamespace() + var odigosConfig odigosv1.OdigosConfiguration + if err := a.kubeClient.Get(ctx, types.NamespacedName{Namespace: odigosSystemNamespaceName, Name: consts.DefaultOdigosConfigurationName}, &odigosConfig); err != nil { + return err + } + + a.odigosConfig = &odigosConfig + return nil +} diff --git a/autoscaler/controllers/collectorsgroup_controller.go b/autoscaler/controllers/collectorsgroup_controller.go index b5d99ed80..2ef55c43e 
100644 --- a/autoscaler/controllers/collectorsgroup_controller.go +++ b/autoscaler/controllers/collectorsgroup_controller.go @@ -19,6 +19,10 @@ package controllers import ( "context" + "sigs.k8s.io/controller-runtime/pkg/event" + + "sigs.k8s.io/controller-runtime/pkg/predicate" + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/autoscaler/controllers/datacollection" "github.com/odigos-io/odigos/autoscaler/controllers/gateway" @@ -82,5 +86,23 @@ func (r *CollectorsGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&v1.Service{}). Owns(&appsv1.Deployment{}). Owns(&appsv1.DaemonSet{}). + WithEventFilter(predicate.Or[client.Object](predicate.GenerationChangedPredicate{}, collectorGroupChangedStatus())). Complete(r) } + +func collectorGroupChangedStatus() predicate.TypedPredicate[client.Object] { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldCollectorGroup, ok := e.ObjectOld.(*odigosv1.CollectorsGroup) + if !ok { + return false + } + newCollectorGroup, ok := e.ObjectNew.(*odigosv1.CollectorsGroup) + if !ok { + return false + } + + return oldCollectorGroup.Status.Ready != newCollectorGroup.Status.Ready + }, + } +} diff --git a/autoscaler/controllers/datacollection/configmap.go b/autoscaler/controllers/datacollection/configmap.go index 89f7b9343..455361ea7 100644 --- a/autoscaler/controllers/datacollection/configmap.go +++ b/autoscaler/controllers/datacollection/configmap.go @@ -56,7 +56,7 @@ func syncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D } } - logger.V(0).Info("Patching config map") + logger.V(5).Info("Patching config map") _, err = patchConfigMap(ctx, existing, desired, c) if err != nil { logger.Error(err, "failed to patch config map") @@ -69,7 +69,7 @@ func syncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D func patchConfigMap(ctx context.Context, existing *v1.ConfigMap, desired *v1.ConfigMap, c client.Client) (*v1.ConfigMap, 
error) { if reflect.DeepEqual(existing.Data, desired.Data) && reflect.DeepEqual(existing.ObjectMeta.OwnerReferences, desired.ObjectMeta.OwnerReferences) { - log.FromContext(ctx).V(0).Info("Config maps already match") + log.FromContext(ctx).V(5).Info("Config maps already match") return existing, nil } updated := existing.DeepCopy() diff --git a/autoscaler/controllers/datacollection/daemonset.go b/autoscaler/controllers/datacollection/daemonset.go index e9699e45c..a1acfab17 100644 --- a/autoscaler/controllers/datacollection/daemonset.go +++ b/autoscaler/controllers/datacollection/daemonset.go @@ -22,7 +22,7 @@ import ( ) const ( - collectorLabel = "odigos.io/data-collection" + CollectorLabel = "odigos.io/data-collection" containerName = "data-collection" containerImage = "keyval/odigos-collector" containerCommand = "/odigosotelcol" @@ -34,7 +34,7 @@ const ( var ( commonLabels = map[string]string{ - collectorLabel: "true", + CollectorLabel: "true", } ) @@ -83,7 +83,7 @@ func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D } } - logger.V(0).Info("Patching DaemonSet") + logger.V(5).Info("Patching DaemonSet") updated, err := patchDaemonSet(existing, desiredDs, ctx, c) if err != nil { logger.Error(err, "Failed to patch DaemonSet") diff --git a/autoscaler/controllers/gateway/configmap.go b/autoscaler/controllers/gateway/configmap.go index ffcc62f64..86b19cc71 100644 --- a/autoscaler/controllers/gateway/configmap.go +++ b/autoscaler/controllers/gateway/configmap.go @@ -22,13 +22,13 @@ const ( destinationConfiguredType = "DestinationConfigured" ) -func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) (string, error) { +func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, 
scheme *runtime.Scheme, memConfig *MemoryConfigurations) (string, error) { logger := log.FromContext(ctx) memoryLimiterConfiguration := config.GenericMap{ "check_interval": "1s", - "limit_mib": memConfig.memoryLimiterLimitMiB, - "spike_limit_mib": memConfig.memoryLimiterSpikeLimitMiB, + "limit_mib": memConfig.MemoryLimiterLimitMiB, + "spike_limit_mib": memConfig.MemoryLimiterSpikeLimitMiB, } processors := common.FilterAndSortProcessorsByOrderHint(allProcessors, odigosv1.CollectorsGroupRoleClusterGateway) @@ -87,9 +87,9 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc } existing := &v1.ConfigMap{} - if err := c.Get(ctx, client.ObjectKey{Namespace: gateway.Namespace, Name: kubeObjectName}, existing); err != nil { + if err := c.Get(ctx, client.ObjectKey{Namespace: gateway.Namespace, Name: KubeObjectName}, existing); err != nil { if apierrors.IsNotFound(err) { - logger.V(0).Info("Creating gateway config map") + logger.V(5).Info("Creating gateway config map") _, err := createConfigMap(desired, ctx, c) if err != nil { logger.Error(err, "Failed to create gateway config map") @@ -102,7 +102,7 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc } } - logger.V(0).Info("Patching gateway config map") + logger.V(5).Info("Patching gateway config map") _, err = patchConfigMap(existing, desired, ctx, c) if err != nil { logger.Error(err, "Failed to patch gateway config map") @@ -123,7 +123,7 @@ func createConfigMap(desired *v1.ConfigMap, ctx context.Context, c client.Client func patchConfigMap(existing *v1.ConfigMap, desired *v1.ConfigMap, ctx context.Context, c client.Client) (*v1.ConfigMap, error) { if reflect.DeepEqual(existing.Data, desired.Data) && reflect.DeepEqual(existing.ObjectMeta.OwnerReferences, desired.ObjectMeta.OwnerReferences) { - log.FromContext(ctx).V(0).Info("Gateway config maps already match") + log.FromContext(ctx).V(5).Info("Gateway config maps already match") return existing, nil } updated := 
existing.DeepCopy() diff --git a/autoscaler/controllers/gateway/deployment.go b/autoscaler/controllers/gateway/deployment.go index 04c0b16d4..a13cc55eb 100644 --- a/autoscaler/controllers/gateway/deployment.go +++ b/autoscaler/controllers/gateway/deployment.go @@ -31,7 +31,7 @@ const ( ) func syncDeployment(dests *odigosv1.DestinationList, gateway *odigosv1.CollectorsGroup, configData string, - ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *memoryConfigurations) (*appsv1.Deployment, error) { + ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *MemoryConfigurations) (*appsv1.Deployment, error) { logger := log.FromContext(ctx) desiredDeployment, err := getDesiredDeployment(dests, configData, gateway, scheme, imagePullSecrets, odigosVersion, memConfig) if err != nil { @@ -89,9 +89,9 @@ func patchDeployment(existing *appsv1.Deployment, desired *appsv1.Deployment, ct } func getDesiredDeployment(dests *odigosv1.DestinationList, configData string, - gateway *odigosv1.CollectorsGroup, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *memoryConfigurations) (*appsv1.Deployment, error) { + gateway *odigosv1.CollectorsGroup, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *MemoryConfigurations) (*appsv1.Deployment, error) { - requestMemoryQuantity := resource.MustParse(fmt.Sprintf("%dMi", memConfig.memoryRequestMiB)) + requestMemoryQuantity := resource.MustParse(fmt.Sprintf("%dMi", memConfig.MemoryRequestMiB)) desiredDeployment := &appsv1.Deployment{ ObjectMeta: v1.ObjectMeta{ @@ -151,7 +151,7 @@ func getDesiredDeployment(dests *odigosv1.DestinationList, configData string, }, { Name: "GOMEMLIMIT", - Value: fmt.Sprintf("%dMiB", memConfig.gomemlimitMiB), + Value: fmt.Sprintf("%dMiB", memConfig.GomemlimitMiB), }, }, SecurityContext: &corev1.SecurityContext{ diff --git 
a/autoscaler/controllers/gateway/hpa.go b/autoscaler/controllers/gateway/hpa.go deleted file mode 100644 index 09854aeac..000000000 --- a/autoscaler/controllers/gateway/hpa.go +++ /dev/null @@ -1,80 +0,0 @@ -package gateway - -import ( - "context" - "fmt" - - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/yaml" - - "sigs.k8s.io/controller-runtime/pkg/log" - - "k8s.io/apimachinery/pkg/api/resource" - - odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" - autoscaling "k8s.io/api/autoscaling/v2beta2" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -const ( - memoryLimitPercentageForHPA = 75 -) - -var ( - minReplicas = intPtr(1) - maxReplicas = int32(10) -) - -func syncHPA(gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) error { - logger := log.FromContext(ctx) - memLimit := memConfig.gomemlimitMiB * memoryLimitPercentageForHPA / 100.0 - metricQuantity := resource.MustParse(fmt.Sprintf("%dMi", memLimit)) - hpa := &autoscaling.HorizontalPodAutoscaler{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "autoscaling/v2beta2", - Kind: "HorizontalPodAutoscaler", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: gateway.Name, - Namespace: gateway.Namespace, - }, - Spec: autoscaling.HorizontalPodAutoscalerSpec{ - ScaleTargetRef: autoscaling.CrossVersionObjectReference{ - APIVersion: "apps/v1", - Kind: "Deployment", - Name: gateway.Name, - }, - MinReplicas: minReplicas, - MaxReplicas: maxReplicas, - Metrics: []autoscaling.MetricSpec{ - { - Type: autoscaling.ResourceMetricSourceType, - Resource: &autoscaling.ResourceMetricSource{ - Name: "memory", - Target: autoscaling.MetricTarget{ - Type: autoscaling.AverageValueMetricType, - AverageValue: &metricQuantity, - }, - }, - }, - }, - }, - } - if err := controllerutil.SetControllerReference(gateway, hpa, 
scheme); err != nil { - logger.Error(err, "Failed to set controller reference") - return err - } - - hpaBytes, _ := yaml.Marshal(hpa) - - force := true - patchOptions := client.PatchOptions{ - FieldManager: "odigos", - Force: &force, - } - - return c.Patch(ctx, hpa, client.RawPatch(types.ApplyPatchType, hpaBytes), &patchOptions) -} diff --git a/autoscaler/controllers/gateway/memory.go b/autoscaler/controllers/gateway/memory.go index fd76012f5..e012eb377 100644 --- a/autoscaler/controllers/gateway/memory.go +++ b/autoscaler/controllers/gateway/memory.go @@ -18,14 +18,14 @@ const ( defaultGoMemLimitPercentage = 80.0 ) -type memoryConfigurations struct { - memoryRequestMiB int - memoryLimiterLimitMiB int - memoryLimiterSpikeLimitMiB int - gomemlimitMiB int +type MemoryConfigurations struct { + MemoryRequestMiB int + MemoryLimiterLimitMiB int + MemoryLimiterSpikeLimitMiB int + GomemlimitMiB int } -func getMemoryConfigurations(odigosConfig *odigosv1.OdigosConfiguration) *memoryConfigurations { +func GetMemoryConfigurations(odigosConfig *odigosv1.OdigosConfiguration) *MemoryConfigurations { memoryRequestMiB := defaultRequestMemoryMiB if odigosConfig.Spec.CollectorGateway != nil && odigosConfig.Spec.CollectorGateway.RequestMemoryMiB > 0 { @@ -48,10 +48,10 @@ func getMemoryConfigurations(odigosConfig *odigosv1.OdigosConfiguration) *memory gomemlimitMiB = odigosConfig.Spec.CollectorGateway.GoMemLimitMib } - return &memoryConfigurations{ - memoryRequestMiB: memoryRequestMiB, - memoryLimiterLimitMiB: memoryLimiterLimitMiB, - memoryLimiterSpikeLimitMiB: memoryLimiterSpikeLimitMiB, - gomemlimitMiB: gomemlimitMiB, + return &MemoryConfigurations{ + MemoryRequestMiB: memoryRequestMiB, + MemoryLimiterLimitMiB: memoryLimiterLimitMiB, + MemoryLimiterSpikeLimitMiB: memoryLimiterSpikeLimitMiB, + GomemlimitMiB: gomemlimitMiB, } } diff --git a/autoscaler/controllers/gateway/root.go b/autoscaler/controllers/gateway/root.go index 37b6c4d4e..26ea66a93 100644 --- 
a/autoscaler/controllers/gateway/root.go +++ b/autoscaler/controllers/gateway/root.go @@ -3,8 +3,6 @@ package gateway import ( "context" - appsv1 "k8s.io/api/apps/v1" - odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/common/consts" "github.com/odigos-io/odigos/k8sutils/pkg/env" @@ -15,13 +13,13 @@ import ( ) const ( - kubeObjectName = "odigos-gateway" - collectorLabel = "odigos.io/collector" + KubeObjectName = "odigos-gateway" + CollectorLabel = "odigos.io/gateway" ) var ( commonLabels = map[string]string{ - collectorLabel: "true", + CollectorLabel: "true", } ) @@ -74,7 +72,7 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor logger := log.FromContext(ctx) logger.V(0).Info("Syncing gateway") - memConfig := getMemoryConfigurations(odigosConfig) + memConfig := GetMemoryConfigurations(odigosConfig) configData, err := syncConfigMap(dests, processors, gateway, ctx, c, scheme, memConfig) if err != nil { @@ -100,14 +98,6 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor return err } - if isMetricsServerInstalled(ctx, c) { - err = syncHPA(gateway, ctx, c, scheme, memConfig) - if err != nil { - logger.Error(err, "Failed to sync HPA") - return err - } - } - isReady := dep.Status.ReadyReplicas > 0 if !gateway.Status.Ready && isReady { err := c.Status().Patch(ctx, gateway, client.RawPatch( @@ -121,22 +111,3 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor return nil } - -func isMetricsServerInstalled(ctx context.Context, c client.Client) bool { - // Check if Kubernetes metrics server is installed by checking if the metrics-server deployment exists - logger := log.FromContext(ctx) - var metricsServerDeployment appsv1.Deployment - err := c.Get(ctx, types.NamespacedName{Name: "metrics-server", Namespace: "kube-system"}, &metricsServerDeployment) - if err != nil { - if client.IgnoreNotFound(err) != nil { - logger.Error(err, "Failed to get 
metrics-server deployment") - return false - } - - logger.V(0).Info("Metrics server not found, skipping HPA creation") - return false - } - - logger.V(0).Info("Metrics server found, creating HPA for Gateway") - return true -} diff --git a/autoscaler/controllers/gateway/service.go b/autoscaler/controllers/gateway/service.go index d15f63046..14c119705 100644 --- a/autoscaler/controllers/gateway/service.go +++ b/autoscaler/controllers/gateway/service.go @@ -21,7 +21,7 @@ func deletePreviousServices(ctx context.Context, c client.Client, ns string) err // so that it can be recreated with the new ClusterIP value logger := log.FromContext(ctx) svc := &v1.Service{} - err := c.Get(ctx, client.ObjectKey{Name: kubeObjectName, Namespace: ns}, svc) + err := c.Get(ctx, client.ObjectKey{Name: KubeObjectName, Namespace: ns}, svc) if err != nil || svc == nil { return client.IgnoreNotFound(err) } @@ -62,7 +62,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien return nil, err } - logger.V(0).Info("gateway service synced", "result", result) + logger.V(5).Info("gateway service synced", "result", result) return gatewaySvc, nil } diff --git a/autoscaler/controllers/instrumentedapplication_controller.go b/autoscaler/controllers/instrumentedapplication_controller.go index 4bc03cbf0..8b2af05ef 100644 --- a/autoscaler/controllers/instrumentedapplication_controller.go +++ b/autoscaler/controllers/instrumentedapplication_controller.go @@ -19,6 +19,8 @@ package controllers import ( "context" + "sigs.k8s.io/controller-runtime/pkg/predicate" + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/autoscaler/controllers/datacollection" "k8s.io/apimachinery/pkg/runtime" @@ -62,5 +64,6 @@ func (r *InstrumentedApplicationReconciler) Reconcile(ctx context.Context, req c func (r *InstrumentedApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). 
For(&odigosv1.InstrumentedApplication{}).
+		WithEventFilter(predicate.GenerationChangedPredicate{}).
 		Complete(r)
 }
diff --git a/autoscaler/controllers/odigosconfig_controller.go b/autoscaler/controllers/odigosconfig_controller.go
index 327f6edfb..40f63c46d 100644
--- a/autoscaler/controllers/odigosconfig_controller.go
+++ b/autoscaler/controllers/odigosconfig_controller.go
@@ -3,6 +3,8 @@ package controllers
 import (
 	"context"
 
+	"github.com/odigos-io/odigos/autoscaler/collectormetrics"
+
 	v1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
 	"github.com/odigos-io/odigos/autoscaler/controllers/gateway"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -16,6 +18,7 @@ type OdigosConfigReconciler struct {
 	Scheme           *runtime.Scheme
 	ImagePullSecrets []string
 	OdigosVersion    string
+	Autoscaler       *collectormetrics.Autoscaler
 }
 
 func (r *OdigosConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -28,6 +31,10 @@ func (r *OdigosConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		return ctrl.Result{}, err
 	}
 
+	r.Autoscaler.Notify() <- collectormetrics.Notification{
+		Reason: collectormetrics.OdigosConfigUpdated,
+	}
+
 	return ctrl.Result{}, nil
 }
 
diff --git a/autoscaler/controllers/pods_controller.go b/autoscaler/controllers/pods_controller.go
new file mode 100644
index 000000000..830604676
--- /dev/null
+++ b/autoscaler/controllers/pods_controller.go
@@ -0,0 +1,66 @@
+package controllers
+
+import (
+	"context"
+
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	"github.com/odigos-io/odigos/autoscaler/collectormetrics"
+	corev1 "k8s.io/api/core/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+// PodsReconciler reports pod changes to the autoscaler by sending
+// notifications on the channel returned by Autoscaler.Notify.
+type PodsReconciler struct {
+	client.Client
+	Autoscaler *collectormetrics.Autoscaler
+}
+
+// Reconcile sends exactly one notification per request: NewIPDiscovered when
+// the pod is running and has an IP, IPRemoved otherwise (including when the
+// pod no longer exists). Any other error from Get is logged and returned.
+func (p *PodsReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
+	logger := log.FromContext(ctx)
+	var pod corev1.Pod
+	if err := p.Get(ctx, request.NamespacedName, &pod); err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			logger.Error(err, "Failed to get pod")
+			return reconcile.Result{}, err
+		}
+
+		// The pod was not found. Report the removal and stop here: pod is still
+		// the zero value, so falling through would send IPRemoved a second time.
+		p.Autoscaler.Notify() <- collectormetrics.Notification{
+			Reason:  collectormetrics.IPRemoved,
+			PodName: request.Name,
+		}
+		return reconcile.Result{}, nil
+	}
+
+	// If IP exists and pod is running
+	if pod.Status.PodIP != "" && pod.Status.Phase == corev1.PodRunning {
+		p.Autoscaler.Notify() <- collectormetrics.Notification{
+			Reason:  collectormetrics.NewIPDiscovered,
+			PodName: request.Name,
+			IP:      pod.Status.PodIP,
+		}
+	} else {
+		p.Autoscaler.Notify() <- collectormetrics.Notification{
+			Reason:  collectormetrics.IPRemoved,
+			PodName: request.Name,
+		}
+	}
+
+	return reconcile.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (p *PodsReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&corev1.Pod{}).
+		WithEventFilter(p.Autoscaler.Predicate()).
+		Complete(p)
+}
diff --git a/autoscaler/go.mod b/autoscaler/go.mod
index 694a1f9ac..9806a90f8 100644
--- a/autoscaler/go.mod
+++ b/autoscaler/go.mod
@@ -9,12 +9,13 @@ require (
 	github.com/odigos-io/odigos/common v0.0.0
 	github.com/odigos-io/odigos/k8sutils v0.0.0
 	github.com/odigos-io/opentelemetry-zap-bridge v0.0.5
+	github.com/prometheus/client_model v0.6.0
+	github.com/prometheus/common v0.53.0
 	github.com/stretchr/testify v1.9.0
 	k8s.io/api v0.30.1
 	k8s.io/apimachinery v0.30.1
 	k8s.io/client-go v0.30.1
 	sigs.k8s.io/controller-runtime v0.18.3
-	sigs.k8s.io/yaml v1.4.0
 )
 
 require (
@@ -47,15 +48,12 @@ require (
 	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
-	github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/munnerz/goautoneg
v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.opentelemetry.io/otel v1.24.0 // indirect @@ -67,7 +65,7 @@ require ( go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/net v0.23.0 // indirect - golang.org/x/oauth2 v0.12.0 // indirect + golang.org/x/oauth2 v0.18.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect @@ -88,6 +86,7 @@ require ( k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) replace ( diff --git a/autoscaler/go.sum b/autoscaler/go.sum index 8e1246c9d..830e3b5c1 100644 --- a/autoscaler/go.sum +++ b/autoscaler/go.sum @@ -92,8 +92,6 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent 
v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -111,12 +109,12 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= -github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= -github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= +github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE= +github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= @@ -166,8 +164,8 @@ golang.org/x/net 
v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/oauth2 v0.12.0 h1:smVPGxink+n1ZI5pkQa8y6fZT0RW0MgCO5bFpepy4B4= -golang.org/x/oauth2 v0.12.0/go.mod h1:A74bZ3aGXgCY0qaIC9Ahg6Lglin4AMAco8cIv9baba4= +golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= +golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/autoscaler/main.go b/autoscaler/main.go index 3656fbb86..07257ebda 100644 --- a/autoscaler/main.go +++ b/autoscaler/main.go @@ -20,6 +20,9 @@ import ( "flag" "os" "strings" + "time" + + "github.com/odigos-io/odigos/autoscaler/collectormetrics" "github.com/go-logr/zapr" bridge "github.com/odigos-io/opentelemetry-zap-bridge" @@ -37,7 +40,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" apiactions "github.com/odigos-io/odigos/api/actions/v1alpha1" - observabilitycontrolplanev1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/autoscaler/controllers" "github.com/odigos-io/odigos/autoscaler/controllers/actions" @@ -45,6 +48,12 @@ import ( //+kubebuilder:scaffold:imports ) +const ( + defaultAutoscalerInterval = "15s" + defaultMinReplicas = 1 + defaultMaxReplicas = 5 +) + var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") @@ -52,7 +61,7 @@ var ( func init() { 
utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(observabilitycontrolplanev1.AddToScheme(scheme)) + utilruntime.Must(odigosv1.AddToScheme(scheme)) utilruntime.Must(apiactions.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -63,6 +72,9 @@ func main() { var probeAddr string var imagePullSecretsString string var imagePullSecrets []string + var autoscalerInterval string + var minReplicas int + var maxReplicas int odigosVersion := os.Getenv("ODIGOS_VERSION") flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ -73,6 +85,12 @@ func main() { flag.StringVar(&imagePullSecretsString, "image-pull-secrets", "", "The image pull secrets to use for the collectors created by autoscaler") flag.StringVar(&nameutils.ImagePrefix, "image-prefix", "", "The image prefix to use for the collectors created by autoscaler") + flag.StringVar(&autoscalerInterval, "autoscaler-interval", defaultAutoscalerInterval, "The interval at which the autoscaler should run") + flag.IntVar(&minReplicas, "min-replicas", defaultMinReplicas, "The minimum number of replicas for the collectors") + flag.IntVar(&maxReplicas, "max-replicas", defaultMaxReplicas, "The maximum number of replicas for the collectors") + + // Set flags for autoscaler algorithms + collectormetrics.ScaleBasedOnMemoryAndExporterRetries.RegisterFlags() if odigosVersion == "" { flag.StringVar(&odigosVersion, "version", "", "for development purposes only") @@ -84,6 +102,13 @@ func main() { opts.BindFlags(flag.CommandLine) flag.Parse() + var parsedInterval time.Duration + var err error + if parsedInterval, err = time.ParseDuration(autoscalerInterval); err != nil { + setupLog.Error(err, "unable to parse autoscaler interval, using default value", "interval", defaultAutoscalerInterval) + parsedInterval, _ = time.ParseDuration(defaultAutoscalerInterval) + } + if imagePullSecretsString != "" { imagePullSecrets = strings.Split(imagePullSecretsString, ",") } @@ 
-105,8 +130,8 @@ func main() { BindAddress: metricsAddr, }, HealthProbeBindAddress: probeAddr, - LeaderElection: enableLeaderElection, - LeaderElectionID: "f681cfed.odigos.io", + LeaderElection: enableLeaderElection, + LeaderElectionID: "f681cfed.odigos.io", }) if err != nil { setupLog.Error(err, "unable to start manager") @@ -149,11 +174,22 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "InstrumentedApplication") os.Exit(1) } + + ctx := ctrl.SetupSignalHandler() + setupLog.V(0).Info("Starting gateway autoscaler", "interval", parsedInterval.String(), "minReplicas", minReplicas, "maxReplicas", maxReplicas) + gatewayAutoscaler := collectormetrics.NewAutoscaler(mgr.GetClient(), + collectormetrics.WithCollectorsGroup(odigosv1.CollectorsGroupRoleClusterGateway), + collectormetrics.WithInterval(parsedInterval), + collectormetrics.WithScaleRange(minReplicas, maxReplicas), + collectormetrics.WithAlgorithm(collectormetrics.ScaleBasedOnMemoryAndExporterRetries)) + go gatewayAutoscaler.Run(ctx) + if err = (&controllers.OdigosConfigReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), ImagePullSecrets: imagePullSecrets, OdigosVersion: odigosVersion, + Autoscaler: gatewayAutoscaler, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "OdigosConfig") os.Exit(1) @@ -164,6 +200,14 @@ func main() { os.Exit(1) } + if err = (&controllers.PodsReconciler{ + Client: mgr.GetClient(), + Autoscaler: gatewayAutoscaler, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Pods") + os.Exit(1) + } + //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { @@ -176,7 +220,7 @@ func main() { } setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git 
a/cli/cmd/resources/autoscaler.go b/cli/cmd/resources/autoscaler.go index 40fb61c01..58c47ed37 100644 --- a/cli/cmd/resources/autoscaler.go +++ b/cli/cmd/resources/autoscaler.go @@ -121,13 +121,12 @@ func NewAutoscalerRole(ns string) *rbacv1.Role { }, { Verbs: []string{ - "create", + "get", "patch", "update", - "delete", }, - APIGroups: []string{"autoscaling"}, - Resources: []string{"horizontalpodautoscalers"}, + APIGroups: []string{"apps"}, + Resources: []string{"deployments/scale"}, }, }, } @@ -328,6 +327,17 @@ func NewAutoscalerClusterRole() *rbacv1.ClusterRole { APIGroups: []string{"odigos.io"}, Resources: []string{"odigosconfigurations"}, }, + { + Verbs: []string{ + "get", + "list", + "watch", + }, + APIGroups: []string{""}, + Resources: []string{ + "pods", + }, + }, }, } }