From 4209d5492dbe9e66a20e3201a2340dab25b92b28 Mon Sep 17 00:00:00 2001 From: Dakota Sullivan Date: Mon, 11 Mar 2024 07:43:23 -0600 Subject: [PATCH] Filter watched nodes and pods server-side Rather than each DaemonSet pod listing/watching every node and pod in the cluster, this tells Kubernetes which node and pods we're interested in for each DaemonSet, reducing overhead significantly in larger clusters. Signed-off-by: Dakota Sullivan --- KubeArmor/config/config.go | 2 +- KubeArmor/core/k8sHandler.go | 13 ++++++++++--- KubeArmor/core/kubeUpdate.go | 33 ++++++++++++++++++--------------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/KubeArmor/config/config.go b/KubeArmor/config/config.go index 7539c448e..b172b1ef7 100644 --- a/KubeArmor/config/config.go +++ b/KubeArmor/config/config.go @@ -101,7 +101,7 @@ const ( func readCmdLineParams() { hostname, _ := os.Hostname() clusterStr := flag.String(ConfigCluster, "default", "cluster name") - hostStr := flag.String(ConfigHost, strings.Split(hostname, ".")[0], "host name") + hostStr := flag.String(ConfigHost, hostname, "host name") grpcStr := flag.String(ConfigGRPC, "32767", "gRPC port number") tlsEnabled := flag.Bool(ConfigTLS, false, "enable tls for secure grpc connection") diff --git a/KubeArmor/core/k8sHandler.go b/KubeArmor/core/k8sHandler.go index e8ad5d326..3938113e9 100644 --- a/KubeArmor/core/k8sHandler.go +++ b/KubeArmor/core/k8sHandler.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "path/filepath" "time" @@ -420,13 +421,19 @@ func (kh *K8sHandler) GetStatefulSet(namespaceName, podownerName string) (string // ========== // // WatchK8sPods Function -func (kh *K8sHandler) WatchK8sPods() *http.Response { +func (kh *K8sHandler) WatchK8sPods(nodeName string) *http.Response { if !kl.IsK8sEnv() { // not Kubernetes return nil } + queryParams := url.Values{} + if nodeName != "" { + queryParams.Add("fieldSelector", "spec.nodeName="+nodeName) + } + queryParams.Add("watch", "true") + if kl.IsInK8sCluster() { // kube-apiserver - URL := "https://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?watch=true" + URL := "https://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?" + queryParams.Encode() req, err := http.NewRequest("GET", URL, nil) if err != nil { @@ -445,7 +452,7 @@ func (kh *K8sHandler) WatchK8sPods() *http.Response { } // kube-proxy (local) - URL := "http://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?watch=true" + URL := "http://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?" + queryParams.Encode() if resp, err := http.Get(URL); err == nil /* #nosec */ { return resp diff --git a/KubeArmor/core/kubeUpdate.go b/KubeArmor/core/kubeUpdate.go index f85de46f2..6e57db283 100644 --- a/KubeArmor/core/kubeUpdate.go +++ b/KubeArmor/core/kubeUpdate.go @@ -83,20 +83,7 @@ func (dm *KubeArmorDaemon) HandleNodeAnnotations(node *tp.Node) { } } -func matchHost(hostName string) bool { - envName := os.Getenv("KUBEARMOR_NODENAME") - if envName != "" { - return envName == hostName - } - nodeName := strings.Split(hostName, ".")[0] - return nodeName == cfg.GlobalCfg.Host -} - func (dm *KubeArmorDaemon) checkAndUpdateNode(item *corev1.Node) { - if !matchHost(item.Name) { - return - } - node := tp.Node{} node.ClusterName = cfg.GlobalCfg.Cluster @@ -150,7 +137,18 @@ func (dm *KubeArmorDaemon) checkAndUpdateNode(item *corev1.Node) { func (dm *KubeArmorDaemon) WatchK8sNodes() { kg.Printf("GlobalCfg.Host=%s, KUBEARMOR_NODENAME=%s", cfg.GlobalCfg.Host, os.Getenv("KUBEARMOR_NODENAME")) - factory := informers.NewSharedInformerFactory(K8s.K8sClient, 0) + nodeName := os.Getenv("KUBEARMOR_NODENAME") + if nodeName == "" { + nodeName = cfg.GlobalCfg.Host + } + + factory := informers.NewSharedInformerFactoryWithOptions( + K8s.K8sClient, + 0, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fmt.Sprintf("metadata.name=%s", nodeName) + }), + ) informer := factory.Core().V1().Nodes().Informer() if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -555,8 +553,13 @@ func (dm *KubeArmorDaemon) WatchK8sPods() { var controllerName, controller, namespace string var err error + nodeName := os.Getenv("KUBEARMOR_NODENAME") + if nodeName == "" { + nodeName = cfg.GlobalCfg.Host + } + for { - if resp := K8s.WatchK8sPods(); resp != nil { + if resp := K8s.WatchK8sPods(nodeName); resp != nil { defer func() { if err := resp.Body.Close(); err != nil { kg.Warnf("Error closing http stream %s\n", err)