From 5807acb3a663a2b3d4332af36ad7398b9129838b Mon Sep 17 00:00:00 2001
From: aattuluri <44482891+aattuluri@users.noreply.github.com>
Date: Wed, 6 Jul 2022 15:40:27 -0700
Subject: [PATCH] Best effort to pick stable/active services for Rollouts (#242)

Signed-off-by: sa
---
 admiral/pkg/clusters/handler.go          |   6 +-
 admiral/pkg/clusters/handler_test.go     |  80 ++-
 admiral/pkg/clusters/types.go            | 881 ++++++++++++++++++------
 admiral/pkg/controller/common/common.go  |   2 +
 4 files changed, 758 insertions(+), 211 deletions(-)

diff --git a/admiral/pkg/clusters/handler.go b/admiral/pkg/clusters/handler.go
index c31dec3d..5881d724 100644
--- a/admiral/pkg/clusters/handler.go
+++ b/admiral/pkg/clusters/handler.go
@@ -798,4 +798,4 @@ func GetServiceWithSuffixMatch(suffix string, services []*k8sV1.Service) string
 		}
 	}
 	return ""
-}
\ No newline at end of file
+}
diff --git a/admiral/pkg/clusters/handler_test.go b/admiral/pkg/clusters/handler_test.go
index 5151010a..06c56083 100644
--- a/admiral/pkg/clusters/handler_test.go
+++ b/admiral/pkg/clusters/handler_test.go
@@ -663,6 +663,8 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 	const ServiceName = "serviceName"
 	const StableServiceName = "stableserviceName"
 	const CanaryServiceName = "canaryserviceName"
+	const GeneratedStableServiceName = "hello-" + common.RolloutStableServiceSuffix
+	const LatestMatchingService = "hello-root-service"
 	const VS_NAME_1 = "virtualservice1"
 	const VS_NAME_2 = "virtualservice2"
 	const VS_NAME_3 = "virtualservice3"
@@ -719,6 +721,14 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 		},
 	}
 
+	generatedStableService := &coreV1.Service{
+		ObjectMeta: v12.ObjectMeta{Name: GeneratedStableServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
+		Spec: coreV1.ServiceSpec{
+			Selector: selectorMap,
+			Ports:    ports,
+		},
+	}
+
 	canaryService := &coreV1.Service{
 		ObjectMeta: v12.ObjectMeta{Name: CanaryServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
 		Spec: coreV1.ServiceSpec{
@@ -727,6 +737,14 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 		},
 	}
 
+	latestMatchingService := &coreV1.Service{
+		ObjectMeta: v12.ObjectMeta{Name: LatestMatchingService, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now())},
+		Spec: coreV1.ServiceSpec{
+			Selector: selectorMap,
+			Ports:    ports,
+		},
+	}
+
 	selectorMap1 := make(map[string]string)
 	selectorMap1["app"] = "test1"
 	service1 := &coreV1.Service{
@@ -775,7 +793,9 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 	rcTemp.ServiceController.Cache.Put(service1)
 	rcTemp.ServiceController.Cache.Put(service4)
 	rcTemp.ServiceController.Cache.Put(stableService)
+	rcTemp.ServiceController.Cache.Put(generatedStableService)
 	rcTemp.ServiceController.Cache.Put(canaryService)
+	rcTemp.ServiceController.Cache.Put(latestMatchingService)
 
 	virtualService := &v1alpha32.VirtualService{
 		ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: Namespace},
@@ -825,7 +845,8 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 	canaryRollout.Namespace = Namespace
 
 	canaryRollout.Spec.Strategy = argo.RolloutStrategy{
-		Canary: &argo.CanaryStrategy{},
+		Canary: &argo.CanaryStrategy{
+		},
 	}
 
 	canaryRolloutNS1 := argo.Rollout{
@@ -943,6 +964,22 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 		},
 	}
 
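+	// Fixture that names both stable and canary services explicitly; the
+	// "canaryRolloutWithStableServiceName" case below expects the named stable service to win over suffix matching.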
+	canaryRolloutWithStableService := argo.Rollout{
+		Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
+			ObjectMeta: k8sv1.ObjectMeta{Annotations: map[string]string{}},
+		}}}
+	canaryRolloutWithStableService.Spec.Selector = &labelSelector
+
+	canaryRolloutWithStableService.Namespace = Namespace
+	canaryRolloutWithStableService.Spec.Strategy = argo.RolloutStrategy{
+		Canary: &argo.CanaryStrategy{
+			StableService: StableServiceName,
+			CanaryService: CanaryServiceName,
+		},
+	}
+
 	canaryRolloutIstioVsMimatch := argo.Rollout{
 		Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
 			ObjectMeta: k8sv1.ObjectMeta{Annotations: map[string]string{}},
@@ -964,10 +999,10 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 
 	resultForDummy := map[string]*WeightedService{"dummy": {Weight: 1, Service: service1}}
 
-	resultForRandomMatch := map[string]*WeightedService{CanaryServiceName: {Weight: 1, Service: canaryService}}
-
 	resultForStableServiceOnly := map[string]*WeightedService{StableServiceName: {Weight: 1, Service: stableService}}
+	resultForEmptyStableServiceOnRollout := map[string]*WeightedService{GeneratedStableServiceName: {Weight: 1, Service: generatedStableService}}
+
 	resultForCanaryWithIstio := map[string]*WeightedService{StableServiceName: {Weight: 80, Service: stableService},
 		CanaryServiceName: {Weight: 20, Service: canaryService}}
 
@@ -993,7 +1028,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 			name:    "canaryRolloutHappyCase",
 			rollout: &canaryRollout,
 			rc:      rcTemp,
-			result:  resultForRandomMatch,
+			result:  resultForEmptyStableServiceOnRollout,
 		}, {
 			name:    "canaryRolloutWithStableService",
 			rollout: &canaryRolloutIstioVsMimatch,
@@ -1019,12 +1054,17 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
 			rollout: &canaryRolloutIstioVsRouteMisMatch,
 			rc:      rcTemp,
 			result:  resultForStableServiceOnly,
+		}, {
+			name:    "canaryRolloutWithStableServiceName",
+			rollout: &canaryRolloutWithStableService,
+			rc:      rcTemp,
+			result:  resultForStableServiceOnly,
 		},
 	}
 
 	//Run the test for every provided case
 	for _, c := range testCases {
 		t.Run(c.name, func(t *testing.T) {
 			result := getServiceForRollout(c.rc, c.rollout)
 			if len(c.result) == 0 {
 				if result != nil && len(result) != 0 {
@@ -1052,6 +1095,7 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
 
 	//Struct of test case info. Name is required.
 	const NAMESPACE = "namespace"
 	const SERVICENAME = "serviceNameActive"
+	const GeneratedActiveServiceName = "hello-" + common.RolloutActiveServiceSuffix
 	const ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
 
 	config := rest.Config{
@@ -1093,6 +1137,18 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
 			PreviewService: "previewService",
 		},
 	}
+	bgRolloutNoActiveService := argo.Rollout{
+		Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
+			ObjectMeta: k8sv1.ObjectMeta{Annotations: map[string]string{}},
+		}}}
+
+	bgRolloutNoActiveService.Spec.Selector = &labelSelector
+
+	bgRolloutNoActiveService.Namespace = NAMESPACE
+	bgRolloutNoActiveService.Spec.Strategy = argo.RolloutStrategy{
+		BlueGreen: &argo.BlueGreenStrategy{
+		},
+	}
 
 	selectorMap := make(map[string]string)
 	selectorMap["app"] = "test"
@@ -1118,6 +1174,15 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
 	ports := []coreV1.ServicePort{port1, port2}
 	activeService.Spec.Ports = ports
 
+	generatedActiveService := &coreV1.Service{
+		Spec: coreV1.ServiceSpec{
+			Selector: selectorMap,
+		},
+	}
+	generatedActiveService.Name = GeneratedActiveServiceName
+	generatedActiveService.Namespace = NAMESPACE
+	generatedActiveService.Spec.Ports = ports
+
 	selectorMap1 := make(map[string]string)
 	selectorMap1["app"] = "test1"
 
@@ -1189,6 +1254,7 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
 	rc.ServiceController.Cache.Put(previewService)
 	rc.ServiceController.Cache.Put(activeService)
 	rc.ServiceController.Cache.Put(serviceNS1)
+	rc.ServiceController.Cache.Put(generatedActiveService)
 
 	noStratergyRollout := argo.Rollout{
 		Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
@@ -1220,6 +1286,7 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
 	}
 
 	resultForBlueGreen := map[string]*WeightedService{SERVICENAME: {Weight: 1, Service: activeService}}
+	resultForNoActiveService := map[string]*WeightedService{GeneratedActiveServiceName: {Weight: 1, Service: generatedActiveService}}
 
 	testCases := []struct {
 		name    string
@@ -1242,6 +1309,11 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
 			rollout: &bgRollout,
 			rc:      rc,
 			result:  resultForBlueGreen,
+		}, {
+			name:    "rolloutWithNoActiveService",
+			rollout: &bgRolloutNoActiveService,
+			rc:      rc,
+			result:  resultForNoActiveService,
 		}, {
 			name:    "canaryRolloutNilRollout",
diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go
index 42580105..c31dec3d 100644
--- a/admiral/pkg/clusters/types.go
+++ b/admiral/pkg/clusters/types.go
@@ -1,332 +1,804 @@
 package clusters
 
 import (
-	"context"
-	"errors"
+	"bytes"
 	"fmt"
-	"sync"
+	"net"
+	"reflect"
+	"strings"
 	"time"
 
 	argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
+	"github.com/gogo/protobuf/types"
+	"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model"
 	v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
 	"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
 	"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
-	"github.com/istio-ecosystem/admiral/admiral/pkg/controller/istio"
-	"github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret"
+	"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
 	log "github.com/sirupsen/logrus"
+	v1alpha32 "istio.io/api/networking/v1alpha3"
+	"istio.io/client-go/pkg/apis/networking/v1alpha3"
 	k8sAppsV1 "k8s.io/api/apps/v1"
 	k8sV1 "k8s.io/api/core/v1"
-	k8s "k8s.io/client-go/kubernetes"
+	v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-type RemoteController struct {
-	ClusterID                 string
-	ApiServer                 string
- StartTime time.Time - GlobalTraffic *admiral.GlobalTrafficController - DeploymentController *admiral.DeploymentController - ServiceController *admiral.ServiceController - NodeController *admiral.NodeController - ServiceEntryController *istio.ServiceEntryController - DestinationRuleController *istio.DestinationRuleController - VirtualServiceController *istio.VirtualServiceController - SidecarController *istio.SidecarController - RolloutController *admiral.RolloutController - stop chan struct{} - //listener for normal types -} +const ( + DefaultBaseEjectionTime int64 = 300 + DefaultConsecutiveGatewayErrors uint32 = 50 + DefaultInterval int64 = 60 +) -type AdmiralCache struct { - CnameClusterCache *common.MapOfMaps - CnameDependentClusterCache *common.MapOfMaps - CnameIdentityCache *sync.Map - IdentityClusterCache *common.MapOfMaps - ClusterLocalityCache *common.MapOfMaps - IdentityDependencyCache *common.MapOfMaps - SubsetServiceEntryIdentityCache *sync.Map - ServiceEntryAddressStore *ServiceEntryAddressStore - ConfigMapController admiral.ConfigMapControllerInterface //todo this should be in the remotecontrollers map once we expand it to have one configmap per cluster - GlobalTrafficCache *globalTrafficCache //The cache needs to live in the handler because it needs access to deployments - DependencyNamespaceCache *common.SidecarEgressMap - SeClusterCache *common.MapOfMaps +type ServiceEntryHandler struct { + RemoteRegistry *RemoteRegistry + ClusterID string +} - argoRolloutsEnabled bool +type DestinationRuleHandler struct { + RemoteRegistry *RemoteRegistry + ClusterID string } -type RemoteRegistry struct { - sync.Mutex - RemoteControllers map[string]*RemoteController - SecretController *secret.Controller - secretClient k8s.Interface - ctx context.Context - AdmiralCache *AdmiralCache - StartTime time.Time +type VirtualServiceHandler struct { + RemoteRegistry *RemoteRegistry + ClusterID string } -func (r *RemoteRegistry) shutdown() { +type SidecarHandler struct { + RemoteRegistry *RemoteRegistry + ClusterID string +} - done := r.ctx.Done() - //wait for the context to close - <-done +type WeightedService struct { + Weight int32 + Service *k8sV1.Service +} - //close the remote controllers stop channel - for _, v := range r.RemoteControllers { - close(v.stop) +func updateIdentityDependencyCache(sourceIdentity string, identityDependencyCache *common.MapOfMaps, dr *v1.Dependency) { + for _, dIdentity := range dr.Spec.Destinations { + identityDependencyCache.Put(dIdentity, sourceIdentity, sourceIdentity) } + log.Infof(LogFormat, "Update", "dependency-cache", dr.Name, "", "Updated=true namespace="+dr.Namespace) } -type ServiceEntryAddressStore struct { - EntryAddresses map[string]string `yaml:"entry-addresses,omitempty"` - Addresses []string `yaml:"addresses,omitempty"` //trading space for efficiency - this will give a quick way to validate that the address is unique +func getIstioResourceName(host string, suffix string) string { + return strings.ToLower(host) + suffix } -type DependencyHandler struct { - RemoteRegistry *RemoteRegistry - DepController *admiral.DependencyController -} +func getDestinationRule(se *v1alpha32.ServiceEntry, locality string, gtpTrafficPolicy *model.TrafficPolicy) *v1alpha32.DestinationRule { + var dr = &v1alpha32.DestinationRule{} + dr.Host = se.Hosts[0] + dr.TrafficPolicy = &v1alpha32.TrafficPolicy{Tls: &v1alpha32.TLSSettings{Mode: v1alpha32.TLSSettings_ISTIO_MUTUAL}} + processGtp := true + if len(locality) == 0 { + log.Warnf(LogErrFormat, "Process", 
"GlobalTrafficPolicy", dr.Host, "", "Skipping gtp processing, locality of the cluster nodes cannot be determined. Is this minikube?") + processGtp = false + } + if gtpTrafficPolicy != nil && processGtp { + var loadBalancerSettings = &v1alpha32.LoadBalancerSettings{ + LbPolicy: &v1alpha32.LoadBalancerSettings_Simple{Simple: v1alpha32.LoadBalancerSettings_ROUND_ROBIN}, + } -type GlobalTrafficHandler struct { - RemoteRegistry *RemoteRegistry - ClusterID string + if len(gtpTrafficPolicy.Target) > 0 { + var localityLbSettings = &v1alpha32.LocalityLoadBalancerSetting{} + + if gtpTrafficPolicy.LbType == model.TrafficPolicy_FAILOVER { + distribute := make([]*v1alpha32.LocalityLoadBalancerSetting_Distribute, 0) + targetTrafficMap := make(map[string]uint32) + for _, tg := range gtpTrafficPolicy.Target { + //skip 0 values from GTP as that's implicit for locality settings + if tg.Weight != int32(0) { + targetTrafficMap[tg.Region] = uint32(tg.Weight) + } + } + distribute = append(distribute, &v1alpha32.LocalityLoadBalancerSetting_Distribute{ + From: locality + "/*", + To: targetTrafficMap, + }) + localityLbSettings.Distribute = distribute + } + // else default behavior + loadBalancerSettings.LocalityLbSetting = localityLbSettings + dr.TrafficPolicy.LoadBalancer = loadBalancerSettings + } + } + dr.TrafficPolicy.OutlierDetection = getOutlierDetection(se, locality, gtpTrafficPolicy) + return dr } -type RolloutHandler struct { - RemoteRegistry *RemoteRegistry - ClusterID string -} +func getOutlierDetection(se *v1alpha32.ServiceEntry, locality string, gtpTrafficPolicy *model.TrafficPolicy) *v1alpha32.OutlierDetection { -type globalTrafficCache struct { - //map of global traffic policies key=environment.identity, value: GlobalTrafficPolicy object - identityCache map[string]*v1.GlobalTrafficPolicy + outlierDetection := &v1alpha32.OutlierDetection{ + BaseEjectionTime: &types.Duration{Seconds: DefaultBaseEjectionTime}, + ConsecutiveGatewayErrors: &types.UInt32Value{Value: DefaultConsecutiveGatewayErrors}, + Interval: &types.Duration{Seconds: DefaultInterval}, + } - mutex *sync.Mutex -} + if gtpTrafficPolicy != nil && gtpTrafficPolicy.OutlierDetection != nil { + if gtpTrafficPolicy.OutlierDetection.BaseEjectionTime > 0 { + outlierDetection.BaseEjectionTime = &types.Duration{ + Seconds: gtpTrafficPolicy.OutlierDetection.BaseEjectionTime, + } + } + if gtpTrafficPolicy.OutlierDetection.ConsecutiveGatewayErrors > 0 { + outlierDetection.ConsecutiveGatewayErrors = &types.UInt32Value{ + Value: gtpTrafficPolicy.OutlierDetection.ConsecutiveGatewayErrors, + } + } + if gtpTrafficPolicy.OutlierDetection.Interval > 0 { + outlierDetection.Interval = &types.Duration{ + Seconds: gtpTrafficPolicy.OutlierDetection.Interval, + } + } + } -func (g *globalTrafficCache) GetFromIdentity(identity string, environment string) *v1.GlobalTrafficPolicy { - return g.identityCache[common.ConstructGtpKey(environment, identity)] + //Scenario 1: Only one endpoint present and is local service (ends in svc.cluster.local) - no outlier detection (optimize this for headless services in future?) 
+ if len(se.Endpoints) == 1 && (strings.Contains(se.Endpoints[0].Address, common.DotLocalDomainSuffix) || net.ParseIP(se.Endpoints[0].Address).To4() != nil) { + return nil + } else if len(se.Endpoints) == 1 { + //Scenario 2: Only one endpoint present and is remote - outlier detection with 34% ejection (protection against zone specific issues) + outlierDetection.MaxEjectionPercent = 34 + } else { + //Scenario 3: Two endpoints present each with different locality and both remote - outlier detection with 100% ejection + //Scenario 4: Two endpoints present each with different locality with one local and other remote - outlier detection with 100% ejection + //for service entries with more than 2 endpoints eject 100% to failover to other endpoint within or outside the same region + outlierDetection.MaxEjectionPercent = 100 + } + return outlierDetection } -func (g *globalTrafficCache) Put(gtp *v1.GlobalTrafficPolicy) error { - if gtp.Name == "" { - //no GTP, throw error - return errors.New("cannot add an empty globaltrafficpolicy to the cache") +func (se *ServiceEntryHandler) Added(obj *v1alpha3.ServiceEntry) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return } - defer g.mutex.Unlock() - g.mutex.Lock() - var gtpIdentity = gtp.Labels[common.GetGlobalTrafficDeploymentLabel()] - var gtpEnv = common.GetGtpEnv(gtp) - - log.Infof("Adding GTP with name %v to GTP cache. LabelMatch=%v env=%v", gtp.Name, gtpIdentity, gtpEnv) - identity := gtp.Labels[common.GetGlobalTrafficDeploymentLabel()] - key := common.ConstructGtpKey(gtpEnv, identity) - g.identityCache[key] = gtp +} - return nil +func (se *ServiceEntryHandler) Updated(obj *v1alpha3.ServiceEntry) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } } -func (g *globalTrafficCache) Delete(identity string, environment string) { - key := common.ConstructGtpKey(environment, identity) - if _, ok := g.identityCache[key]; ok { - log.Infof("Deleting gtp with key=%s from global GTP cache", key) - delete(g.identityCache, key) +func (se *ServiceEntryHandler) Deleted(obj *v1alpha3.ServiceEntry) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return } } -type DeploymentHandler struct { - RemoteRegistry *RemoteRegistry - ClusterID string +func (dh *DestinationRuleHandler) Added(obj *v1alpha3.DestinationRule) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } + handleDestinationRuleEvent(obj, dh, common.Add, common.DestinationRule) } -type NodeHandler struct { - RemoteRegistry *RemoteRegistry - ClusterID string +func (dh *DestinationRuleHandler) Updated(obj *v1alpha3.DestinationRule) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } + handleDestinationRuleEvent(obj, dh, common.Update, common.DestinationRule) } -type ServiceHandler struct { - RemoteRegistry 
*RemoteRegistry - ClusterID string +func (dh *DestinationRuleHandler) Deleted(obj *v1alpha3.DestinationRule) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } + handleDestinationRuleEvent(obj, dh, common.Delete, common.DestinationRule) } -func (sh *ServiceHandler) Added(obj *k8sV1.Service) { - log.Infof(LogFormat, "Added", "service", obj.Name, sh.ClusterID, "received") - err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID) +func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } + err := handleVirtualServiceEvent(obj, vh, common.Add, common.VirtualService) if err != nil { - log.Errorf(LogErrFormat, "Error", "service", obj.Name, sh.ClusterID, err) + log.Error(err) } } -func (sh *ServiceHandler) Updated(obj *k8sV1.Service) { - log.Infof(LogFormat, "Updated", "service", obj.Name, sh.ClusterID, "received") - err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID) +func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } + err := handleVirtualServiceEvent(obj, vh, common.Update, common.VirtualService) if err != nil { - log.Errorf(LogErrFormat, "Error", "service", obj.Name, sh.ClusterID, err) + log.Error(err) } } -func (sh *ServiceHandler) Deleted(obj *k8sV1.Service) { - log.Infof(LogFormat, "Deleted", "service", obj.Name, sh.ClusterID, "received") - err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID) +func (vh *VirtualServiceHandler) Deleted(obj *v1alpha3.VirtualService) { + if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) { + log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace) + return + } + err := handleVirtualServiceEvent(obj, vh, common.Delete, common.VirtualService) if err != nil { - log.Errorf(LogErrFormat, "Error", "service", obj.Name, sh.ClusterID, err) + log.Error(err) } } -func HandleEventForService(svc *k8sV1.Service, remoteRegistry *RemoteRegistry, clusterName string) error { - if svc.Spec.Selector == nil { - return fmt.Errorf("selector missing on service=%s in namespace=%s cluster=%s", svc.Name, svc.Namespace, clusterName); +func (dh *SidecarHandler) Added(obj *v1alpha3.Sidecar) {} + +func (dh *SidecarHandler) Updated(obj *v1alpha3.Sidecar) {} + +func (dh *SidecarHandler) Deleted(obj *v1alpha3.Sidecar) {} + +func IgnoreIstioResource(exportTo []string, annotations map[string]string, namespace string) bool { + + if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" { + return true } - if remoteRegistry.RemoteControllers[clusterName] == nil { - return fmt.Errorf("could not find the remote controller for cluster=%s", clusterName); + + if namespace == common.NamespaceIstioSystem || namespace == common.NamespaceKubeSystem || namespace == common.GetSyncNamespace() { + return true + } + + if len(exportTo) == 0 { + return false + } else { + for _, namespace := range exportTo { + if namespace == "*" { + return 
false
+			}
+		}
 	}
-	deploymentController := remoteRegistry.RemoteControllers[clusterName].DeploymentController
-	rolloutController := remoteRegistry.RemoteControllers[clusterName].RolloutController
-	if deploymentController != nil {
-		matchingDeployements := remoteRegistry.RemoteControllers[clusterName].DeploymentController.GetDeploymentBySelectorInNamespace(svc.Spec.Selector, svc.Namespace)
-		if len(matchingDeployements) > 0 {
-			for _, deployment := range matchingDeployements {
-				HandleEventForDeployment(admiral.Update, &deployment, remoteRegistry, clusterName)
+	return true
+}
+
+func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRuleHandler, event common.Event, resourceType common.ResourceType) {
+	destinationRule := obj.Spec
+
+	clusterId := dh.ClusterID
+
+	syncNamespace := common.GetSyncNamespace()
+
+	r := dh.RemoteRegistry
+
+	dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host).Copy()
+
+	if len(dependentClusters) > 0 {
+
+		log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")
+
+		allDependentClusters := make(map[string]string)
+
+		util.MapCopy(allDependentClusters, dependentClusters)
+
+		allDependentClusters[clusterId] = clusterId
+
+		for _, dependentCluster := range allDependentClusters {
+
+			rc := r.RemoteControllers[dependentCluster]
+
+			if event == common.Delete {
+
+				err := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(obj.Name, &v12.DeleteOptions{})
+				if err != nil {
+					log.Errorf(LogErrFormat, "Delete", "DestinationRule", obj.Name, clusterId, err)
+				} else {
+					log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, clusterId, "Success")
+				}
+
+			} else {
+
+				exist, _ := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Get(obj.Name, v12.GetOptions{})
+
+				//copy destination rule only to other clusters
+				if dependentCluster != clusterId {
+					addUpdateDestinationRule(obj, exist, syncNamespace, rc)
+				}
 			}
 		}
+		return
+	} else {
+		log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "No dependent clusters found")
 	}
-	if common.GetAdmiralParams().ArgoRolloutsEnabled && rolloutController != nil {
-		matchingRollouts := remoteRegistry.RemoteControllers[clusterName].RolloutController.GetRolloutBySelectorInNamespace(svc.Spec.Selector, svc.Namespace)
-		if len(matchingRollouts) > 0 {
-			for _, rollout := range matchingRollouts {
-				HandleEventForRollout(admiral.Update, &rollout, remoteRegistry, clusterName)
+
+	//copy the DestinationRule `as is` if they are not generated by Admiral
+	for _, rc := range r.RemoteControllers {
+		if rc.ClusterID != clusterId {
+			if event == common.Delete {
+				err := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(obj.Name, &v12.DeleteOptions{})
+				if err != nil {
+					log.Errorf(LogErrFormat, "Delete", "DestinationRule", obj.Name, clusterId, err)
+				} else {
+					log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, clusterId, "Success")
+				}
+			} else {
+				exist, _ := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Get(obj.Name, v12.GetOptions{})
+				addUpdateDestinationRule(obj, exist, syncNamespace, rc)
 			}
 		}
 	}
-	return nil
 }
 
-func (dh *DependencyHandler) Added(obj *v1.Dependency) {
-
-	log.Infof(LogFormat, "Add", 
"dependency-record", obj.Name, "", "Received=true namespace="+obj.Namespace) + log.Infof(LogFormat, "Event", resourceType, obj.Name, vh.ClusterID, "Received event") - HandleDependencyRecord(obj, dh.RemoteRegistry) + virtualService := obj.Spec -} + clusterId := vh.ClusterID -func (dh *DependencyHandler) Updated(obj *v1.Dependency) { + r := vh.RemoteRegistry - log.Infof(LogFormat, "Update", "dependency-record", obj.Name, "", "Received=true namespace="+obj.Namespace) + syncNamespace := common.GetSyncNamespace() - // need clean up before handle it as added, I need to handle update that delete the dependency, find diff first - // this is more complex cos want to make sure no other service depend on the same service (which we just removed the dependancy). - // need to make sure nothing depend on that before cleaning up the SE for that service - HandleDependencyRecord(obj, dh.RemoteRegistry) + if len(virtualService.Hosts) > 1 { + log.Errorf(LogFormat, "Event", resourceType, obj.Name, clusterId, "Skipping as multiple hosts not supported for virtual service namespace="+obj.Namespace) + return nil + } -} + //check if this virtual service is used by Argo rollouts for canary strategy, if so, update the corresponding SE with appropriate weights + if common.GetAdmiralParams().ArgoRolloutsEnabled { + rollouts, err := vh.RemoteRegistry.RemoteControllers[clusterId].RolloutController.RolloutClient.Rollouts(obj.Namespace).List(v12.ListOptions{}) + + if err != nil { + log.Errorf(LogErrFormat, "Get", "Rollout", "Error finding rollouts in namespace="+obj.Namespace, clusterId, err) + } else { + if len(rollouts.Items) > 0 { + for _, rollout := range rollouts.Items { + if rollout.Spec.Strategy.Canary != nil && rollout.Spec.Strategy.Canary.TrafficRouting != nil && rollout.Spec.Strategy.Canary.TrafficRouting.Istio != nil && rollout.Spec.Strategy.Canary.TrafficRouting.Istio.VirtualService.Name == obj.Name { + HandleEventForRollout(admiral.Update, &rollout, vh.RemoteRegistry, clusterId) + } + } + } + } + } + + dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0]).Copy() + + if len(dependentClusters) > 0 { + + for _, dependentCluster := range dependentClusters { + + rc := r.RemoteControllers[dependentCluster] + + if clusterId != dependentCluster { + + log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "Processing") + + if event == common.Delete { + log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, clusterId, "Success") + err := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(syncNamespace).Delete(obj.Name, &v12.DeleteOptions{}) + if err != nil { + return err + } + + } else { + + exist, _ := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(syncNamespace).Get(obj.Name, v12.GetOptions{}) -func HandleDependencyRecord(obj *v1.Dependency, remoteRegitry *RemoteRegistry) { - sourceIdentity := obj.Spec.Source + //change destination host for all http routes .. 
to same as host on the virtual service + for _, httpRoute := range virtualService.Http { + for _, destination := range httpRoute.Route { + //get at index 0, we do not support wildcards or multiple hosts currently + if strings.HasSuffix(destination.Destination.Host, common.DotLocalDomainSuffix) { + destination.Destination.Host = virtualService.Hosts[0] + } + } + } - if len(sourceIdentity) == 0 { - log.Infof(LogFormat, "Event", "dependency-record", obj.Name, "", "No identity found namespace="+obj.Namespace) + for _, tlsRoute := range virtualService.Tls { + for _, destination := range tlsRoute.Route { + //get at index 0, we do not support wildcards or multiple hosts currently + if strings.HasSuffix(destination.Destination.Host, common.DotLocalDomainSuffix) { + destination.Destination.Host = virtualService.Hosts[0] + } + } + } + + addUpdateVirtualService(obj, exist, syncNamespace, rc) + } + } + } + return nil + } else { + log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found") } - updateIdentityDependencyCache(sourceIdentity, remoteRegitry.AdmiralCache.IdentityDependencyCache, obj) + //copy the VirtualService `as is` if they are not generated by Admiral (not in CnameDependentClusterCache) + log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "Replicating `as is` to all clusters") + for _, rc := range r.RemoteControllers { + if rc.ClusterID != clusterId { + if event == common.Delete { + err := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(syncNamespace).Delete(obj.Name, &v12.DeleteOptions{}) + if err != nil { + log.Infof(LogErrFormat, "Delete", "VirtualService", obj.Name, clusterId, err) + return err + } else { + log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, clusterId, "Success") + } + } else { + exist, _ := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(syncNamespace).Get(obj.Name, v12.GetOptions{}) + addUpdateVirtualService(obj, exist, syncNamespace, rc) + } + } + } + return nil } -func (dh *DependencyHandler) Deleted(obj *v1.Dependency) { - // special case of update, delete the dependency crd file for one service, need to loop through all ones we plan to update - // and make sure nobody else is relying on the same SE in same cluster - log.Infof(LogFormat, "Deleted", "dependency", obj.Name, "", "Skipping, not implemented") -} +func addUpdateVirtualService(obj *v1alpha3.VirtualService, exist *v1alpha3.VirtualService, namespace string, rc *RemoteController) { + var err error + var op string + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations["app.kubernetes.io/created-by"] = "admiral" + if exist == nil || len(exist.Spec.Hosts) == 0 { + obj.Namespace = namespace + obj.ResourceVersion = "" + _, err = rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(namespace).Create(obj) + op = "Add" + } else { + exist.Labels = obj.Labels + exist.Annotations = obj.Annotations + exist.Spec = obj.Spec + op = "Update" + _, err = rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(namespace).Update(exist) + } -func (gtp *GlobalTrafficHandler) Added(obj *v1.GlobalTrafficPolicy) { - log.Infof(LogFormat, "Added", "globaltrafficpolicy", obj.Name, gtp.ClusterID, "received") - err := HandleEventForGlobalTrafficPolicy(obj, gtp.RemoteRegistry, gtp.ClusterID) if err != nil { - log.Infof(err.Error()) + log.Errorf(LogErrFormat, op, "VirtualService", obj.Name, rc.ClusterID, err) + } else { + 
log.Infof(LogFormat, op, "VirtualService", obj.Name, rc.ClusterID, "Success") } } -func (gtp *GlobalTrafficHandler) Updated(obj *v1.GlobalTrafficPolicy) { - log.Infof(LogFormat, "Updated", "globaltrafficpolicy", obj.Name, gtp.ClusterID, "received") - err := HandleEventForGlobalTrafficPolicy(obj, gtp.RemoteRegistry, gtp.ClusterID) +func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController) { + var err error + var op, diff string + var skipUpdate bool + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations["app.kubernetes.io/created-by"] = "admiral" + if exist == nil || exist.Spec.Hosts == nil { + obj.Namespace = namespace + obj.ResourceVersion = "" + _, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(obj) + op = "Add" + log.Infof(LogFormat+" SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String()) + } else { + exist.Labels = obj.Labels + exist.Annotations = obj.Annotations + op = "Update" + skipUpdate, diff = skipDestructiveUpdate(rc, obj, exist) + if diff != "" { + log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff) + } + if skipUpdate { + log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Update skipped as it was destructive during Admiral's bootup phase") + return + } else { + exist.Spec = obj.Spec + _, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Update(exist) + } + + } + if err != nil { - log.Infof(err.Error()) + log.Errorf(LogErrFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, err) + } else { + log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Success") } } -func (gtp *GlobalTrafficHandler) Deleted(obj *v1.GlobalTrafficPolicy) { - log.Infof(LogFormat, "Deleted", "globaltrafficpolicy", obj.Name, gtp.ClusterID, "received") - err := HandleEventForGlobalTrafficPolicy(obj, gtp.RemoteRegistry, gtp.ClusterID) - if err != nil { - log.Infof(err.Error()) +func skipDestructiveUpdate(rc *RemoteController, new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry) (skipDestructive bool, diff string) { + skipDestructive = false + destructive, diff := getServiceEntryDiff(new, old) + //do not update SEs during bootup phase if they are destructive + if time.Since(rc.StartTime) < (2*common.GetAdmiralParams().CacheRefreshDuration) && destructive { + skipDestructive = true } + + return skipDestructive, diff } -func (pc *DeploymentHandler) Added(obj *k8sAppsV1.Deployment) { - HandleEventForDeployment(admiral.Add, obj, pc.RemoteRegistry, pc.ClusterID) +//Diffs only endpoints +func getServiceEntryDiff(new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry) (destructive bool, diff string) { + + //we diff only if both objects exist + if old == nil || new == nil { + return false, "" + } + destructive = false + format := "%s %s before: %v, after: %v;" + var buffer bytes.Buffer + seNew := new.Spec + seOld := old.Spec + + oldEndpointMap := make(map[string]*v1alpha32.ServiceEntry_Endpoint) + found := make(map[string]string) + for _, oEndpoint := range seOld.Endpoints { + oldEndpointMap[oEndpoint.Address] = oEndpoint + } + for _, nEndpoint := range seNew.Endpoints { + if val, ok := oldEndpointMap[nEndpoint.Address]; ok { + found[nEndpoint.Address] = "1" + if !reflect.DeepEqual(val, nEndpoint) { + destructive = true + buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String())) + } + } 
else { + buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String())) + } + } + + for key := range oldEndpointMap { + if _, ok := found[key]; !ok { + destructive = true + buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), "")) + } + } + + diff = buffer.String() + return destructive, diff } -func (pc *DeploymentHandler) Deleted(obj *k8sAppsV1.Deployment) { - HandleEventForDeployment(admiral.Delete, obj, pc.RemoteRegistry, pc.ClusterID) +func deleteServiceEntry(exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController) { + if exist != nil { + err := rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Delete(exist.Name, &v12.DeleteOptions{}) + if err != nil { + log.Errorf(LogErrFormat, "Delete", "ServiceEntry", exist.Name, rc.ClusterID, err) + } else { + log.Infof(LogFormat, "Delete", "ServiceEntry", exist.Name, rc.ClusterID, "Success") + } + } +} + +func addUpdateDestinationRule(obj *v1alpha3.DestinationRule, exist *v1alpha3.DestinationRule, namespace string, rc *RemoteController) { + var err error + var op string + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations["app.kubernetes.io/created-by"] = "admiral" + if exist == nil || exist.Name == "" || exist.Spec.Host == "" { + obj.Namespace = namespace + obj.ResourceVersion = "" + _, err = rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(namespace).Create(obj) + op = "Add" + } else { + exist.Labels = obj.Labels + exist.Annotations = obj.Annotations + exist.Spec = obj.Spec + op = "Update" + _, err = rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(namespace).Update(exist) + } + + if err != nil { + log.Errorf(LogErrFormat, op, "DestinationRule", obj.Name, rc.ClusterID, err) + } else { + log.Infof(LogFormat, op, "DestinationRule", obj.Name, rc.ClusterID, "Success") + } } -func (rh *RolloutHandler) Added(obj *argo.Rollout) { - HandleEventForRollout(admiral.Add, obj, rh.RemoteRegistry, rh.ClusterID) +func deleteDestinationRule(exist *v1alpha3.DestinationRule, namespace string, rc *RemoteController) { + if exist != nil { + err := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(namespace).Delete(exist.Name, &v12.DeleteOptions{}) + if err != nil { + log.Errorf(LogErrFormat, "Delete", "DestinationRule", exist.Name, rc.ClusterID, err) + } else { + log.Infof(LogFormat, "Delete", "DestinationRule", exist.Name, rc.ClusterID, "Success") + } + } +} +func createServiceEntrySkeletion(se v1alpha32.ServiceEntry, name string, namespace string) *v1alpha3.ServiceEntry { + return &v1alpha3.ServiceEntry{Spec: se, ObjectMeta: v12.ObjectMeta{Name: name, Namespace: namespace}} } -func (rh *RolloutHandler) Updated(obj *argo.Rollout) { - log.Infof(LogFormat, "Updated", "rollout", obj.Name, rh.ClusterID, "received") +func createSidecarSkeletion(sidecar v1alpha32.Sidecar, name string, namespace string) *v1alpha3.Sidecar { + return &v1alpha3.Sidecar{Spec: sidecar, ObjectMeta: v12.ObjectMeta{Name: name, Namespace: namespace}} } -func (rh *RolloutHandler) Deleted(obj *argo.Rollout) { - HandleEventForRollout(admiral.Delete, obj, rh.RemoteRegistry, rh.ClusterID) +func createDestinationRuleSkeletion(dr v1alpha32.DestinationRule, name string, namespace string) *v1alpha3.DestinationRule { + return &v1alpha3.DestinationRule{Spec: dr, ObjectMeta: v12.ObjectMeta{Name: name, Namespace: namespace}} } -// helper function to handle add and delete for 
RolloutHandler
-func HandleEventForRollout(event admiral.EventType, obj *argo.Rollout, remoteRegistry *RemoteRegistry, clusterName string) {
-
-	log.Infof(LogFormat, event, "rollout", obj.Name, clusterName, "Received")
-	globalIdentifier := common.GetRolloutGlobalIdentifier(obj)
-
-	if len(globalIdentifier) == 0 {
-		log.Infof(LogFormat, "Event", "rollout", obj.Name, clusterName, "Skipped as '"+common.GetWorkloadIdentifier()+" was not found', namespace="+obj.Namespace)
-		return
-	}
-
-	env := common.GetEnvForRollout(obj)
-
-	// Use the same function as added deployment function to update and put new service entry in place to replace old one
-	modifyServiceEntryForNewServiceOrPod(event, env, globalIdentifier, remoteRegistry)
-}
-
-// helper function to handle add and delete for DeploymentHandler
-func HandleEventForDeployment(event admiral.EventType, obj *k8sAppsV1.Deployment, remoteRegistry *RemoteRegistry, clusterName string) {
+// A rollout can use one of 2 strategies:
+// 1. Canary strategy - which can use a virtual service to manage the weights associated with a stable and canary service. Admiral created endpoints in service entries will use the weights assigned in the Virtual Service
+// 2. Blue green strategy - this contains 2 service instances in a namespace, an active service and a preview service. Admiral will use the respective service to create active and preview endpoints
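+// Best-effort fallback introduced by this change: when a Rollout omits the stable
+// (canary strategy) or active (blue/green strategy) service name, the first cached
+// Service whose name ends in the corresponding common.Rollout*ServiceSuffix is picked.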
+func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[string]*WeightedService {
 
-	globalIdentifier := common.GetDeploymentGlobalIdentifier(obj)
+	if rollout == nil {
+		return nil
+	}
+	cachedServices := rc.ServiceController.Cache.Get(rollout.Namespace)
 
-	if len(globalIdentifier) == 0 {
-		log.Infof(LogFormat, "Event", "deployment", obj.Name, clusterName, "Skipped as '"+common.GetWorkloadIdentifier()+" was not found', namespace="+obj.Namespace)
-		return
+	if cachedServices == nil {
+		return nil
 	}
+	rolloutStrategy := rollout.Spec.Strategy
 
-	env := common.GetEnv(obj)
+	if rolloutStrategy.BlueGreen == nil && rolloutStrategy.Canary == nil {
+		return nil
+	}
 
-	// Use the same function as added deployment function to update and put new service entry in place to replace old one
-	modifyServiceEntryForNewServiceOrPod(event, env, globalIdentifier, remoteRegistry)
-}
+	var canaryService, stableService, virtualServiceRouteName string
 
-// HandleEventForGlobalTrafficPolicy processes all the events related to GTPs
-func HandleEventForGlobalTrafficPolicy(gtp *v1.GlobalTrafficPolicy, remoteRegistry *RemoteRegistry, clusterName string) error {
+	var istioCanaryWeights = make(map[string]int32)
 
-	globalIdentifier := common.GetGtpIdentity(gtp)
+	var blueGreenActiveService string
+	var blueGreenPreviewService string
 
-	if len(globalIdentifier) == 0 {
-		return fmt.Errorf(LogFormat, "Event", "globaltrafficpolicy", gtp.Name, clusterName, "Skipped as '"+common.GetWorkloadIdentifier()+" was not found', namespace="+gtp.Namespace)
+	if rolloutStrategy.BlueGreen != nil {
+		// If rollout uses blue green strategy
+		blueGreenActiveService = rolloutStrategy.BlueGreen.ActiveService
+		blueGreenPreviewService = rolloutStrategy.BlueGreen.PreviewService
+
+		if len(blueGreenActiveService) == 0 {
+			//pick a service that ends in RolloutActiveServiceSuffix if one is available
+			blueGreenActiveService = GetServiceWithSuffixMatch(common.RolloutActiveServiceSuffix, cachedServices)
+		}
+	} else if rolloutStrategy.Canary != nil {
+		canaryService = rolloutStrategy.Canary.CanaryService
+		stableService = rolloutStrategy.Canary.StableService
+
+		//pick stable service if specified
+		if len(stableService) > 0 {
+			istioCanaryWeights[stableService] = 1
+		} else {
+			//pick a service that ends in RolloutStableServiceSuffix if one is available
+			sName := GetServiceWithSuffixMatch(common.RolloutStableServiceSuffix, cachedServices)
+			if len(sName) > 0 {
+				istioCanaryWeights[sName] = 1
+			}
+		}
+
+		//calculate canary weights if canary strategy is using Istio traffic management
+		if len(stableService) > 0 && len(canaryService) > 0 && rolloutStrategy.Canary.TrafficRouting != nil && rolloutStrategy.Canary.TrafficRouting.Istio != nil {
+			virtualServiceName := rolloutStrategy.Canary.TrafficRouting.Istio.VirtualService.Name
+			virtualService, err := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(rollout.Namespace).Get(virtualServiceName, v12.GetOptions{})
+
+			if err != nil {
+				log.Warnf("Error fetching VirtualService referenced in rollout canary for rollout with name=%s in namespace=%s and cluster=%s err=%v", rollout.Name, rollout.Namespace, rc.ClusterID, err)
+			}
+
+			if len(rolloutStrategy.Canary.TrafficRouting.Istio.VirtualService.Routes) > 0 {
+				virtualServiceRouteName = rolloutStrategy.Canary.TrafficRouting.Istio.VirtualService.Routes[0]
+			}
+
+			if virtualService != nil {
+				var vs = virtualService.Spec
+				if len(vs.Http) > 0 {
+					var httpRoute *v1alpha32.HTTPRoute
+					if 
len(virtualServiceRouteName) > 0 { + for _, route := range vs.Http { + if route.Name == virtualServiceRouteName { + httpRoute = route + log.Infof("VirtualService route referenced in rollout found, for rollout with name=%s route=%s in namespace=%s and cluster=%s", rollout.Name, virtualServiceRouteName, rollout.Namespace, rc.ClusterID) + break + } else { + log.Debugf("Argo rollout VirtualService route name didn't match with a route, for rollout with name=%s route=%s in namespace=%s and cluster=%s", rollout.Name, route.Name, rollout.Namespace, rc.ClusterID) + } + } + } else { + if len(vs.Http) == 1 { + httpRoute = vs.Http[0] + log.Debugf("Using the default and the only route in Virtual Service, for rollout with name=%s route=%s in namespace=%s and cluster=%s", rollout.Name, "", rollout.Namespace, rc.ClusterID) + } else { + log.Errorf("Skipping VirtualService referenced in rollout as it has MORE THAN ONE route but no name route selector in rollout, for rollout with name=%s in namespace=%s and cluster=%s", rollout.Name, rollout.Namespace, rc.ClusterID) + } + } + if httpRoute != nil { + //find the weight associated with the destination (k8s service) + for _, destination := range httpRoute.Route { + if (destination.Destination.Host == canaryService || destination.Destination.Host == stableService) && destination.Weight > 0 { + istioCanaryWeights[destination.Destination.Host] = destination.Weight + } + } + } + } else { + log.Warnf("No VirtualService was specified in rollout or the specified VirtualService has NO routes, for rollout with name=%s in namespace=%s and cluster=%s", rollout.Name, rollout.Namespace, rc.ClusterID) + } + } + } } - env := common.GetGtpEnv(gtp) + var matchedServices = make(map[string]*WeightedService) - // For now we're going to force all the events to update only in order to prevent - // the endpoints from being deleted. - // TODO: Need to come up with a way to prevent deleting default endpoints so that this hack can be removed. 
- // Use the same function as added deployment function to update and put new service entry in place to replace old one - modifyServiceEntryForNewServiceOrPod(admiral.Update, env, globalIdentifier, remoteRegistry) - return nil + for _, service := range cachedServices { + //skip services that are not referenced in the rollout + if len(blueGreenActiveService) > 0 && service.ObjectMeta.Name != blueGreenActiveService && service.ObjectMeta.Name != blueGreenPreviewService { + log.Infof("Skipping service=%s for rollout=%s in namespace=%s and cluster=%s", service.Name, rollout.Name, rollout.Namespace, rc.ClusterID) + continue + } + + match := common.IsServiceMatch(service.Spec.Selector, rollout.Spec.Selector) + //make sure the service matches the rollout Selector and also has a mesh port in the port spec + if match { + ports := GetMeshPortsForRollout(rc.ClusterID, service, rollout) + if len(ports) > 0 { + // if the strategy is bluegreen return matched services + // else if using canary with NO istio traffic management, pick the first service that matches + if rolloutStrategy.BlueGreen != nil { + matchedServices[service.Name] = &WeightedService{Weight: 1, Service: service} + } else if len(istioCanaryWeights) == 0 { + matchedServices[service.Name] = &WeightedService{Weight: 1, Service: service} + break + } + if val, ok := istioCanaryWeights[service.Name]; ok { + matchedServices[service.Name] = &WeightedService{Weight: val, Service: service} + } + } + } + } + return matchedServices +} + +func GetServiceWithSuffixMatch(suffix string, services []*k8sV1.Service) string { + for _, service := range services { + if strings.HasSuffix(service.Name, suffix) { + return service.Name + } + } + return "" } \ No newline at end of file diff --git a/admiral/pkg/controller/common/common.go b/admiral/pkg/controller/common/common.go index b7fafbf3..d4160fbd 100644 --- a/admiral/pkg/controller/common/common.go +++ b/admiral/pkg/controller/common/common.go @@ -36,6 +36,8 @@ const ( AdmiralCnameCaseSensitive = "admiral.io/cname-case-sensitive" BlueGreenRolloutPreviewPrefix = "preview" RolloutPodHashLabel = "rollouts-pod-template-hash" + RolloutActiveServiceSuffix = "active-service" + RolloutStableServiceSuffix = "stable-service" ) type Event int
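--
Note (illustrative, not part of the diff): the core behavior this patch adds is the
suffix-based fallback. When a canary Rollout omits Spec.Strategy.Canary.StableService,
Admiral picks the first cached Service whose name ends in
common.RolloutStableServiceSuffix ("stable-service"); blue/green Rollouts get the same
treatment via common.RolloutActiveServiceSuffix ("active-service"). A minimal,
self-contained sketch of that suffix match follows; the service names are made up for
illustration and do not come from the repo:

	package main

	import (
		"fmt"
		"strings"

		k8sV1 "k8s.io/api/core/v1"
		metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// mirrors GetServiceWithSuffixMatch in admiral/pkg/clusters/handler.go:
	// returns the name of the first service whose name ends in the given suffix.
	func getServiceWithSuffixMatch(suffix string, services []*k8sV1.Service) string {
		for _, service := range services {
			if strings.HasSuffix(service.Name, suffix) {
				return service.Name
			}
		}
		return ""
	}

	func main() {
		services := []*k8sV1.Service{
			{ObjectMeta: metaV1.ObjectMeta{Name: "myapp-canary-service"}}, // hypothetical name
			{ObjectMeta: metaV1.ObjectMeta{Name: "myapp-stable-service"}}, // hypothetical name
		}
		// Prints "myapp-stable-service": the best-effort pick for the stable side.
		fmt.Println(getServiceWithSuffixMatch("stable-service", services))
	}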