Skip to content

Commit

Permalink
Filter watched nodes and pods server-side
Browse files Browse the repository at this point in the history
Rather than each DaemonSet pod listing/watching
every node and pod in the cluster, this tells
Kubernetes which node and pods we're interested in
for each DaemonSet, reducing overhead
significantly in larger clusters.

Signed-off-by: Dakota Sullivan <djqballer@outlook.com>
  • Loading branch information
dqsully committed Mar 11, 2024
1 parent 454bac1 commit 45507ed
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
2 changes: 1 addition & 1 deletion KubeArmor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ const (
func readCmdLineParams() {
hostname, _ := os.Hostname()
clusterStr := flag.String(ConfigCluster, "default", "cluster name")
hostStr := flag.String(ConfigHost, strings.Split(hostname, ".")[0], "host name")
hostStr := flag.String(ConfigHost, hostname, "host name")

grpcStr := flag.String(ConfigGRPC, "32767", "gRPC port number")
tlsEnabled := flag.Bool(ConfigTLS, false, "enable tls for secure grpc connection")
Expand Down
13 changes: 10 additions & 3 deletions KubeArmor/core/k8sHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -420,13 +421,19 @@ func (kh *K8sHandler) GetStatefulSet(namespaceName, podownerName string) (string
// ========== //

// WatchK8sPods Function
func (kh *K8sHandler) WatchK8sPods() *http.Response {
func (kh *K8sHandler) WatchK8sPods(nodeName string) *http.Response {
if !kl.IsK8sEnv() { // not Kubernetes
return nil
}

queryParams := url.Values{}
if nodeName != "" {
queryParams.Add("fieldSelector", "spec.nodeName="+nodeName)
}
queryParams.Add("watch", "true")

if kl.IsInK8sCluster() { // kube-apiserver
URL := "https://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?watch=true"
URL := "https://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?" + queryParams.Encode()

req, err := http.NewRequest("GET", URL, nil)
if err != nil {
Expand All @@ -445,7 +452,7 @@ func (kh *K8sHandler) WatchK8sPods() *http.Response {
}

// kube-proxy (local)
URL := "http://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?watch=true"
URL := "http://" + kh.K8sHost + ":" + kh.K8sPort + "/api/v1/pods?" + queryParams.Encode()

if resp, err := http.Get(URL); err == nil /* #nosec */ {
return resp
Expand Down
33 changes: 18 additions & 15 deletions KubeArmor/core/kubeUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,7 @@ func (dm *KubeArmorDaemon) HandleNodeAnnotations(node *tp.Node) {
}
}

func matchHost(hostName string) bool {
envName := os.Getenv("KUBEARMOR_NODENAME")
if envName != "" {
return envName == hostName
}
nodeName := strings.Split(hostName, ".")[0]
return nodeName == cfg.GlobalCfg.Host
}

func (dm *KubeArmorDaemon) checkAndUpdateNode(item *corev1.Node) {
if !matchHost(item.Name) {
return
}

node := tp.Node{}

node.ClusterName = cfg.GlobalCfg.Cluster
Expand Down Expand Up @@ -150,7 +137,18 @@ func (dm *KubeArmorDaemon) checkAndUpdateNode(item *corev1.Node) {
func (dm *KubeArmorDaemon) WatchK8sNodes() {
kg.Printf("GlobalCfg.Host=%s, KUBEARMOR_NODENAME=%s", cfg.GlobalCfg.Host, os.Getenv("KUBEARMOR_NODENAME"))

factory := informers.NewSharedInformerFactory(K8s.K8sClient, 0)
nodeName := os.Getenv("KUBEARMOR_NODENAME")
if nodeName == "" {
nodeName = cfg.GlobalCfg.Host
}

factory := informers.NewSharedInformerFactoryWithOptions(
K8s.K8sClient,
0,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fmt.Sprintf("metadata.name=%s", nodeName)
}),
)
informer := factory.Core().V1().Nodes().Informer()

if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -555,8 +553,13 @@ func (dm *KubeArmorDaemon) WatchK8sPods() {
var controllerName, controller, namespace string
var err error

nodeName := os.Getenv("KUBEARMOR_NODENAME")
if nodeName == "" {
nodeName = cfg.GlobalCfg.Host
}

for {
if resp := K8s.WatchK8sPods(); resp != nil {
if resp := K8s.WatchK8sPods(nodeName); resp != nil {
defer func() {
if err := resp.Body.Close(); err != nil {
kg.Warnf("Error closing http stream %s\n", err)
Expand Down

0 comments on commit 45507ed

Please sign in to comment.