Skip to content

Commit

Permalink
Merge pull request #58 from anurag-rajawat/perf-optimizations
Browse files Browse the repository at this point in the history
fix: Optimise performance
  • Loading branch information
Aryan-sharma11 committed Sep 2, 2024
2 parents 9937ca2 + 25813ff commit 95e7d11
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 83 deletions.
11 changes: 0 additions & 11 deletions relay-server/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,10 @@
package common

Check warning on line 4 in relay-server/common/common.go

View workflow job for this annotation

GitHub Actions / go-lint

should have a package comment

import (
"encoding/json"
"os"
"path/filepath"
)

// ============ //
// == Common == //
// ============ //

// Clone Function
func Clone(src, dst interface{}) error {
arr, _ := json.Marshal(src)
return json.Unmarshal(arr, dst)
}

// ================ //
// == Kubernetes == //
// ================ //
Expand Down
5 changes: 3 additions & 2 deletions relay-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/dustin/go-humanize v1.0.1
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/kubearmor/KubeArmor/KubeArmor v0.0.0-20240412061210-e4422dd02342
github.com/kubearmor/KubeArmor/protobuf v0.0.0-20240315075053-fee50c9428b9
github.com/spf13/viper v1.18.2
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.63.2
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
)
Expand All @@ -33,8 +35,8 @@ require (
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -71,7 +73,6 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.2 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240310230437-4693a0247e57 // indirect
Expand Down
93 changes: 55 additions & 38 deletions relay-server/server/k8sHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@ import (
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"

kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
rest "k8s.io/client-go/rest"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

Expand Down Expand Up @@ -228,52 +232,65 @@ func (kh *K8sHandler) DoRequest(cmd string, data interface{}, path string) ([]by
return resBody, nil
}

// ========== //
// == Pods == //
// ========== //
func (kh *K8sHandler) WatchKubeArmorPods(ctx context.Context, wg *sync.WaitGroup, ipsChan chan string) {
defer func() {
close(ipsChan)
wg.Done()
}()

func containsElement(slice interface{}, element interface{}) bool {
switch reflect.TypeOf(slice).Kind() {
case reflect.Slice:
s := reflect.ValueOf(slice)
// Get the KubeArmor pods IP that were added before relay itself.
once := sync.Once{}
once.Do(func() {
kh.findExistingKaPodsIp(ctx, ipsChan)
})

for i := 0; i < s.Len(); i++ {
val := s.Index(i).Interface()
if reflect.DeepEqual(val, element) {
return true
}
}
}
return false
podInformer := kh.getKaPodInformer(ipsChan)
podInformer.Run(ctx.Done())
}

// GetKubeArmorNodes Function
func (kh *K8sHandler) GetKubeArmorNodes() []string {
nodeIPs := []string{}
func (kh *K8sHandler) getKaPodInformer(ipsChan chan string) cache.SharedIndexInformer {
option := informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.LabelSelector = "kubearmor-app=kubearmor"
})

factory := informers.NewSharedInformerFactoryWithOptions(kh.K8sClient, 0, option)
informer := factory.Core().V1().Pods().Informer()

_, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok {
if pod.Status.PodIP != "" {
ipsChan <- pod.Status.PodIP
}
}
},
UpdateFunc: func(old, new interface{}) {
newPod, ok := new.(*corev1.Pod)
if ok {
if newPod.Status.PodIP != "" {
ipsChan <- newPod.Status.PodIP
}
}
},
})

return informer
}

if !kl.IsK8sEnv() { // not Kubernetes
return nodeIPs
}
func (kh *K8sHandler) findExistingKaPodsIp(ctx context.Context, ipsChan chan string) {
pods, err := kh.K8sClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{
LabelSelector: "kubearmor-app=kubearmor",
})

pods, err := kh.K8sClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
if err != nil {
return nodeIPs
kg.Errf("failed to list KubeArmor pods: %v", err)
return
}

for _, pod := range pods.Items {
if val, ok := pod.ObjectMeta.Labels["kubearmor-app"]; !ok {
continue
} else if val != "kubearmor" {
continue
}
if pod.Status.PodIP == "" {
kg.Printf("pod.Status=%+v", pod.Status)
}

if pod.Status.PodIP != "" && !containsElement(nodeIPs, pod.Status.PodIP) {
nodeIPs = append(nodeIPs, pod.Status.PodIP)
if pod.Status.PodIP != "" {
ipsChan <- pod.Status.PodIP
}
}

return nodeIPs
}
55 changes: 23 additions & 32 deletions relay-server/server/relayServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

kl "github.com/kubearmor/kubearmor-relay-server/relay-server/common"
cfg "github.com/kubearmor/kubearmor-relay-server/relay-server/config"
kg "github.com/kubearmor/kubearmor-relay-server/relay-server/log"
)
Expand Down Expand Up @@ -434,21 +433,15 @@ func (rs *RelayServer) AddMsgFromBuffChan() {

for Running {
select {
case res := <-MsgBufferChannel:
msg := pb.Message{}

if err := kl.Clone(*res, &msg); err != nil {
kg.Warnf("Failed to clone a message (%v)", *res)
continue
}
case msg := <-MsgBufferChannel:
if stdoutmsg {
tel, _ := json.Marshal(msg)
fmt.Printf("%s\n", string(tel))
}
MsgLock.RLock()
for uid := range MsgStructs {
select {
case MsgStructs[uid].Broadcast <- (&msg):
case MsgStructs[uid].Broadcast <- msg:
default:
}
}
Expand Down Expand Up @@ -493,26 +486,20 @@ func (lc *LogClient) WatchAlerts(wg *sync.WaitGroup, stop chan struct{}, errCh c
kg.Print("Stopped watching alerts from " + lc.Server)
}

// AddAlertFromBuffChan Adds ALert from AlertBufferChannel into AlertStructs
// AddAlertFromBuffChan Adds Alert from AlertBufferChannel into AlertStructs
func (rs *RelayServer) AddAlertFromBuffChan() {

for Running {
select {
case res := <-AlertBufferChannel:
alert := pb.Alert{}

if err := kl.Clone(*res, &alert); err != nil {
kg.Warnf("Failed to clone an alert (%v)", *res)
continue
}
case alert := <-AlertBufferChannel:
if stdoutalerts {
tel, _ := json.Marshal(alert)
fmt.Printf("%s\n", string(tel))
}
AlertLock.RLock()
for uid := range AlertStructs {
select {
case AlertStructs[uid].Broadcast <- (&alert):
case AlertStructs[uid].Broadcast <- alert:
default:
}
}
Expand Down Expand Up @@ -561,18 +548,14 @@ func (rs *RelayServer) AddLogFromBuffChan() {

for Running {
select {
case res := <-LogBufferChannel:
log := pb.Log{}
if err := kl.Clone(*res, &log); err != nil {
kg.Warnf("Failed to clone a log (%v)", *res)
}
case log := <-LogBufferChannel:
if stdoutlogs {
tel, _ := json.Marshal(log)
fmt.Printf("%s\n", string(tel))
}
for uid := range LogStructs {
select {
case LogStructs[uid].Broadcast <- (&log):
case LogStructs[uid].Broadcast <- log:
default:
}
}
Expand Down Expand Up @@ -816,19 +799,27 @@ func (rs *RelayServer) GetFeedsFromNodes() {
if K8s.InitK8sClient() {
kg.Print("Initialized the Kubernetes client")

ipsChan := make(chan string)
if Running {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rs.WgServer.Add(1)
go K8s.WatchKubeArmorPods(ctx, &rs.WgServer, ipsChan)
} else {
close(ipsChan)
}

for Running {
newNodes := K8s.GetKubeArmorNodes()
for _, nodeIP := range newNodes {
select {
case ip := <-ipsChan:
ClientListLock.Lock()
if _, ok := ClientList[nodeIP]; !ok {
ClientList[nodeIP] = 1
go connectToKubeArmor(nodeIP, rs.Port)
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
ClientListLock.Unlock()
}

time.Sleep(time.Second * 1)
time.Sleep(10 * time.Second)
}

}
}

0 comments on commit 95e7d11

Please sign in to comment.