Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix concurrent map iteration and map write #283

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func createServiceEntrySkeletion(se v1alpha32.ServiceEntry, name string, namespa
}

//nolint
func createSidecarSkeletion(sidecar v1alpha32.Sidecar, name string, namespace string) *v1alpha3.Sidecar {
func createSidecarSkeleton(sidecar v1alpha32.Sidecar, name string, namespace string) *v1alpha3.Sidecar {
return &v1alpha3.Sidecar{Spec: sidecar, ObjectMeta: v12.ObjectMeta{Name: name, Namespace: namespace}}
}

Expand Down
77 changes: 43 additions & 34 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ func modifyServiceEntryForNewServiceOrPod(
}

if common.GetWorkloadSidecarUpdate() == "enabled" {
modifySidecarForLocalClusterCommunication(ctx, serviceInstance.Namespace, remoteRegistry.AdmiralCache.DependencyNamespaceCache.Get(sourceIdentity), rc)
modifySidecarForLocalClusterCommunication(
ctx, serviceInstance.Namespace, sourceIdentity,
remoteRegistry.AdmiralCache.DependencyNamespaceCache, rc)
}

for _, val := range dependents {
Expand Down Expand Up @@ -373,50 +375,57 @@ func updateEndpointsForWeightedServices(serviceEntry *networking.ServiceEntry, w
serviceEntry.Endpoints = endpoints
}

func modifySidecarForLocalClusterCommunication(ctx context.Context, sidecarNamespace string, sidecarEgressMap map[string]common.SidecarEgress, rc *RemoteController) {
func modifySidecarForLocalClusterCommunication(
ctx context.Context, sidecarNamespace, sourceIdentity string,
sidecarEgressMap *common.SidecarEgressMap, rc *RemoteController) {

//get existing sidecar from the cluster
sidecarConfig := rc.SidecarController

if sidecarConfig == nil || sidecarEgressMap == nil {
return
}
sidecarEgressMap.Range(func(k string, v map[string]common.SidecarEgress) {
if k == sourceIdentity {
sidecarEgress := v
if sidecarConfig == nil || sidecarEgress == nil {
return
}

sidecar, err := sidecarConfig.IstioClient.NetworkingV1alpha3().Sidecars(sidecarNamespace).Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{})
if err != nil {
return
}
if sidecar == nil || (sidecar.Spec.Egress == nil) {
return
}
sidecar, err := sidecarConfig.IstioClient.NetworkingV1alpha3().Sidecars(sidecarNamespace).Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{})
if err != nil {
return
}
if sidecar == nil || (sidecar.Spec.Egress == nil) {
return
}

//copy and add our new local FQDN
newSidecar := copySidecar(sidecar)
//copy and add our new local FQDN
newSidecar := copySidecar(sidecar)

egressHosts := make(map[string]string)
egressHosts := make(map[string]string)

for _, sidecarEgress := range sidecarEgressMap {
egressHost := sidecarEgress.Namespace + "/" + sidecarEgress.FQDN
egressHosts[egressHost] = egressHost
for cname := range sidecarEgress.CNAMEs {
scopedCname := sidecarEgress.Namespace + "/" + cname
egressHosts[scopedCname] = scopedCname
}
}
for _, sidecarEgress := range sidecarEgress {
egressHost := sidecarEgress.Namespace + "/" + sidecarEgress.FQDN
egressHosts[egressHost] = egressHost
for cname := range sidecarEgress.CNAMEs {
scopedCname := sidecarEgress.Namespace + "/" + cname
egressHosts[scopedCname] = scopedCname
}
}

for egressHost := range egressHosts {
if !util.Contains(newSidecar.Spec.Egress[0].Hosts, egressHost) {
newSidecar.Spec.Egress[0].Hosts = append(newSidecar.Spec.Egress[0].Hosts, egressHost)
}
}
for egressHost := range egressHosts {
if !util.Contains(newSidecar.Spec.Egress[0].Hosts, egressHost) {
newSidecar.Spec.Egress[0].Hosts = append(newSidecar.Spec.Egress[0].Hosts, egressHost)
}
}

//nolint
newSidecarConfig := createSidecarSkeletion(newSidecar.Spec, common.GetWorkloadSidecarName(), sidecarNamespace)
//nolint
newSidecarConfig := createSidecarSkeleton(newSidecar.Spec, common.GetWorkloadSidecarName(), sidecarNamespace)

//insert into cluster
if newSidecarConfig != nil {
addUpdateSidecar(ctx, newSidecarConfig, sidecar, sidecarNamespace, rc)
}
//insert into cluster
if newSidecarConfig != nil {
addUpdateSidecar(ctx, newSidecarConfig, sidecar, sidecarNamespace, rc)
}
}
})
}

func addUpdateSidecar(ctx context.Context, obj *v1alpha3.Sidecar, exist *v1alpha3.Sidecar, namespace string, rc *RemoteController) {
Expand Down
127 changes: 101 additions & 26 deletions admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,17 +1082,75 @@ func buildFakeConfigMapFromAddressStore(addressStore *ServiceEntryAddressStore,
}

func TestModifyNonExistingSidecarForLocalClusterCommunication(t *testing.T) {
setupForServiceEntryTests()
var (
assetIdentity = "test-identity"
identityNamespace = "test-dependency-namespace"
assetFQDN = "test-local-fqdn"
sidecar = &v1alpha3.Sidecar{
ObjectMeta: metav1.ObjectMeta{
Name: "default",
Namespace: identityNamespace,
},
Spec: istioNetworkingV1Alpha3.Sidecar{
Egress: []*istioNetworkingV1Alpha3.IstioEgressListener{
{
Hosts: []string{"a"},
},
},
},
}
)
sidecarController := &istio.SidecarController{}
sidecarController.IstioClient = istiofake.NewSimpleClientset()
sidecarController.IstioClient.NetworkingV1alpha3().Sidecars(identityNamespace).
Create(context.TODO(), sidecar, v12.CreateOptions{})

remoteController := &RemoteController{}
remoteController.SidecarController = sidecarController

sidecarEgressMap := make(map[string]common.SidecarEgress)
sidecarEgressMap["test-dependency-namespace"] = common.SidecarEgress{Namespace: "test-dependency-namespace", FQDN: "test-local-fqdn"}
ctx := context.Background()

modifySidecarForLocalClusterCommunication(ctx, "test-sidecar-namespace", sidecarEgressMap, remoteController)
sidecarCacheEgressMap := common.NewSidecarEgressMap()
sidecarCacheEgressMap.Put(
assetIdentity,
identityNamespace,
assetFQDN,
nil,
)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
var wg sync.WaitGroup
wg.Add(2)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
sidecarCacheEgressMap.Put(
assetIdentity,
identityNamespace,
assetFQDN,
nil,
)
}
}
}(ctx)

go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
modifySidecarForLocalClusterCommunication(
ctx, identityNamespace, assetIdentity,
sidecarCacheEgressMap, remoteController)
}
}
}(ctx)
wg.Wait()

sidecarObj, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{})
if err == nil {
Expand All @@ -1105,35 +1163,52 @@ func TestModifyNonExistingSidecarForLocalClusterCommunication(t *testing.T) {
}

func TestModifyExistingSidecarForLocalClusterCommunication(t *testing.T) {
setupForServiceEntryTests()
var (
assetIdentity = "test-identity"
identityNamespace = "test-sidecar-namespace"
sidecarName = "default"
assetHostsList = []string{"test-host"}
sidecar = &v1alpha3.Sidecar{
ObjectMeta: metav1.ObjectMeta{
Name: sidecarName,
Namespace: identityNamespace,
},
Spec: istioNetworkingV1Alpha3.Sidecar{
Egress: []*istioNetworkingV1Alpha3.IstioEgressListener{
{
Hosts: assetHostsList,
},
},
},
}

sidecarController := &istio.SidecarController{}
sidecarController.IstioClient = istiofake.NewSimpleClientset()

remoteController := &RemoteController{}
sidecarController = &istio.SidecarController{}
remoteController = &RemoteController{}
sidecarCacheEgressMap = common.NewSidecarEgressMap()
)
sidecarCacheEgressMap.Put(
assetIdentity,
"test-dependency-namespace",
"test-local-fqdn",
map[string]string{
"test.myservice.global": "1",
},
)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
remoteController.SidecarController = sidecarController
sidecarController.IstioClient = istiofake.NewSimpleClientset()
createdSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars(identityNamespace).
Create(context.TODO(), sidecar, v12.CreateOptions{})

existingSidecarObj := &v1alpha3.Sidecar{}
existingSidecarObj.ObjectMeta.Namespace = "test-sidecar-namespace"
existingSidecarObj.ObjectMeta.Name = "default"

istioEgress := istioNetworkingV1Alpha3.IstioEgressListener{
Hosts: []string{"test-host"},
}

existingSidecarObj.Spec = istioNetworkingV1Alpha3.Sidecar{
Egress: []*istioNetworkingV1Alpha3.IstioEgressListener{&istioEgress},
}

ctx := context.Background()
createdSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Create(ctx, existingSidecarObj, v12.CreateOptions{})
if err != nil {
t.Error(err)
t.Errorf("unable to create sidecar using fake client, err: %v", err)
}
if createdSidecar != nil {

sidecarEgressMap := make(map[string]common.SidecarEgress)
sidecarEgressMap["test-dependency-namespace"] = common.SidecarEgress{Namespace: "test-dependency-namespace", FQDN: "test-local-fqdn", CNAMEs: map[string]string{"test.myservice.global": "1"}}
modifySidecarForLocalClusterCommunication(ctx, "test-sidecar-namespace", sidecarEgressMap, remoteController)
modifySidecarForLocalClusterCommunication(ctx, identityNamespace, assetIdentity, sidecarCacheEgressMap, remoteController)

updatedSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Get(ctx, "default", v12.GetOptions{})

Expand Down
7 changes: 0 additions & 7 deletions admiral/pkg/controller/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,6 @@ func (s *SidecarEgressMap) Delete(key string) {
delete(s.cache, key)
}

// Map func returns a map of identity to namespace:SidecarEgress map
// Iterating through the returned map is not implicitly thread safe,
// use (s *SidecarEgressMap) Range() func instead.
func (s *SidecarEgressMap) Map() map[string]map[string]SidecarEgress {
return s.cache
}

// Range is a thread safe iterator to iterate through the SidecarEgress map
func (s *SidecarEgressMap) Range(fn func(k string, v map[string]SidecarEgress)) {
defer s.mutex.Unlock()
Expand Down
35 changes: 0 additions & 35 deletions admiral/pkg/controller/common/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/uuid"
)
Expand Down Expand Up @@ -48,40 +47,6 @@ func TestMapOfMaps(t *testing.T) {
}
}

func TestEgressMap(t *testing.T) {
egressMap := NewSidecarEgressMap()
payments, orders := "payments", "orders"
paymentsEnv, ordersEnv := "prod", "staging"
paymentsNs, ordersNs := payments+"-"+paymentsEnv, orders+"-"+ordersEnv
paymentsFqdn, ordersFqdn := payments+"."+paymentsNs+"."+"svc.cluster.local", orders+"."+ordersNs+"."+"svc.cluster.local"
paymentsCname, ordersCname := paymentsEnv+"."+payments+".global", ordersEnv+"."+orders+".global"
paymentsSidecar, ordersSidecar := SidecarEgress{FQDN: paymentsFqdn, Namespace: paymentsNs, CNAMEs: map[string]string{paymentsCname: paymentsCname}}, SidecarEgress{FQDN: ordersFqdn, Namespace: ordersNs, CNAMEs: map[string]string{ordersCname: ordersCname}}
egressMap.Put(payments, paymentsNs, paymentsFqdn, map[string]string{paymentsCname: paymentsCname})
egressMap.Put(orders, ordersNs, ordersFqdn, map[string]string{ordersCname: ordersCname})

ordersEgress := egressMap.Get("orders")

if !cmp.Equal(ordersEgress[ordersNs], ordersSidecar) {
t.Errorf("Orders egress object should match expected %v, got %v", ordersSidecar, ordersEgress[ordersNs])
t.FailNow()
}

egressMap.Delete(orders)
ordersEgress = egressMap.Get("orders")

if ordersEgress != nil {
t.Errorf("Delete object should delete the object %v", ordersEgress)
t.FailNow()
}

egressMapForIter := egressMap.Map()

if len(egressMapForIter) != 1 {
t.Errorf("Egressmap should contains only one object %v", paymentsSidecar)
t.FailNow()
}
}

func TestAdmiralParams(t *testing.T) {
admiralParams := AdmiralParams{SANPrefix: "custom.san.prefix"}
admiralParamsStr := admiralParams.String()
Expand Down