Skip to content

Commit

Permalink
Fix for concurrent map iteration and map write (#197)
Browse files Browse the repository at this point in the history
* fixes #196

Added `.Range()` and `.Copy() func to avoid concurrent map read and
write.

Signed-off-by: Shriram Sharma <shriram_sharma@intuit.com>

* fixed the linting errors

Signed-off-by: Shriram Sharma <shriram_sharma@intuit.com>

Co-authored-by: Shriram Sharma <shriram_sharma@intuit.com>
  • Loading branch information
shriramsharma and shriramsharma committed Mar 14, 2022
1 parent d3f4341 commit 4c9ef81
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 204 deletions.
20 changes: 12 additions & 8 deletions admiral/pkg/apis/admiral/routes/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package routes
import (
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"github.com/istio-ecosystem/admiral/admiral/pkg/clusters"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
"log"
"net/http"
"strings"

"github.com/gorilla/mux"
"github.com/istio-ecosystem/admiral/admiral/pkg/clusters"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
)

type RouteOpts struct {
Expand Down Expand Up @@ -126,16 +128,18 @@ func (opts *RouteOpts) GetServiceEntriesByIdentity(w http.ResponseWriter, r *htt

if identity != "" {

for cname, serviceCluster := range opts.RemoteRegistry.AdmiralCache.SeClusterCache.Map() {
m := opts.RemoteRegistry.AdmiralCache.SeClusterCache

m.Range(func(cname string, serviceCluster *common.Map) {
if strings.Contains(cname, identity) {
var identityServiceEntry IdentityServiceEntry
identityServiceEntry.Cname = cname
for _, clusterId := range serviceCluster.Map() {
identityServiceEntry.ClusterNames = append(identityServiceEntry.ClusterNames, clusterId)
}
serviceCluster.Range(func(k string, clusterID string) {
identityServiceEntry.ClusterNames = append(identityServiceEntry.ClusterNames, clusterID)
})
response = append(response, identityServiceEntry)
}
}
})
out, err := json.Marshal(response)
if err != nil {
log.Printf("Failed to marshall response GetServiceEntriesByIdentity call")
Expand Down
106 changes: 54 additions & 52 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package clusters
import (
"bytes"
"fmt"
"reflect"
"sort"
"strings"
"time"

argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/gogo/protobuf/types"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
Expand All @@ -16,10 +21,6 @@ import (
k8sAppsV1 "k8s.io/api/apps/v1"
k8sV1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"sort"
"strings"
"time"
)

const ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
Expand All @@ -45,8 +46,8 @@ type SidecarHandler struct {
}

type WeightedService struct {
Weight int32
Service *k8sV1.Service
Weight int32
Service *k8sV1.Service
}

func updateIdentityDependencyCache(sourceIdentity string, identityDependencyCache *common.MapOfMaps, dr *v1.Dependency) {
Expand All @@ -70,9 +71,9 @@ func getDestinationRule(host string, locality string, gtpTrafficPolicy *model.Tr
processGtp = false
}
outlierDetection := &v1alpha32.OutlierDetection{
BaseEjectionTime: &types.Duration{Seconds: 300},
BaseEjectionTime: &types.Duration{Seconds: 300},
Consecutive_5XxErrors: &types.UInt32Value{Value: uint32(10)},
Interval: &types.Duration{Seconds: 60},
Interval: &types.Duration{Seconds: 60},
}
if gtpTrafficPolicy != nil && processGtp {
var loadBalancerSettings = &v1alpha32.LoadBalancerSettings{
Expand Down Expand Up @@ -108,52 +109,52 @@ func getDestinationRule(host string, locality string, gtpTrafficPolicy *model.Tr

func (se *ServiceEntryHandler) Added(obj *v1alpha3.ServiceEntry) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (se *ServiceEntryHandler) Updated(obj *v1alpha3.ServiceEntry) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (se *ServiceEntryHandler) Deleted(obj *v1alpha3.ServiceEntry) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (dh *DestinationRuleHandler) Added(obj *v1alpha3.DestinationRule) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
handleDestinationRuleEvent(obj, dh, common.Add, common.DestinationRule)
}

func (dh *DestinationRuleHandler) Updated(obj *v1alpha3.DestinationRule) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
handleDestinationRuleEvent(obj, dh, common.Update, common.DestinationRule)
}

func (dh *DestinationRuleHandler) Deleted(obj *v1alpha3.DestinationRule) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
handleDestinationRuleEvent(obj, dh, common.Delete, common.DestinationRule)
}

func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
err := handleVirtualServiceEvent(obj, vh, common.Add, common.VirtualService)
Expand All @@ -164,7 +165,7 @@ func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {

func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
err := handleVirtualServiceEvent(obj, vh, common.Update, common.VirtualService)
Expand All @@ -175,7 +176,7 @@ func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {

func (vh *VirtualServiceHandler) Deleted(obj *v1alpha3.VirtualService) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
err := handleVirtualServiceEvent(obj, vh, common.Delete, common.VirtualService)
Expand All @@ -192,7 +193,7 @@ func (dh *SidecarHandler) Deleted(obj *v1alpha3.Sidecar) {}

func IgnoreIstioResource(exportTo []string, annotations map[string]string, namespace string) bool {

if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" {
if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" {
return true
}

Expand Down Expand Up @@ -225,9 +226,9 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

r := dh.RemoteRegistry

dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host)
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host).Copy()

if dependentClusters != nil && len(dependentClusters.Map()) > 0 {
if len(dependentClusters) > 0 {

log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")

Expand All @@ -240,7 +241,7 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

allDependentClusters := make(map[string]string)

util.MapCopy(allDependentClusters, dependentClusters.Map())
util.MapCopy(allDependentClusters, dependentClusters)

allDependentClusters[clusterId] = clusterId

Expand Down Expand Up @@ -416,7 +417,7 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
}

//check if this virtual service is used by Argo rollouts for canary strategy, if so, update the corresponding SE with appropriate weights
if common.GetAdmiralParams().ArgoRolloutsEnabled {
if common.GetAdmiralParams().ArgoRolloutsEnabled {
rollouts, err := vh.RemoteRegistry.RemoteControllers[clusterId].RolloutController.RolloutClient.Rollouts(obj.Namespace).List(v12.ListOptions{})

if err != nil {
Expand All @@ -432,11 +433,11 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
}
}

dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0])
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0]).Copy()

if dependentClusters != nil && len(dependentClusters.Map()) > 0 {
if len(dependentClusters) > 0 {

for _, dependentCluster := range dependentClusters.Map() {
for _, dependentCluster := range dependentClusters {

rc := r.RemoteControllers[dependentCluster]

Expand Down Expand Up @@ -480,7 +481,7 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
}
return nil
} else {
log.Infof(LogFormat,"Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found")
log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found")
}

//copy the VirtualService `as is` if they are not generated by Admiral (not in CnameDependentClusterCache)
Expand Down Expand Up @@ -510,7 +511,7 @@ func addUpdateVirtualService(obj *v1alpha3.VirtualService, exist *v1alpha3.Virtu
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
if exist == nil || len(exist.Spec.Hosts) == 0 {
obj.Namespace = namespace
obj.ResourceVersion = ""
Expand All @@ -537,20 +538,20 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
if exist == nil || exist.Spec.Hosts == nil {
obj.Namespace = namespace
obj.ResourceVersion = ""
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(obj)
op = "Add"
log.Infof(LogFormat + " SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
log.Infof(LogFormat+" SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
} else {
exist.Labels = obj.Labels
exist.Annotations = obj.Annotations
op = "Update"
skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist)
if diff != "" {
log.Infof(LogFormat + " diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
}
if skipUpdate {
log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Update skipped as it was destructive during Admiral's bootup phase")
Expand All @@ -573,7 +574,7 @@ func skipDestructiveUpdate(rc *RemoteController, new *v1alpha3.ServiceEntry, old
skipDestructive = false
destructive, diff := getServiceEntryDiff(new, old)
//do not update SEs during bootup phase if they are destructive
if time.Since(rc.StartTime) < (2 * common.GetAdmiralParams().CacheRefreshDuration) && destructive {
if time.Since(rc.StartTime) < (2*common.GetAdmiralParams().CacheRefreshDuration) && destructive {
skipDestructive = true
}

Expand Down Expand Up @@ -603,17 +604,17 @@ func getServiceEntryDiff(new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry)
found[nEndpoint.Address] = "1"
if !reflect.DeepEqual(val, nEndpoint) {
destructive = true
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
}
} else {
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
}
}

for key := range oldEndpointMap {
if _, ok := found[key]; !ok {
destructive = true
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
}
}

Expand All @@ -638,7 +639,7 @@ func addUpdateDestinationRule(obj *v1alpha3.DestinationRule, exist *v1alpha3.Des
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
if exist == nil || exist.Name == "" || exist.Spec.Host == "" {
obj.Namespace = namespace
obj.ResourceVersion = ""
Expand Down Expand Up @@ -714,22 +715,24 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return matchedService
}

func getDependentClusters(dependents *common.Map, identityClusterCache *common.MapOfMaps, sourceServices map[string]*k8sV1.Service) map[string]string {
func getDependentClusters(dependents map[string]string, identityClusterCache *common.MapOfMaps, sourceServices map[string]*k8sV1.Service) map[string]string {
var dependentClusters = make(map[string]string)
//TODO optimize this map construction
if dependents != nil {
for identity, clusters := range identityClusterCache.Map() {
for depIdentity := range dependents.Map() {
if identity == depIdentity {
for _, clusterId := range clusters.Map() {
_, ok := sourceServices[clusterId]
if !ok {
dependentClusters[clusterId] = clusterId
}
}
}
}

if dependents == nil {
return dependentClusters
}

for depIdentity := range dependents {
clusters := identityClusterCache.Get(depIdentity)
if clusters == nil {
continue
}
clusters.Range(func(k string, clusterID string) {
_, ok := sourceServices[clusterID]
if !ok {
dependentClusters[clusterID] = clusterID
}
})
}
return dependentClusters
}
Expand Down Expand Up @@ -761,7 +764,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
return nil
}

var canaryService, stableService, virtualServiceRouteName string
var canaryService, stableService, virtualServiceRouteName string

var istioCanaryWeights = make(map[string]int32)

Expand Down Expand Up @@ -827,7 +830,6 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)


//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

Expand Down
Loading

0 comments on commit 4c9ef81

Please sign in to comment.