From 00caab5b9eee56aa35578331e98bbd81575b8004 Mon Sep 17 00:00:00 2001 From: "Mohamed S. Mahmoud" Date: Mon, 17 Jul 2023 10:22:35 -0400 Subject: [PATCH] NETOBSERV-1190: change DNS to use latency instead of TS (#395) * NETOBSERV-979: Add TCP drop to netobserv operator Signed-off-by: msherif1234 * Add scc rbac to fix issues with scc constraint when creating ebpf pod in privileged mode we got Error creating: pods "netobserv-ebpf-agent-" is forbidden: unable to validate against any security context constraint Signed-off-by: msherif1234 * Add DNS tracker operator changes Signed-off-by: msherif1234 * Address PR review comments Signed-off-by: msherif1234 * list enabled features in console plugin configmap * fix TcpDrop fields case * NETOBSERV-1191: fix updating tcpdrop and dns configs Signed-off-by: msherif1234 * fix linter complicity error Signed-off-by: msherif1234 * NETOBSERV-1190: use DNS latency instead of TS This PR depends on PR #331 so 331 need to be merged 1st Signed-off-by: msherif1234 --------- Signed-off-by: msherif1234 Co-authored-by: Julien Pinsonneau <91894519+jpinsonneau@users.noreply.github.com> --- api/v1alpha1/flowcollector_webhook.go | 15 ++ api/v1alpha1/zz_generated.conversion.go | 17 +- api/v1beta1/flowcollector_types.go | 14 ++ api/v1beta1/zz_generated.deepcopy.go | 10 + .../flows.netobserv.io_flowcollectors.yaml | 14 ++ ...observ-operator.clusterserviceversion.yaml | 3 + .../flows.netobserv.io_flowcollectors.yaml | 14 ++ .../samples/flows_v1beta1_flowcollector.yaml | 3 + .../consoleplugin/consoleplugin_objects.go | 12 + controllers/ebpf/agent_controller.go | 236 +++++++++++------- .../ebpf/internal/permissions/permissions.go | 10 + .../flowcollector_controller_iso_test.go | 2 + .../flowlogspipeline/flp_common_objects.go | 133 ++++++---- docs/FlowCollector.md | 18 ++ pkg/helper/flowcollector.go | 14 ++ 15 files changed, 377 insertions(+), 138 deletions(-) diff --git a/api/v1alpha1/flowcollector_webhook.go b/api/v1alpha1/flowcollector_webhook.go 
index afa0f6ff8..f720aba73 100644 --- a/api/v1alpha1/flowcollector_webhook.go +++ b/api/v1alpha1/flowcollector_webhook.go @@ -63,6 +63,14 @@ func (r *FlowCollector) ConvertTo(dstRaw conversion.Hub) error { } dst.Spec.Loki.Enable = restored.Spec.Loki.Enable + if restored.Spec.Agent.EBPF.EnableTCPDrop != nil { + dst.Spec.Agent.EBPF.EnableTCPDrop = restored.Spec.Agent.EBPF.EnableTCPDrop // assign the pointer: writing through dst's own, possibly nil, pointer would panic + } + + if restored.Spec.Agent.EBPF.EnableDNSTracking != nil { + dst.Spec.Agent.EBPF.EnableDNSTracking = restored.Spec.Agent.EBPF.EnableDNSTracking + } + dst.Spec.Loki.StatusTLS = restored.Spec.Loki.StatusTLS dst.Spec.Kafka.SASL = restored.Spec.Kafka.SASL @@ -147,3 +155,10 @@ func Convert_v1beta1_FlowCollectorConsolePlugin_To_v1alpha1_FlowCollectorConsole func Convert_v1beta1_FlowCollectorExporter_To_v1alpha1_FlowCollectorExporter(in *v1beta1.FlowCollectorExporter, out *FlowCollectorExporter, s apiconversion.Scope) error { return autoConvert_v1beta1_FlowCollectorExporter_To_v1alpha1_FlowCollectorExporter(in, out, s) } + +// This function needs to be written manually: conversion-gen intentionally does not generate it because +// v1beta1 defines new fields that do not exist in v1alpha1 +// nolint:golint,stylecheck,revive +func Convert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in *v1beta1.FlowCollectorEBPF, out *FlowCollectorEBPF, s apiconversion.Scope) error { + return autoConvert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in, out, s) +} diff --git a/api/v1alpha1/zz_generated.conversion.go b/api/v1alpha1/zz_generated.conversion.go index 5bbd91d34..b79c60da2 100644 --- a/api/v1alpha1/zz_generated.conversion.go +++ b/api/v1alpha1/zz_generated.conversion.go @@ -123,11 +123,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*v1beta1.FlowCollectorEBPF)(nil), (*FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { - return 
Convert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(a.(*v1beta1.FlowCollectorEBPF), b.(*FlowCollectorEBPF), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*FlowCollectorExporter)(nil), (*v1beta1.FlowCollectorExporter)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha1_FlowCollectorExporter_To_v1beta1_FlowCollectorExporter(a.(*FlowCollectorExporter), b.(*v1beta1.FlowCollectorExporter), scope) }); err != nil { @@ -253,6 +248,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*v1beta1.FlowCollectorEBPF)(nil), (*FlowCollectorEBPF)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(a.(*v1beta1.FlowCollectorEBPF), b.(*FlowCollectorEBPF), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*v1beta1.FlowCollectorExporter)(nil), (*FlowCollectorExporter)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta1_FlowCollectorExporter_To_v1alpha1_FlowCollectorExporter(a.(*v1beta1.FlowCollectorExporter), b.(*FlowCollectorExporter), scope) }); err != nil { @@ -569,14 +569,11 @@ func autoConvert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in *v1b if err := Convert_v1beta1_DebugConfig_To_v1alpha1_DebugConfig(&in.Debug, &out.Debug, s); err != nil { return err } + // WARNING: in.EnableTCPDrop requires manual conversion: does not exist in peer-type + // WARNING: in.EnableDNSTracking requires manual conversion: does not exist in peer-type return nil } -// Convert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF is an autogenerated conversion function. 
-func Convert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in *v1beta1.FlowCollectorEBPF, out *FlowCollectorEBPF, s conversion.Scope) error { - return autoConvert_v1beta1_FlowCollectorEBPF_To_v1alpha1_FlowCollectorEBPF(in, out, s) -} - func autoConvert_v1alpha1_FlowCollectorExporter_To_v1beta1_FlowCollectorExporter(in *FlowCollectorExporter, out *v1beta1.FlowCollectorExporter, s conversion.Scope) error { out.Type = v1beta1.ExporterType(in.Type) if err := Convert_v1alpha1_FlowCollectorKafka_To_v1beta1_FlowCollectorKafka(&in.Kafka, &out.Kafka, s); err != nil { diff --git a/api/v1beta1/flowcollector_types.go b/api/v1beta1/flowcollector_types.go index 7d8cf5bd7..1e45eb479 100644 --- a/api/v1beta1/flowcollector_types.go +++ b/api/v1beta1/flowcollector_types.go @@ -221,6 +221,20 @@ type FlowCollectorEBPF struct { // such as GOGC and GOMAXPROCS env vars. Users setting its values do it at their own risk. // +optional Debug DebugConfig `json:"debug,omitempty"` + + // Enable the TCP drop flows logging feature. This feature requires mounting + // the kernel debug filesystem, so the eBPF pod has to run as privileged. + // If the spec.agent.eBPF.privileged parameter is not set, an error is reported. + //+kubebuilder:default:=false + //+optional + EnableTCPDrop *bool `json:"enableTCPDrop,omitempty"` + + // Enable the DNS tracking feature. This feature requires mounting + // the kernel debug filesystem hence the eBPF pod has to run as privileged. + // If the spec.agent.eBPF.privileged parameter is not set, an error is reported. 
+ //+kubebuilder:default:=false + //+optional + EnableDNSTracking *bool `json:"enableDNSTracking,omitempty"` } // `FlowCollectorKafka` defines the desired Kafka config of FlowCollector diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 88055c9ef..d03e228ef 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -268,6 +268,16 @@ func (in *FlowCollectorEBPF) DeepCopyInto(out *FlowCollectorEBPF) { copy(*out, *in) } in.Debug.DeepCopyInto(&out.Debug) + if in.EnableTCPDrop != nil { + in, out := &in.EnableTCPDrop, &out.EnableTCPDrop + *out = new(bool) + **out = **in + } + if in.EnableDNSTracking != nil { + in, out := &in.EnableDNSTracking, &out.EnableDNSTracking + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorEBPF. diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index bfdebd86f..cc7aa0c52 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -2302,6 +2302,20 @@ spec: they are only useful in edge debug or support scenarios.' type: object type: object + enableDNSTracking: + default: false + description: Enable the DNS tracking feature. This feature + requires mounting the kernel debug filesystem hence the + eBPF pod has to run as privileged. If the spec.agent.eBPF.privileged + parameter is not set, an error is reported. + type: boolean + enableTCPDrop: + default: false + description: Enable the TCP drop flows logging feature. This + feature requires mounting the kernel debug filesystem, so + the eBPF pod has to run as privileged. If the spec.agent.eBPF.privileged + parameter is not set, an error is reported. 
+ type: boolean excludeInterfaces: default: - lo diff --git a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml index 34cf3769d..a0d13aba9 100644 --- a/bundle/manifests/netobserv-operator.clusterserviceversion.yaml +++ b/bundle/manifests/netobserv-operator.clusterserviceversion.yaml @@ -175,6 +175,8 @@ metadata: "ebpf": { "cacheActiveTimeout": "5s", "cacheMaxFlows": 100000, + "enableDNSTracking": false, + "enableTCPDrop": false, "excludeInterfaces": [ "lo" ], @@ -182,6 +184,7 @@ metadata: "interfaces": [], "kafkaBatchSize": 10485760, "logLevel": "info", + "privileged": false, "resources": { "limits": { "memory": "800Mi" diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index 97c9c3f91..803fa68da 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -2289,6 +2289,20 @@ spec: they are only useful in edge debug or support scenarios.' type: object type: object + enableDNSTracking: + default: false + description: Enable the DNS tracking feature. This feature + requires mounting the kernel debug filesystem hence the + eBPF pod has to run as privileged. If the spec.agent.eBPF.privileged + parameter is not set, an error is reported. + type: boolean + enableTCPDrop: + default: false + description: Enable the TCP drop flows logging feature. This + feature requires mounting the kernel debug filesystem, so + the eBPF pod has to run as privileged. If the spec.agent.eBPF.privileged + parameter is not set, an error is reported. 
+ type: boolean excludeInterfaces: default: - lo diff --git a/config/samples/flows_v1beta1_flowcollector.yaml b/config/samples/flows_v1beta1_flowcollector.yaml index 1c0dd9411..b2b3abe3f 100644 --- a/config/samples/flows_v1beta1_flowcollector.yaml +++ b/config/samples/flows_v1beta1_flowcollector.yaml @@ -12,6 +12,9 @@ spec: sampling: 50 cacheActiveTimeout: 5s cacheMaxFlows: 100000 + privileged: false + enableTCPDrop: false + enableDNSTracking: false interfaces: [ ] excludeInterfaces: [ "lo" ] logLevel: info diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index d8bf6531e..24b03632f 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -348,12 +348,24 @@ func (b *builder) metricsService() *corev1.Service { func (b *builder) configMap() (*corev1.ConfigMap, string) { outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) + var features []string + if b.desired.Agent.Type == flowslatest.AgentEBPF { + if helper.IsTCPDropEnabled(b.desired) { + features = append(features, "tcpDrop") + } + + if helper.IsDNSTrackingEnabled(b.desired) { + features = append(features, "dnsTracking") + } + } + config := map[string]interface{}{ "recordTypes": outputRecordTypes, "portNaming": b.desired.ConsolePlugin.PortNaming, "quickFilters": b.desired.ConsolePlugin.QuickFilters, "alertNamespaces": []string{b.namespace}, "sampling": helper.GetSampling(b.desired), + "features": features, } configStr := "{}" diff --git a/controllers/ebpf/agent_controller.go b/controllers/ebpf/agent_controller.go index de03683bb..d4f7576c2 100644 --- a/controllers/ebpf/agent_controller.go +++ b/controllers/ebpf/agent_controller.go @@ -7,14 +7,6 @@ import ( "strconv" "strings" - v1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - 
"k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/log" - flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/controllers/ebpf/internal/permissions" @@ -23,7 +15,16 @@ import ( "github.com/netobserv/network-observability-operator/pkg/helper" "github.com/netobserv/network-observability-operator/pkg/volumes" "github.com/netobserv/network-observability-operator/pkg/watchers" + + "github.com/go-logr/logr" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/log" ) const ( @@ -54,17 +55,20 @@ const ( envDedupeJustMark = "DEDUPER_JUST_MARK" dedupeJustMarkDefault = "true" envGoMemLimit = "GOMEMLIMIT" - - envListSeparator = "," + envEnableTCPDrop = "ENABLE_TCP_DROPS" + envEnableDNSTracking = "ENABLE_DNS_TRACKING" + envListSeparator = "," ) const ( - exportKafka = "kafka" - exportGRPC = "grpc" + exportKafka = "kafka" + exportGRPC = "grpc" + kafkaCerts = "kafka-certs" + averageMessageSize = 100 + bpfTraceMountName = "bpf-kernel-debug" + bpfTraceMountPath = "/sys/kernel/debug" ) -const averageMessageSize = 100 - type reconcileAction int const ( @@ -122,7 +126,7 @@ func (c *AgentController) Reconcile( if err := c.permissions.Reconcile(ctx, &target.Spec.Agent.EBPF); err != nil { return fmt.Errorf("reconciling permissions: %w", err) } - desired, err := c.desired(ctx, target) + desired, err := c.desired(ctx, target, rlog) if err != nil { return err } @@ -156,7 +160,19 @@ func (c *AgentController) current(ctx context.Context) (*v1.DaemonSet, error) { return &agentDS, nil } -func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCollector) (*v1.DaemonSet, error) { +func 
newHostPathType(pathType corev1.HostPathType) *corev1.HostPathType { + // pathType is passed by value, so it is already a private copy; returning its + // address is equivalent to allocating a new value and copying into it. + return &pathType +} + +// newMountPropagationMode returns a pointer to a copy of m, for struct fields +// that take an optional *corev1.MountPropagationMode. +func newMountPropagationMode(m corev1.MountPropagationMode) *corev1.MountPropagationMode { + return &m +} + +func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCollector, rlog logr.Logger) (*v1.DaemonSet, error) { if coll == nil || !helper.UseEBPF(&coll.Spec) { return nil, nil } @@ -166,6 +182,32 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol if err != nil { return nil, err } + volumeMounts := c.volumes.GetMounts() + volumes := c.volumes.GetVolumes() + + if (coll.Spec.Agent.EBPF.EnableTCPDrop != nil && *coll.Spec.Agent.EBPF.EnableTCPDrop) || + (coll.Spec.Agent.EBPF.EnableDNSTracking != nil && *coll.Spec.Agent.EBPF.EnableDNSTracking) { + if !coll.Spec.Agent.EBPF.Privileged { // the raw flags are tested above because helper.Is*Enabled already require Privileged, which made this branch unreachable + rlog.Error(fmt.Errorf("invalid configuration"), "To use TCPDrop and/or DNSTracking feature(s) privileged mode needs to be enabled") + } else { + volume := corev1.Volume{ + Name: bpfTraceMountName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Type: newHostPathType(corev1.HostPathDirectory), + Path: bpfTraceMountPath, + }, + }, + } + volumes = append(volumes, volume) + volumeMount := corev1.VolumeMount{ + Name: bpfTraceMountName, + MountPath: bpfTraceMountPath, + MountPropagation: newMountPropagationMode(corev1.MountPropagationBidirectional), + } + volumeMounts = append(volumeMounts, volumeMount) + } + } return &v1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ @@ -191,7 +233,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol ServiceAccountName: constants.EBPFServiceAccount, HostNetwork: true, DNSPolicy: corev1.DNSClusterFirstWithHostNet, - Volumes: c.volumes.GetVolumes(), + Volumes: volumes, Containers: []corev1.Container{{ Name: constants.EBPFAgentName, Image: c.config.EBPFAgentImage, @@ 
-199,7 +241,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol Resources: coll.Spec.Agent.EBPF.Resources, SecurityContext: c.securityContext(coll), Env: env, - VolumeMounts: c.volumes.GetMounts(), + VolumeMounts: volumeMounts, }}, }, }, @@ -208,72 +250,7 @@ func (c *AgentController) desired(ctx context.Context, coll *flowslatest.FlowCol } func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowCollector, annots map[string]string) ([]corev1.EnvVar, error) { - var config []corev1.EnvVar - if coll.Spec.Agent.EBPF.CacheActiveTimeout != "" { - config = append(config, corev1.EnvVar{ - Name: envCacheActiveTimeout, - Value: coll.Spec.Agent.EBPF.CacheActiveTimeout, - }) - } - if coll.Spec.Agent.EBPF.CacheMaxFlows != 0 { - config = append(config, corev1.EnvVar{ - Name: envCacheMaxFlows, - Value: strconv.Itoa(int(coll.Spec.Agent.EBPF.CacheMaxFlows)), - }) - } - if coll.Spec.Agent.EBPF.LogLevel != "" { - config = append(config, corev1.EnvVar{ - Name: envLogLevel, - Value: coll.Spec.Agent.EBPF.LogLevel, - }) - } - if len(coll.Spec.Agent.EBPF.Interfaces) > 0 { - config = append(config, corev1.EnvVar{ - Name: envInterfaces, - Value: strings.Join(coll.Spec.Agent.EBPF.Interfaces, envListSeparator), - }) - } - if len(coll.Spec.Agent.EBPF.ExcludeInterfaces) > 0 { - config = append(config, corev1.EnvVar{ - Name: envExcludeInterfaces, - Value: strings.Join(coll.Spec.Agent.EBPF.ExcludeInterfaces, envListSeparator), - }) - } - sampling := coll.Spec.Agent.EBPF.Sampling - if sampling != nil && *sampling > 1 { - config = append(config, corev1.EnvVar{ - Name: envSampling, - Value: strconv.Itoa(int(*sampling)), - }) - } - - // set GOMEMLIMIT which allows specifying a soft memory cap to force GC when resource limit is reached - // to prevent OOM - if coll.Spec.Agent.EBPF.Resources.Limits.Memory() != nil { - if memLimit, ok := coll.Spec.Agent.EBPF.Resources.Limits.Memory().AsInt64(); ok { - // we will set the GOMEMLIMIT to current 
memlimit - 10% as a headroom to account for - // memory sources the Go runtime is unaware of - memLimit -= int64(float64(memLimit) * 0.1) - config = append(config, corev1.EnvVar{Name: envGoMemLimit, Value: fmt.Sprint(memLimit)}) - } - } - - dedup := dedupeDefault - dedupJustMark := dedupeJustMarkDefault - // we need to sort env map to keep idempotency, - // as equal maps could be iterated in different order - for _, pair := range helper.KeySorted(coll.Spec.Agent.EBPF.Debug.Env) { - k, v := pair[0], pair[1] - if k == envDedupe { - dedup = v - } else if k == envDedupeJustMark { - dedupJustMark = v - } else { - config = append(config, corev1.EnvVar{Name: k, Value: v}) - } - } - config = append(config, corev1.EnvVar{Name: envDedupe, Value: dedup}) - config = append(config, corev1.EnvVar{Name: envDedupeJustMark, Value: dedupJustMark}) + config := c.setEnvConfig(coll) if helper.UseKafka(&coll.Spec) { config = append(config, @@ -383,3 +360,94 @@ func (c *AgentController) securityContext(coll *flowslatest.FlowCollector) *core return &sc } + +func (c *AgentController) setEnvConfig(coll *flowslatest.FlowCollector) []corev1.EnvVar { + var config []corev1.EnvVar + + if coll.Spec.Agent.EBPF.CacheActiveTimeout != "" { + config = append(config, corev1.EnvVar{ + Name: envCacheActiveTimeout, + Value: coll.Spec.Agent.EBPF.CacheActiveTimeout, + }) + } + + if coll.Spec.Agent.EBPF.CacheMaxFlows != 0 { + config = append(config, corev1.EnvVar{ + Name: envCacheMaxFlows, + Value: strconv.Itoa(int(coll.Spec.Agent.EBPF.CacheMaxFlows)), + }) + } + + if coll.Spec.Agent.EBPF.LogLevel != "" { + config = append(config, corev1.EnvVar{ + Name: envLogLevel, + Value: coll.Spec.Agent.EBPF.LogLevel, + }) + } + + if len(coll.Spec.Agent.EBPF.Interfaces) > 0 { + config = append(config, corev1.EnvVar{ + Name: envInterfaces, + Value: strings.Join(coll.Spec.Agent.EBPF.Interfaces, envListSeparator), + }) + } + + if len(coll.Spec.Agent.EBPF.ExcludeInterfaces) > 0 { + config = append(config, corev1.EnvVar{ + 
Name: envExcludeInterfaces, + Value: strings.Join(coll.Spec.Agent.EBPF.ExcludeInterfaces, envListSeparator), + }) + } + + sampling := coll.Spec.Agent.EBPF.Sampling + if sampling != nil && *sampling > 1 { + config = append(config, corev1.EnvVar{ + Name: envSampling, + Value: strconv.Itoa(int(*sampling)), + }) + } + + // set GOMEMLIMIT which allows specifying a soft memory cap to force GC when resource limit is reached + // to prevent OOM + if coll.Spec.Agent.EBPF.Resources.Limits.Memory() != nil { + if memLimit, ok := coll.Spec.Agent.EBPF.Resources.Limits.Memory().AsInt64(); ok { + // we will set the GOMEMLIMIT to current memlimit - 10% as a headroom to account for + // memory sources the Go runtime is unaware of + memLimit -= int64(float64(memLimit) * 0.1) + config = append(config, corev1.EnvVar{Name: envGoMemLimit, Value: fmt.Sprint(memLimit)}) + } + } + + if helper.IsTCPDropEnabled(&coll.Spec) { + config = append(config, corev1.EnvVar{ + Name: envEnableTCPDrop, + Value: "true", + }) + } + + if helper.IsDNSTrackingEnabled(&coll.Spec) { + config = append(config, corev1.EnvVar{ + Name: envEnableDNSTracking, + Value: "true", + }) + } + + dedup := dedupeDefault + dedupJustMark := dedupeJustMarkDefault + // we need to sort env map to keep idempotency, + // as equal maps could be iterated in different order + for _, pair := range helper.KeySorted(coll.Spec.Agent.EBPF.Debug.Env) { + k, v := pair[0], pair[1] + if k == envDedupe { + dedup = v + } else if k == envDedupeJustMark { + dedupJustMark = v + } else { + config = append(config, corev1.EnvVar{Name: k, Value: v}) + } + } + config = append(config, corev1.EnvVar{Name: envDedupe, Value: dedup}) + config = append(config, corev1.EnvVar{Name: envDedupeJustMark, Value: dedupJustMark}) + + return config +} diff --git a/controllers/ebpf/internal/permissions/permissions.go b/controllers/ebpf/internal/permissions/permissions.go index acfa8c721..ea9a7c8bf 100644 --- a/controllers/ebpf/internal/permissions/permissions.go +++ 
b/controllers/ebpf/internal/permissions/permissions.go @@ -9,6 +9,7 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/controllers/reconcilers" "github.com/netobserv/network-observability-operator/pkg/helper" + osv1 "github.com/openshift/api/security/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -154,6 +155,14 @@ func (c *Reconciler) reconcileOpenshiftPermissions( } else { scc.AllowedCapabilities = AllowedCapabilities } + if (desired.EnableTCPDrop != nil && *desired.EnableTCPDrop) || + (desired.EnableDNSTracking != nil && *desired.EnableDNSTracking) { + scc.AllowHostDirVolumePlugin = true + } + if (desired.EnableTCPDrop != nil && !*desired.EnableTCPDrop) && + (desired.EnableDNSTracking != nil && !*desired.EnableDNSTracking) { + scc.AllowHostDirVolumePlugin = false + } actual := &osv1.SecurityContextConstraints{} if err := c.Get(ctx, client.ObjectKeyFromObject(scc), actual); err != nil { if errors.IsNotFound(err) { @@ -171,6 +180,7 @@ func (c *Reconciler) reconcileOpenshiftPermissions( !equality.Semantic.DeepDerivative(&scc.SELinuxContext, &actual.SELinuxContext) || !equality.Semantic.DeepDerivative(&scc.Users, &actual.Users) || scc.AllowPrivilegedContainer != actual.AllowPrivilegedContainer || + scc.AllowHostDirVolumePlugin != actual.AllowHostDirVolumePlugin || !equality.Semantic.DeepDerivative(&scc.AllowedCapabilities, &actual.AllowedCapabilities) { rlog.Info("updating SecurityContextConstraints") diff --git a/controllers/flowcollector_controller_iso_test.go b/controllers/flowcollector_controller_iso_test.go index 51f241c2c..fa26e1033 100644 --- a/controllers/flowcollector_controller_iso_test.go +++ b/controllers/flowcollector_controller_iso_test.go @@ -97,6 +97,8 @@ func flowCollectorIsoSpecs() { ExcludeInterfaces: []string{}, Privileged: false, KafkaBatchSize: 0, + EnableTCPDrop: pointer.Bool(false), + EnableDNSTracking: pointer.Bool(false), }, }, 
ConsolePlugin: flowslatest.FlowCollectorConsolePlugin{ diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go index 43de091b8..f7114af67 100644 --- a/controllers/flowlogspipeline/flp_common_objects.go +++ b/controllers/flowlogspipeline/flp_common_objects.go @@ -320,6 +320,93 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev // Else: nothing for eBPF at the moment } + outputFields := []api.OutputField{ + { + Name: "Bytes", + Operation: "sum", + }, + { + Name: "Bytes", + Operation: "sum", + SplitAB: true, + }, + { + Name: "Packets", + Operation: "sum", + }, + { + Name: "Packets", + Operation: "sum", + SplitAB: true, + }, + { + Name: "numFlowLogs", + Operation: "count", + }, + { + Name: "TimeFlowStartMs", + Operation: "min", + }, + { + Name: "TimeFlowEndMs", + Operation: "max", + }, + { + Name: "FlowDirection", + Operation: "first", + }, + { + Name: "IfDirection", + Operation: "first", + }, + { + Name: "AgentIP", + Operation: "first", + }, + } + + if helper.IsTCPDropEnabled(b.desired) { + outputTCPDropFields := []api.OutputField{ + { + Name: "TcpDropBytes", + Operation: "sum", + }, + { + Name: "TcpDropBytes", + Operation: "sum", + SplitAB: true, + }, + { + Name: "TcpDropPackets", + Operation: "sum", + }, + { + Name: "TcpDropPackets", + Operation: "sum", + SplitAB: true, + }, + { + Name: "TcpDropLatestState", + Operation: "last", + }, + { + Name: "TcpDropLatestDropCause", + Operation: "last", + }, + } + outputFields = append(outputFields, outputTCPDropFields...) + } + + if helper.IsDNSTrackingEnabled(b.desired) { + outDNSTrackingFields := []api.OutputField{ + { + Name: "DnsLatencyMs", + Operation: "max", + }, + } + outputFields = append(outputFields, outDNSTrackingFields...) 
+ } + // Connection tracking stage (only if LogTypes is not FLOWS) if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows { indexFields = append(indexFields, constants.LokiConnectionIndexFields...) @@ -356,50 +443,7 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev }, }, OutputRecordTypes: outputRecordTypes, - OutputFields: []api.OutputField{ - { - Name: "Bytes", - Operation: "sum", - }, - { - Name: "Bytes", - Operation: "sum", - SplitAB: true, - }, - { - Name: "Packets", - Operation: "sum", - }, - { - Name: "Packets", - Operation: "sum", - SplitAB: true, - }, - { - Name: "numFlowLogs", - Operation: "count", - }, - { - Name: "TimeFlowStartMs", - Operation: "min", - }, - { - Name: "TimeFlowEndMs", - Operation: "max", - }, - { - Name: "FlowDirection", - Operation: "first", - }, - { - Name: "IfDirection", - Operation: "first", - }, - { - Name: "AgentIP", - Operation: "first", - }, - }, + OutputFields: outputFields, Scheduling: []api.ConnTrackSchedulingGroup{ { Selector: nil, // Default group. Match all flowlogs @@ -414,6 +458,7 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev SwapAB: true, }, }) + } // enrich stage (transform) configuration diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md index eb6f84dac..1979091b4 100644 --- a/docs/FlowCollector.md +++ b/docs/FlowCollector.md @@ -4070,6 +4070,24 @@ Agent configuration for flows extraction. `debug` allows setting some aspects of the internal configuration of the eBPF agent. This section is aimed exclusively for debugging and fine-grained performance optimizations, such as GOGC and GOMAXPROCS env vars. Users setting its values do it at their own risk.
false + + enableDNSTracking + boolean + + Enable the DNS tracking feature. This feature requires mounting the kernel debug filesystem hence the eBPF pod has to run as privileged. If the spec.agent.eBPF.privileged parameter is not set, an error is reported.
+
+ Default: false
+ + false + + enableTCPDrop + boolean + + Enable the TCP drop flows logging feature. This feature requires mounting the kernel debug filesystem, so the eBPF pod has to run as privileged. If the spec.agent.eBPF.privileged parameter is not set, an error is reported.
+
+ Default: false
+ + false excludeInterfaces []string diff --git a/pkg/helper/flowcollector.go b/pkg/helper/flowcollector.go index ac688c4d9..13a252508 100644 --- a/pkg/helper/flowcollector.go +++ b/pkg/helper/flowcollector.go @@ -102,6 +102,20 @@ func UseConsolePlugin(spec *flowslatest.FlowCollectorSpec) bool { (spec.ConsolePlugin.Enable == nil || *spec.ConsolePlugin.Enable) } +// IsTCPDropEnabled reports whether TCP drop logging is switched on in spec. The flag is only +// honored when the eBPF agent is configured as privileged; a nil flag counts as disabled. +func IsTCPDropEnabled(spec *flowslatest.FlowCollectorSpec) bool { + ebpf := &spec.Agent.EBPF + return ebpf.Privileged && ebpf.EnableTCPDrop != nil && *ebpf.EnableTCPDrop +} + +// IsDNSTrackingEnabled reports whether DNS tracking is switched on in spec. The flag is only +// honored when the eBPF agent is configured as privileged; a nil flag counts as disabled. +func IsDNSTrackingEnabled(spec *flowslatest.FlowCollectorSpec) bool { + ebpf := &spec.Agent.EBPF + return ebpf.Privileged && ebpf.EnableDNSTracking != nil && *ebpf.EnableDNSTracking +} + func PtrBool(b *bool) bool { if b == nil { return false