Skip to content

Commit

Permalink
generate additional endpoints based on labels set on deployments or r…
Browse files Browse the repository at this point in the history
…ollouts (#285)
  • Loading branch information
shriramsharma committed Mar 7, 2023
1 parent cabf266 commit 1313ef0
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 51 deletions.
2 changes: 2 additions & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func GetRootCmd(args []string) *cobra.Command {
"List of identities which should be excluded from getting processed")
rootCmd.PersistentFlags().StringArrayVar(&params.AdditionalEndpointSuffixes, "additional_endpoint_suffixes", []string{},
"Suffixes that Admiral should use to generate additional endpoints through VirtualServices")
rootCmd.PersistentFlags().StringArrayVar(&params.AdditionalEndpointLabelFilters, "additional_endpoint_label_filters", []string{},
"Labels that admiral will check on deployment/rollout before creating additional endpoints. '*' would indicate generating additional endpoints for all deployment/rollouts")
return rootCmd
}

Expand Down
82 changes: 56 additions & 26 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,23 @@ func modifyServiceEntryForNewServiceOrPod(
}

var (
cname string
namespace string
serviceInstance *k8sV1.Service
rollout *argo.Rollout
deployment *k8sAppsV1.Deployment
start = time.Now()
gtpKey = common.ConstructGtpKey(env, sourceIdentity)
clusters = remoteRegistry.GetClusterIds()
gtps = make(map[string][]*v1.GlobalTrafficPolicy)
weightedServices = make(map[string]*WeightedService)
cnames = make(map[string]string)
sourceServices = make(map[string]*k8sV1.Service)
sourceWeightedServices = make(map[string]map[string]*WeightedService)
sourceDeployments = make(map[string]*k8sAppsV1.Deployment)
sourceRollouts = make(map[string]*argo.Rollout)
serviceEntries = make(map[string]*networking.ServiceEntry)
cname string
namespace string
serviceInstance *k8sV1.Service
rollout *argo.Rollout
deployment *k8sAppsV1.Deployment
start = time.Now()
gtpKey = common.ConstructGtpKey(env, sourceIdentity)
clusters = remoteRegistry.GetClusterIds()
gtps = make(map[string][]*v1.GlobalTrafficPolicy)
weightedServices = make(map[string]*WeightedService)
cnames = make(map[string]string)
sourceServices = make(map[string]*k8sV1.Service)
sourceWeightedServices = make(map[string]map[string]*WeightedService)
sourceDeployments = make(map[string]*k8sAppsV1.Deployment)
sourceRollouts = make(map[string]*argo.Rollout)
serviceEntries = make(map[string]*networking.ServiceEntry)
isAdditionalEndpointGenerationEnabled bool
)

for _, clusterId := range clusters {
Expand Down Expand Up @@ -193,17 +194,25 @@ func modifyServiceEntryForNewServiceOrPod(
var meshPorts map[string]uint32
blueGreenStrategy := isBlueGreenStrategy(sourceRollouts[sourceCluster])

var deploymentRolloutLabels map[string]string
if len(sourceDeployments) > 0 {
meshPorts = GetMeshPorts(sourceCluster, serviceInstance, sourceDeployments[sourceCluster])
deployment = sourceDeployments[sourceCluster]
deploymentRolloutLabels = deployment.Labels
} else {
meshPorts = GetMeshPortsForRollout(sourceCluster, serviceInstance, sourceRollouts[sourceCluster])
rollout := sourceRollouts[sourceCluster]
deploymentRolloutLabels = rollout.Labels
}

// check if additional endpoint generation is required
isAdditionalEndpointGenerationEnabled = doGenerateAdditionalEndpoints(deploymentRolloutLabels)

for key, serviceEntry := range serviceEntries {
if len(serviceEntry.Endpoints) == 0 {
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: serviceEntry})
map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled)
}
clusterIngress, _ := rc.ServiceController.Cache.GetLoadBalancer(common.GetAdmiralParams().LabelSet.GatewayApp, common.NamespaceIstioSystem)
for _, ep := range serviceEntry.Endpoints {
Expand All @@ -215,7 +224,7 @@ func modifyServiceEntryForNewServiceOrPod(
updateEndpointsForBlueGreen(sourceRollouts[sourceCluster], sourceWeightedServices[sourceCluster], cnames, ep, sourceCluster, key)
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: serviceEntry})
map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled)
//swap it back to use for next iteration
ep.Address = clusterIngress
ep.Ports = oldPorts
Expand All @@ -226,14 +235,14 @@ func modifyServiceEntryForNewServiceOrPod(
updateEndpointsForWeightedServices(se, sourceWeightedServices[sourceCluster], clusterIngress, meshPorts)
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: se})
map[string]*networking.ServiceEntry{key: se}, isAdditionalEndpointGenerationEnabled)
} else {
ep.Address = localFqdn
oldPorts := ep.Ports
ep.Ports = meshPorts
AddServiceEntriesWithDr(
ctx, remoteRegistry, map[string]string{sourceCluster: sourceCluster},
map[string]*networking.ServiceEntry{key: serviceEntry})
map[string]*networking.ServiceEntry{key: serviceEntry}, isAdditionalEndpointGenerationEnabled)
// swap it back to use for next iteration
ep.Address = clusterIngress
ep.Ports = oldPorts
Expand Down Expand Up @@ -270,7 +279,7 @@ func modifyServiceEntryForNewServiceOrPod(
remoteRegistry.AdmiralCache.CnameDependentClusterCache.Put(cname, clusterId, clusterId)
}

AddServiceEntriesWithDr(ctx, remoteRegistry, dependentClusters, serviceEntries)
AddServiceEntriesWithDr(ctx, remoteRegistry, dependentClusters, serviceEntries, isAdditionalEndpointGenerationEnabled)

util.LogElapsedTimeSince("WriteServiceEntryToDependentClusters", sourceIdentity, env, "", start)

Expand Down Expand Up @@ -510,7 +519,9 @@ func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar {
}

//AddServiceEntriesWithDr will create the default service entries and also additional ones specified in GTP
func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string, serviceEntries map[string]*networking.ServiceEntry) {
func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string,
serviceEntries map[string]*networking.ServiceEntry, isAdditionalEndpointsEnabled bool) {

cache := rr.AdmiralCache
syncNamespace := common.GetSyncNamespace()
for _, se := range serviceEntries {
Expand Down Expand Up @@ -587,11 +598,13 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
cache.SeClusterCache.Delete(seDr.ServiceEntry.Hosts[0])

// Delete additional endpoints if any
if isAdditionalEndpointsEnabled() {
if isAdditionalEndpointsEnabled {
err := deleteAdditionalEndpoints(ctx, rc, identityId, env, syncNamespace)
if err != nil {
log.Error(err)
}
} else {
log.Infof(LogFormat, "Delete", "VirtualService", env+"."+identityId, sourceCluster, "skipped deleting additional endpoints through VirtualService in "+syncNamespace+" namespace")
}

}
Expand Down Expand Up @@ -621,11 +634,13 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
cache.SeClusterCache.Put(newServiceEntry.Spec.Hosts[0], rc.ClusterID, rc.ClusterID)

// Create additional endpoints if necessary
if isAdditionalEndpointsEnabled() {
if isAdditionalEndpointsEnabled {
err := createAdditionalEndpoints(ctx, rc, identityId, env, newServiceEntry.Spec.Hosts[0], syncNamespace)
if err != nil {
log.Error(err)
}
} else {
log.Infof(LogFormat, "Create", "VirtualService", env+"."+identityId, sourceCluster, "skipped creating additional endpoints through VirtualService in "+syncNamespace+" namespace")
}
}
}
Expand All @@ -642,13 +657,28 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
}
}

func isAdditionalEndpointsEnabled() bool {
// This func returns a bool to indicate if additional endpoints generation is needed
// based on the following conditions.
// 1. Additional endpoint suffixes have been configured in the admiral params
// 2. The rollout/deployment labels passed contains any of the allowed labels
// configured in the admiral params 'additional_endpoint_label_filters'
func doGenerateAdditionalEndpoints(labels map[string]string) bool {
additionalEndpointSuffixes := common.GetAdditionalEndpointSuffixes()
if len(additionalEndpointSuffixes) <= 0 {
log.Debugf("no additional endpoints configured")
return false
}
return true
// Check if admiral configured allowed labels are in the passed labels map
additionalEndpointAnnotationFilters := common.GetAdditionalEndpointLabelFilters()
for _, filter := range additionalEndpointAnnotationFilters {
if filter == "*" {
return true
}
if _, ok := labels[filter]; ok {
return true
}
}
return false
}

func validateAdditionalEndpointParams(identity, env string) error {
Expand Down
64 changes: 63 additions & 1 deletion admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func TestAddServiceEntriesWithDr(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, tt.serviceEntries)
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, tt.serviceEntries, false)
if tt.dnsPrefix != "" && tt.dnsPrefix != "default" {
tt.serviceEntries["se1"].Hosts = []string{tt.dnsPrefix + ".e2e.foo.global"}
}
Expand Down Expand Up @@ -2781,3 +2781,65 @@ func TestGetAdmiralGeneratedVirtualService(t *testing.T) {
})
}
}

func TestDoGenerateAdditionalEndpoints(t *testing.T) {

testcases := []struct {
name string
labels map[string]string
additionalEndpointSuffixes []string
additionalEndpointLabelFilters []string
expectedResult bool
}{
{
name: "Given additional endpoint suffixes and labels, when no additional endpoint suffixes are set, then the func should return false",
labels: map[string]string{"foo": "bar"},
expectedResult: false,
},
{
name: "Given additional endpoint suffixes and labels, when no additional endpoint labels filters are set, then the func should return false",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
expectedResult: false,
},
{
name: "Given additional endpoint suffixes and labels, when additional endpoint labels filters contains '*', then the func should return true",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
additionalEndpointLabelFilters: []string{"*"},
expectedResult: true,
},
{
name: "Given additional endpoint suffixes and labels, when additional endpoint label filters do not include any key in labels, then it should return false",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
additionalEndpointLabelFilters: []string{"baz"},
expectedResult: false,
},
{
name: "Given additional endpoint suffixes and labels, when additional endpoint labels filters contains one of the keys in the labels, then it should return true",
labels: map[string]string{"foo": "bar"},
additionalEndpointSuffixes: []string{"fuzz"},
additionalEndpointLabelFilters: []string{"foo"},
expectedResult: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

admiralParams := common.AdmiralParams{
AdditionalEndpointSuffixes: tc.additionalEndpointSuffixes,
AdditionalEndpointLabelFilters: tc.additionalEndpointLabelFilters,
}
common.ResetSync()
common.InitializeConfig(admiralParams)

actual := doGenerateAdditionalEndpoints(tc.labels)

if actual != tc.expectedResult {
t.Errorf("expected %t, got %t", tc.expectedResult, actual)
}
})
}
}
4 changes: 4 additions & 0 deletions admiral/pkg/controller/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func GetAdditionalEndpointSuffixes() []string {
return admiralParams.AdditionalEndpointSuffixes
}

func GetAdditionalEndpointLabelFilters() []string {
return admiralParams.AdditionalEndpointLabelFilters
}

func GetHostnameSuffix() string {
return admiralParams.HostnameSuffix
}
Expand Down
49 changes: 25 additions & 24 deletions admiral/pkg/controller/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,31 @@ type SidecarEgressMap struct {
}

type AdmiralParams struct {
ArgoRolloutsEnabled bool
KubeconfigPath string
CacheRefreshDuration time.Duration
ClusterRegistriesNamespace string
DependenciesNamespace string
SyncNamespace string
EnableSAN bool
SANPrefix string
SecretResolver string
LabelSet *LabelSet
LogLevel int
HostnameSuffix string
PreviewHostnamePrefix string
MetricsEnabled bool
WorkloadSidecarUpdate string
WorkloadSidecarName string
AdmiralStateCheckerName string
DRStateStoreConfigPath string
ServiceEntryIPPrefix string
EnvoyFilterVersion string
EnvoyFilterAdditionalConfig string
EnableRoutingPolicy bool
ExcludedIdentityList []string
AdditionalEndpointSuffixes []string
ArgoRolloutsEnabled bool
KubeconfigPath string
CacheRefreshDuration time.Duration
ClusterRegistriesNamespace string
DependenciesNamespace string
SyncNamespace string
EnableSAN bool
SANPrefix string
SecretResolver string
LabelSet *LabelSet
LogLevel int
HostnameSuffix string
PreviewHostnamePrefix string
MetricsEnabled bool
WorkloadSidecarUpdate string
WorkloadSidecarName string
AdmiralStateCheckerName string
DRStateStoreConfigPath string
ServiceEntryIPPrefix string
EnvoyFilterVersion string
EnvoyFilterAdditionalConfig string
EnableRoutingPolicy bool
ExcludedIdentityList []string
AdditionalEndpointSuffixes []string
AdditionalEndpointLabelFilters []string
}

func (b AdmiralParams) String() string {
Expand Down

0 comments on commit 1313ef0

Please sign in to comment.