From 4aded05125714c28446d5832b5dec4f71df8ed80 Mon Sep 17 00:00:00 2001 From: Anubhav Aeron Date: Wed, 8 Feb 2023 13:53:24 -0800 Subject: [PATCH] fix concurrent map iteration and map write Signed-off-by: Anubhav Aeron --- admiral/pkg/clusters/handler.go | 2 +- admiral/pkg/clusters/serviceentry.go | 77 ++++++------ admiral/pkg/clusters/serviceentry_test.go | 127 ++++++++++++++++---- admiral/pkg/controller/common/types.go | 7 -- admiral/pkg/controller/common/types_test.go | 35 ------ 5 files changed, 145 insertions(+), 103 deletions(-) diff --git a/admiral/pkg/clusters/handler.go b/admiral/pkg/clusters/handler.go index a495923c..5e968b33 100644 --- a/admiral/pkg/clusters/handler.go +++ b/admiral/pkg/clusters/handler.go @@ -707,7 +707,7 @@ func createServiceEntrySkeletion(se v1alpha32.ServiceEntry, name string, namespa } //nolint -func createSidecarSkeletion(sidecar v1alpha32.Sidecar, name string, namespace string) *v1alpha3.Sidecar { +func createSidecarSkeleton(sidecar v1alpha32.Sidecar, name string, namespace string) *v1alpha3.Sidecar { return &v1alpha3.Sidecar{Spec: sidecar, ObjectMeta: v12.ObjectMeta{Name: name, Namespace: namespace}} } diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index 65511f08..0fea1031 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -240,7 +240,9 @@ func modifyServiceEntryForNewServiceOrPod( } if common.GetWorkloadSidecarUpdate() == "enabled" { - modifySidecarForLocalClusterCommunication(ctx, serviceInstance.Namespace, remoteRegistry.AdmiralCache.DependencyNamespaceCache.Get(sourceIdentity), rc) + modifySidecarForLocalClusterCommunication( + ctx, serviceInstance.Namespace, sourceIdentity, + remoteRegistry.AdmiralCache.DependencyNamespaceCache, rc) } for _, val := range dependents { @@ -373,50 +375,57 @@ func updateEndpointsForWeightedServices(serviceEntry *networking.ServiceEntry, w serviceEntry.Endpoints = endpoints } -func modifySidecarForLocalClusterCommunication(ctx context.Context, sidecarNamespace string, sidecarEgressMap map[string]common.SidecarEgress, rc *RemoteController) { +func modifySidecarForLocalClusterCommunication( + ctx context.Context, sidecarNamespace, sourceIdentity string, + sidecarEgressMap *common.SidecarEgressMap, rc *RemoteController) { //get existing sidecar from the cluster sidecarConfig := rc.SidecarController - if sidecarConfig == nil || sidecarEgressMap == nil { - return - } + sidecarEgressMap.Range(func(k string, v map[string]common.SidecarEgress) { + if k == sourceIdentity { + sidecarEgress := v + if sidecarConfig == nil || sidecarEgress == nil { + return + } - sidecar, err := sidecarConfig.IstioClient.NetworkingV1alpha3().Sidecars(sidecarNamespace).Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{}) - if err != nil { - return - } - if sidecar == nil || (sidecar.Spec.Egress == nil) { - return - } + sidecar, err := sidecarConfig.IstioClient.NetworkingV1alpha3().Sidecars(sidecarNamespace).Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{}) + if err != nil { + return + } + if sidecar == nil || (sidecar.Spec.Egress == nil) { + return + } - //copy and add our new local FQDN - newSidecar := copySidecar(sidecar) + //copy and add our new local FQDN + newSidecar := copySidecar(sidecar) - egressHosts := make(map[string]string) + egressHosts := make(map[string]string) - for _, sidecarEgress := range sidecarEgressMap { - egressHost := sidecarEgress.Namespace + "/" + sidecarEgress.FQDN - egressHosts[egressHost] = egressHost - for cname := range sidecarEgress.CNAMEs { - scopedCname := sidecarEgress.Namespace + "/" + cname - egressHosts[scopedCname] = scopedCname - } - } + for _, sidecarEgress := range sidecarEgress { + egressHost := sidecarEgress.Namespace + "/" + sidecarEgress.FQDN + egressHosts[egressHost] = egressHost + for cname := range sidecarEgress.CNAMEs { + scopedCname := sidecarEgress.Namespace + "/" + cname + egressHosts[scopedCname] = scopedCname + } + } - for egressHost := range egressHosts { - if !util.Contains(newSidecar.Spec.Egress[0].Hosts, egressHost) { - newSidecar.Spec.Egress[0].Hosts = append(newSidecar.Spec.Egress[0].Hosts, egressHost) - } - } + for egressHost := range egressHosts { + if !util.Contains(newSidecar.Spec.Egress[0].Hosts, egressHost) { + newSidecar.Spec.Egress[0].Hosts = append(newSidecar.Spec.Egress[0].Hosts, egressHost) + } + } - //nolint - newSidecarConfig := createSidecarSkeletion(newSidecar.Spec, common.GetWorkloadSidecarName(), sidecarNamespace) + //nolint + newSidecarConfig := createSidecarSkeleton(newSidecar.Spec, common.GetWorkloadSidecarName(), sidecarNamespace) - //insert into cluster - if newSidecarConfig != nil { - addUpdateSidecar(ctx, newSidecarConfig, sidecar, sidecarNamespace, rc) - } + //insert into cluster + if newSidecarConfig != nil { + addUpdateSidecar(ctx, newSidecarConfig, sidecar, sidecarNamespace, rc) + } + } + }) } func addUpdateSidecar(ctx context.Context, obj *v1alpha3.Sidecar, exist *v1alpha3.Sidecar, namespace string, rc *RemoteController) { diff --git a/admiral/pkg/clusters/serviceentry_test.go b/admiral/pkg/clusters/serviceentry_test.go index 9a549769..4dac1181 100644 --- a/admiral/pkg/clusters/serviceentry_test.go +++ b/admiral/pkg/clusters/serviceentry_test.go @@ -1082,17 +1082,75 @@ func buildFakeConfigMapFromAddressStore(addressStore *ServiceEntryAddressStore, } func TestModifyNonExistingSidecarForLocalClusterCommunication(t *testing.T) { + setupForServiceEntryTests() + var ( + assetIdentity = "test-identity" + identityNamespace = "test-dependency-namespace" + assetFQDN = "test-local-fqdn" + sidecar = &v1alpha3.Sidecar{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + Namespace: identityNamespace, + }, + Spec: istioNetworkingV1Alpha3.Sidecar{ + Egress: []*istioNetworkingV1Alpha3.IstioEgressListener{ + { + Hosts: []string{"a"}, + }, + }, + }, + } + ) sidecarController := &istio.SidecarController{} sidecarController.IstioClient = istiofake.NewSimpleClientset() + sidecarController.IstioClient.NetworkingV1alpha3().Sidecars(identityNamespace). + Create(context.TODO(), sidecar, v12.CreateOptions{}) remoteController := &RemoteController{} remoteController.SidecarController = sidecarController - sidecarEgressMap := make(map[string]common.SidecarEgress) - sidecarEgressMap["test-dependency-namespace"] = common.SidecarEgress{Namespace: "test-dependency-namespace", FQDN: "test-local-fqdn"} - ctx := context.Background() - - modifySidecarForLocalClusterCommunication(ctx, "test-sidecar-namespace", sidecarEgressMap, remoteController) + sidecarCacheEgressMap := common.NewSidecarEgressMap() + sidecarCacheEgressMap.Put( + assetIdentity, + identityNamespace, + assetFQDN, + nil, + ) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + defer cancel() + var wg sync.WaitGroup + wg.Add(2) + go func(ctx context.Context) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + sidecarCacheEgressMap.Put( + assetIdentity, + identityNamespace, + assetFQDN, + nil, + ) + } + } + }(ctx) + + go func(ctx context.Context) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + modifySidecarForLocalClusterCommunication( + ctx, identityNamespace, assetIdentity, + sidecarCacheEgressMap, remoteController) + } + } + }(ctx) + wg.Wait() sidecarObj, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{}) if err == nil { @@ -1105,35 +1163,52 @@ func TestModifyNonExistingSidecarForLocalClusterCommunication(t *testing.T) { } func TestModifyExistingSidecarForLocalClusterCommunication(t *testing.T) { + setupForServiceEntryTests() + var ( + assetIdentity = "test-identity" + identityNamespace = "test-sidecar-namespace" + sidecarName = "default" + assetHostsList = []string{"test-host"} + sidecar = &v1alpha3.Sidecar{ + ObjectMeta: metav1.ObjectMeta{ + Name: sidecarName, + Namespace: identityNamespace, + }, + Spec: istioNetworkingV1Alpha3.Sidecar{ + Egress: []*istioNetworkingV1Alpha3.IstioEgressListener{ + { + Hosts: assetHostsList, + }, + }, + }, + } - sidecarController := &istio.SidecarController{} - sidecarController.IstioClient = istiofake.NewSimpleClientset() - - remoteController := &RemoteController{} + sidecarController = &istio.SidecarController{} + remoteController = &RemoteController{} + sidecarCacheEgressMap = common.NewSidecarEgressMap() + ) + sidecarCacheEgressMap.Put( + assetIdentity, + "test-dependency-namespace", + "test-local-fqdn", + map[string]string{ + "test.myservice.global": "1", + }, + ) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + defer cancel() remoteController.SidecarController = sidecarController + sidecarController.IstioClient = istiofake.NewSimpleClientset() + createdSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars(identityNamespace). + Create(context.TODO(), sidecar, v12.CreateOptions{}) - existingSidecarObj := &v1alpha3.Sidecar{} - existingSidecarObj.ObjectMeta.Namespace = "test-sidecar-namespace" - existingSidecarObj.ObjectMeta.Name = "default" - - istioEgress := istioNetworkingV1Alpha3.IstioEgressListener{ - Hosts: []string{"test-host"}, - } - - existingSidecarObj.Spec = istioNetworkingV1Alpha3.Sidecar{ - Egress: []*istioNetworkingV1Alpha3.IstioEgressListener{&istioEgress}, - } - - ctx := context.Background() - createdSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Create(ctx, existingSidecarObj, v12.CreateOptions{}) if err != nil { - t.Error(err) + t.Errorf("unable to create sidecar using fake client, err: %v", err) } if createdSidecar != nil { - sidecarEgressMap := make(map[string]common.SidecarEgress) sidecarEgressMap["test-dependency-namespace"] = common.SidecarEgress{Namespace: "test-dependency-namespace", FQDN: "test-local-fqdn", CNAMEs: map[string]string{"test.myservice.global": "1"}} - modifySidecarForLocalClusterCommunication(ctx, "test-sidecar-namespace", sidecarEgressMap, remoteController) + modifySidecarForLocalClusterCommunication(ctx, identityNamespace, assetIdentity, sidecarCacheEgressMap, remoteController) updatedSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Get(ctx, "default", v12.GetOptions{}) diff --git a/admiral/pkg/controller/common/types.go b/admiral/pkg/controller/common/types.go index 171fab30..6b6566e0 100644 --- a/admiral/pkg/controller/common/types.go +++ b/admiral/pkg/controller/common/types.go @@ -207,13 +207,6 @@ func (s *SidecarEgressMap) Delete(key string) { delete(s.cache, key) } -// Map func returns a map of identity to namespace:SidecarEgress map -// Iterating through the returned map is not implicitly thread safe, -// use (s *SidecarEgressMap) Range() func instead. -func (s *SidecarEgressMap) Map() map[string]map[string]SidecarEgress { - return s.cache -} - // Range is a thread safe iterator to iterate through the SidecarEgress map func (s *SidecarEgressMap) Range(fn func(k string, v map[string]SidecarEgress)) { defer s.mutex.Unlock() diff --git a/admiral/pkg/controller/common/types_test.go b/admiral/pkg/controller/common/types_test.go index 741a0d41..b56c9a4d 100644 --- a/admiral/pkg/controller/common/types_test.go +++ b/admiral/pkg/controller/common/types_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/uuid" ) @@ -48,40 +47,6 @@ func TestMapOfMaps(t *testing.T) { } } -func TestEgressMap(t *testing.T) { - egressMap := NewSidecarEgressMap() - payments, orders := "payments", "orders" - paymentsEnv, ordersEnv := "prod", "staging" - paymentsNs, ordersNs := payments+"-"+paymentsEnv, orders+"-"+ordersEnv - paymentsFqdn, ordersFqdn := payments+"."+paymentsNs+"."+"svc.cluster.local", orders+"."+ordersNs+"."+"svc.cluster.local" - paymentsCname, ordersCname := paymentsEnv+"."+payments+".global", ordersEnv+"."+orders+".global" - paymentsSidecar, ordersSidecar := SidecarEgress{FQDN: paymentsFqdn, Namespace: paymentsNs, CNAMEs: map[string]string{paymentsCname: paymentsCname}}, SidecarEgress{FQDN: ordersFqdn, Namespace: ordersNs, CNAMEs: map[string]string{ordersCname: ordersCname}} - egressMap.Put(payments, paymentsNs, paymentsFqdn, map[string]string{paymentsCname: paymentsCname}) - egressMap.Put(orders, ordersNs, ordersFqdn, map[string]string{ordersCname: ordersCname}) - - ordersEgress := egressMap.Get("orders") - - if !cmp.Equal(ordersEgress[ordersNs], ordersSidecar) { - t.Errorf("Orders egress object should match expected %v, got %v", ordersSidecar, ordersEgress[ordersNs]) - t.FailNow() - } - - egressMap.Delete(orders) - ordersEgress = egressMap.Get("orders") - - if ordersEgress != nil { - t.Errorf("Delete object should delete the object %v", ordersEgress) - t.FailNow() - } - - egressMapForIter := egressMap.Map() - - if len(egressMapForIter) != 1 { - t.Errorf("Egressmap should contains only one object %v", paymentsSidecar) - t.FailNow() - } -} - func TestAdmiralParams(t *testing.T) { admiralParams := AdmiralParams{SANPrefix: "custom.san.prefix"} admiralParamsStr := admiralParams.String()