From 1313ef01b488fab22b3db1cabdf88fc382cd46f7 Mon Sep 17 00:00:00 2001 From: Shriram Sharma Date: Mon, 6 Mar 2023 21:04:05 -0800 Subject: [PATCH] generate additional endpoints based on labels set on deployments or rollouts (#285) --- admiral/cmd/admiral/cmd/root.go | 2 + admiral/pkg/clusters/serviceentry.go | 82 ++++++++++++++++------- admiral/pkg/clusters/serviceentry_test.go | 64 +++++++++++++++++- admiral/pkg/controller/common/config.go | 4 ++ admiral/pkg/controller/common/types.go | 49 +++++++------- 5 files changed, 150 insertions(+), 51 deletions(-) diff --git a/admiral/cmd/admiral/cmd/root.go b/admiral/cmd/admiral/cmd/root.go index 457f3e77..6c253070 100644 --- a/admiral/cmd/admiral/cmd/root.go +++ b/admiral/cmd/admiral/cmd/root.go @@ -146,6 +146,8 @@ func GetRootCmd(args []string) *cobra.Command { "List of identities which should be excluded from getting processed") rootCmd.PersistentFlags().StringArrayVar(¶ms.AdditionalEndpointSuffixes, "additional_endpoint_suffixes", []string{}, "Suffixes that Admiral should use to generate additional endpoints through VirtualServices") + rootCmd.PersistentFlags().StringArrayVar(¶ms.AdditionalEndpointLabelFilters, "additional_endpoint_label_filters", []string{}, + "Labels that admiral will check on deployment/rollout before creating additional endpoints. '*' would indicate generating additional endpoints for all deployment/rollouts") return rootCmd } diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index 021fbde4..a5f315e6 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -83,22 +83,23 @@ func modifyServiceEntryForNewServiceOrPod( } var ( - cname string - namespace string - serviceInstance *k8sV1.Service - rollout *argo.Rollout - deployment *k8sAppsV1.Deployment - start = time.Now() - gtpKey = common.ConstructGtpKey(env, sourceIdentity) - clusters = remoteRegistry.GetClusterIds() - gtps = make(map[string][]*v1.GlobalTrafficPolicy) - weightedServices = make(map[string]*WeightedService) - cnames = make(map[string]string) - sourceServices = make(map[string]*k8sV1.Service) - sourceWeightedServices = make(map[string]map[string]*WeightedService) - sourceDeployments = make(map[string]*k8sAppsV1.Deployment) - sourceRollouts = make(map[string]*argo.Rollout) - serviceEntries = make(map[string]*networking.ServiceEntry) + cname string + namespace string + serviceInstance *k8sV1.Service + rollout *argo.Rollout + deployment *k8sAppsV1.Deployment + start = time.Now() + gtpKey = common.ConstructGtpKey(env, sourceIdentity) + clusters = remoteRegistry.GetClusterIds() + gtps = make(map[string][]*v1.GlobalTrafficPolicy) + weightedServices = make(map[string]*WeightedService) + cnames = make(map[string]string) + sourceServices = make(map[string]*k8sV1.Service) + sourceWeightedServices = make(map[string]map[string]*WeightedService) + sourceDeployments = make(map[string]*k8sAppsV1.Deployment) + sourceRollouts = make(map[string]*argo.Rollout) + serviceEntries = make(map[string]*networking.ServiceEntry) + isAdditionalEndpointGenerationEnabled bool ) for _, clusterId := range clusters { @@ -193,17 +194,25 @@ func modifyServiceEntryForNewServiceOrPod( var meshPorts map[string]uint32 blueGreenStrategy := isBlueGreenStrategy(sourceRollouts[sourceCluster]) + var deploymentRolloutLabels map[string]string if len(sourceDeployments) > 0 { meshPorts = GetMeshPorts(sourceCluster, serviceInstance, sourceDeployments[sourceCluster]) + deployment = sourceDeployments[sourceCluster] + deploymentRolloutLabels = deployment.Labels } else { meshPorts = GetMeshPortsForRollout(sourceCluster, serviceInstance, sourceRollouts[sourceCluster]) + rollout := sourceRollouts[sourceCluster] + deploymentRolloutLabels = rollout.Labels } + // check if additional endpoint generation is required + isAdditionalEndpointGenerationEnabled = doGenerateAdditionalEndpoints(deploymentRolloutLabels) + for key, serviceEntry := range serviceEntries { if len(serviceEntry.Endpoints) == 0 { AddServiceEntriesWithDr( ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster}, - map[string]*networking.ServiceEntry{key: serviceEntry}) + map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled) } clusterIngress, _ := rc.ServiceController.Cache.GetLoadBalancer(common.GetAdmiralParams().LabelSet.GatewayApp, common.NamespaceIstioSystem) for _, ep := range serviceEntry.Endpoints { @@ -215,7 +224,7 @@ func modifyServiceEntryForNewServiceOrPod( updateEndpointsForBlueGreen(sourceRollouts[sourceCluster], sourceWeightedServices[sourceCluster], cnames, ep, sourceCluster, key) AddServiceEntriesWithDr( ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster}, - map[string]*networking.ServiceEntry{key: serviceEntry}) + map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled) //swap it back to use for next iteration ep.Address = clusterIngress ep.Ports = oldPorts @@ -226,14 +235,14 @@ func modifyServiceEntryForNewServiceOrPod( updateEndpointsForWeightedServices(se, sourceWeightedServices[sourceCluster], clusterIngress, meshPorts) AddServiceEntriesWithDr( ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster}, - map[string]*networking.ServiceEntry{key: se}) + map[string]*networking.ServiceEntry{key: se}, isAdditionalEndpointGenerationEnabled) } else { ep.Address = localFqdn oldPorts := ep.Ports ep.Ports = meshPorts AddServiceEntriesWithDr( ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster}, - map[string]*networking.ServiceEntry{key: serviceEntry}) + map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled) // swap it back to use for next iteration ep.Address = clusterIngress ep.Ports = oldPorts @@ -270,7 +279,7 @@ func modifyServiceEntryForNewServiceOrPod( remoteRegistry.AdmiralCache.CnameDependentClusterCache.Put(cname, clusterId, clusterId) } - AddServiceEntriesWithDr(ctx, remoteRegistry, dependentClusters, serviceEntries) + AddServiceEntriesWithDr(ctx, remoteRegistry, dependentClusters, serviceEntries, isAdditionalEndpointGenerationEnabled) util.LogElapsedTimeSince("WriteServiceEntryToDependentClusters", sourceIdentity, env, "", start) @@ -510,7 +519,9 @@ func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar { } //AddServiceEntriesWithDr will create the default service entries and also additional ones specified in GTP -func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string, serviceEntries map[string]*networking.ServiceEntry) { +func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string, + serviceEntries map[string]*networking.ServiceEntry, isAdditionalEndpointsEnabled bool) { + cache := rr.AdmiralCache syncNamespace := common.GetSyncNamespace() for _, se := range serviceEntries { @@ -587,11 +598,13 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus cache.SeClusterCache.Delete(seDr.ServiceEntry.Hosts[0]) // Delete additional endpoints if any - if isAdditionalEndpointsEnabled() { + if isAdditionalEndpointsEnabled { err := deleteAdditionalEndpoints(ctx, rc, identityId, env, syncNamespace) if err != nil { log.Error(err) } + } else { + log.Infof(LogFormat, "Delete", "VirtualService", env+"."+identityId, sourceCluster, "skipped deleting additional endpoints through VirtualService in "+syncNamespace+" namespace") } } @@ -621,11 +634,13 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus cache.SeClusterCache.Put(newServiceEntry.Spec.Hosts[0], rc.ClusterID, rc.ClusterID) // Create additional endpoints if necessary - if isAdditionalEndpointsEnabled() { + if isAdditionalEndpointsEnabled { err := createAdditionalEndpoints(ctx, rc, identityId, env, newServiceEntry.Spec.Hosts[0], syncNamespace) if err != nil { log.Error(err) } + } else { + log.Infof(LogFormat, "Create", "VirtualService", env+"."+identityId, sourceCluster, "skipped creating additional endpoints through VirtualService in "+syncNamespace+" namespace") } } } @@ -642,13 +657,28 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus } } -func isAdditionalEndpointsEnabled() bool { +// This func returns a bool to indicate if additional endpoints generation is needed +// based on the following conditions. +// 1. Additional endpoint suffixes have been configured in the admiral params +// 2. The rollout/deployment labels passed contains any of the allowed labels +// configured in the admiral params 'additional_endpoint_label_filters' +func doGenerateAdditionalEndpoints(labels map[string]string) bool { additionalEndpointSuffixes := common.GetAdditionalEndpointSuffixes() if len(additionalEndpointSuffixes) <= 0 { log.Debugf("no additional endpoints configured") return false } - return true + // Check if admiral configured allowed labels are in the passed labels map + additionalEndpointAnnotationFilters := common.GetAdditionalEndpointLabelFilters() + for _, filter := range additionalEndpointAnnotationFilters { + if filter == "*" { + return true + } + if _, ok := labels[filter]; ok { + return true + } + } + return false } func validateAdditionalEndpointParams(identity, env string) error { diff --git a/admiral/pkg/clusters/serviceentry_test.go b/admiral/pkg/clusters/serviceentry_test.go index 4c0ec3fa..e83df099 100644 --- a/admiral/pkg/clusters/serviceentry_test.go +++ b/admiral/pkg/clusters/serviceentry_test.go @@ -784,7 +784,7 @@ func TestAddServiceEntriesWithDr(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, tt.serviceEntries) + AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, tt.serviceEntries, false) if tt.dnsPrefix != "" && tt.dnsPrefix != "default" { tt.serviceEntries["se1"].Hosts = []string{tt.dnsPrefix + ".e2e.foo.global"} } @@ -2781,3 +2781,65 @@ func TestGetAdmiralGeneratedVirtualService(t *testing.T) { }) } } + +func TestDoGenerateAdditionalEndpoints(t *testing.T) { + + testcases := []struct { + name string + labels map[string]string + additionalEndpointSuffixes []string + additionalEndpointLabelFilters []string + expectedResult bool + }{ + { + name: "Given additional endpoint suffixes and labels, when no additional endpoint suffixes are set, then the func should return false", + labels: map[string]string{"foo": "bar"}, + expectedResult: false, + }, + { + name: "Given additional endpoint suffixes and labels, when no additional endpoint labels filters are set, then the func should return false", + labels: map[string]string{"foo": "bar"}, + additionalEndpointSuffixes: []string{"fuzz"}, + expectedResult: false, + }, + { + name: "Given additional endpoint suffixes and labels, when additional endpoint labels filters contains '*', then the func should return true", + labels: map[string]string{"foo": "bar"}, + additionalEndpointSuffixes: []string{"fuzz"}, + additionalEndpointLabelFilters: []string{"*"}, + expectedResult: true, + }, + { + name: "Given additional endpoint suffixes and labels, when additional endpoint label filters do not include any key in labels, then it should return false", + labels: map[string]string{"foo": "bar"}, + additionalEndpointSuffixes: []string{"fuzz"}, + additionalEndpointLabelFilters: []string{"baz"}, + expectedResult: false, + }, + { + name: "Given additional endpoint suffixes and labels, when additional endpoint labels filters contains one of the keys in the labels, then it should return true", + labels: map[string]string{"foo": "bar"}, + additionalEndpointSuffixes: []string{"fuzz"}, + additionalEndpointLabelFilters: []string{"foo"}, + expectedResult: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + admiralParams := common.AdmiralParams{ + AdditionalEndpointSuffixes: tc.additionalEndpointSuffixes, + AdditionalEndpointLabelFilters: tc.additionalEndpointLabelFilters, + } + common.ResetSync() + common.InitializeConfig(admiralParams) + + actual := doGenerateAdditionalEndpoints(tc.labels) + + if actual != tc.expectedResult { + t.Errorf("expected %t, got %t", tc.expectedResult, actual) + } + }) + } +} diff --git a/admiral/pkg/controller/common/config.go b/admiral/pkg/controller/common/config.go index f6f968b7..2875853b 100644 --- a/admiral/pkg/controller/common/config.go +++ b/admiral/pkg/controller/common/config.go @@ -79,6 +79,10 @@ func GetAdditionalEndpointSuffixes() []string { return admiralParams.AdditionalEndpointSuffixes } +func GetAdditionalEndpointLabelFilters() []string { + return admiralParams.AdditionalEndpointLabelFilters +} + func GetHostnameSuffix() string { return admiralParams.HostnameSuffix } diff --git a/admiral/pkg/controller/common/types.go b/admiral/pkg/controller/common/types.go index 61cd8cf7..210ba4f6 100644 --- a/admiral/pkg/controller/common/types.go +++ b/admiral/pkg/controller/common/types.go @@ -29,30 +29,31 @@ type SidecarEgressMap struct { } type AdmiralParams struct { - ArgoRolloutsEnabled bool - KubeconfigPath string - CacheRefreshDuration time.Duration - ClusterRegistriesNamespace string - DependenciesNamespace string - SyncNamespace string - EnableSAN bool - SANPrefix string - SecretResolver string - LabelSet *LabelSet - LogLevel int - HostnameSuffix string - PreviewHostnamePrefix string - MetricsEnabled bool - WorkloadSidecarUpdate string - WorkloadSidecarName string - AdmiralStateCheckerName string - DRStateStoreConfigPath string - ServiceEntryIPPrefix string - EnvoyFilterVersion string - EnvoyFilterAdditionalConfig string - EnableRoutingPolicy bool - ExcludedIdentityList []string - AdditionalEndpointSuffixes []string + ArgoRolloutsEnabled bool + KubeconfigPath string + CacheRefreshDuration time.Duration + ClusterRegistriesNamespace string + DependenciesNamespace string + SyncNamespace string + EnableSAN bool + SANPrefix string + SecretResolver string + LabelSet *LabelSet + LogLevel int + HostnameSuffix string + PreviewHostnamePrefix string + MetricsEnabled bool + WorkloadSidecarUpdate string + WorkloadSidecarName string + AdmiralStateCheckerName string + DRStateStoreConfigPath string + ServiceEntryIPPrefix string + EnvoyFilterVersion string + EnvoyFilterAdditionalConfig string + EnableRoutingPolicy bool + ExcludedIdentityList []string + AdditionalEndpointSuffixes []string + AdditionalEndpointLabelFilters []string } func (b AdmiralParams) String() string {