Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add interface for ignoring identity through command line #267

Merged
merged 3 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions admiral/pkg/clusters/serviceEntrySuspender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package clusters

import (
"sync"

log "github.com/sirupsen/logrus"
)

const (
// Alert logs
alertMsgSuspensionEnabled = "op=dynamicEndpointSuspension message=endpoint generation suspension is enabled." +
"this does not mean that endpoint generation will be suspended. " +
"it will depend on the suspension list, which can include all identities " +
"for all environments, OR certain identities for all or certain environments"
alertMsgSuspensionForAll = "op=dynamicEndpointSuspension message=endpoint generation suspended for all"
alertMsgSuspensionForIdentityInAllEnvironments = "op=dynamicEndpointSuspension message=endpoint generation suspended for identity across all environments"
alertMsgSuspensionForIdentityInMatchingEnvironment = "op=dynamicEndpointSuspension message=endpoint generation suspended for identity for given environment"
)

type serviceEntrySuspender struct {
ignoredIdentityCache *IgnoredIdentityCache
}

func NewDefaultServiceEntrySuspender(items []string) *serviceEntrySuspender {
var (
enabled bool
environmentByIdentity = make(map[string][]string)
)
if len(items) > 0 {
enabled = true
}
for _, item := range items {
environmentByIdentity[item] = []string{""}
}
return &serviceEntrySuspender{ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
All: false,
Enabled: enabled,
EnvironmentsByIdentity: environmentByIdentity,
}}
}

func NewDummyServiceEntrySuspender() *serviceEntrySuspender {
return &serviceEntrySuspender{
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
},
}
}

func (des *serviceEntrySuspender) SuspendUpdate(identity, environment string) bool {
return des.enabled() && (des.all() || des.identityByEnvironment(identity, environment))
}

func (des *serviceEntrySuspender) enabled() bool {
if des.ignoredIdentityCache.Enabled {
log.Println(alertMsgSuspensionEnabled)
}
log.Println("op=dynamicEndpointSuspension message=endpoint generation suspension is not enabled")
return des.ignoredIdentityCache.Enabled
}

func (des *serviceEntrySuspender) all() bool {
if des.ignoredIdentityCache.All {
log.Println(alertMsgSuspensionForAll)
}
log.Println("op=dynamicEndpointSuspension message=endpoint generation suspension for 'all' identities is not enabled")
return des.ignoredIdentityCache.All
}

func (des *serviceEntrySuspender) identityByEnvironment(identity, environment string) bool {
log.Printf("op=dynamicEndpointSuspension message=checking if identity %s in environment %s is in the suspension list",
identity, environment)
des.ignoredIdentityCache.RWLock.RLock()
defer des.ignoredIdentityCache.RWLock.RUnlock()
if des.ignoredIdentityCache.EnvironmentsByIdentity[identity] != nil {
identityEnvironments := des.ignoredIdentityCache.EnvironmentsByIdentity[identity]
if len(identityEnvironments) == 0 || (len(identityEnvironments) == 1 && identityEnvironments[0] == "") {
log.Printf("%s, identity: %s", alertMsgSuspensionForIdentityInAllEnvironments, identity)
return true
}
for _, identityEnvironment := range identityEnvironments {
if identityEnvironment == environment {
log.Printf("%s, identity: %s, environment: %s",
alertMsgSuspensionForIdentityInMatchingEnvironment, identity, environment,
)
return true
}
}
}
return false
}
118 changes: 118 additions & 0 deletions admiral/pkg/clusters/serviceEntrySuspender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package clusters

import (
"sync"
"testing"
)

func TestSuspendGeneration(t *testing.T) {
testCases := []struct {
name string
ignoredIdentityCache *IgnoredIdentityCache
sourceIdentity string
sourceEnvironment string
expectedIgnoreDecision bool
}{
{
name: "Given suspension is enabled, " +
"And is configured to ignore all, " +
"When SuspendGeneration is called with 'identity1' and 'environment1', " +
"Then, it should return true",
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
Enabled: true,
All: true,
},
sourceIdentity: "identity1",
sourceEnvironment: "environment1",
expectedIgnoreDecision: true,
},
{
name: "Given suspension is enabled, " +
"And is configured to ignore none, " +
"When SuspendGeneration is called with 'identity1' and 'environment1', " +
"Then, it should return false",
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
Enabled: true,
},
sourceIdentity: "identity1",
sourceEnvironment: "environment1",
expectedIgnoreDecision: false,
},
{
name: "Given suspension is enabled, " +
"And suspension includes identity 'identity1' with no environments, " +
"When SuspendGeneration is called with 'identity1' and 'environment1, " +
"Then, it should return true",
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
Enabled: true,
EnvironmentsByIdentity: map[string][]string{
"identity1": {""},
}},
sourceIdentity: "identity1",
sourceEnvironment: "environment1",
expectedIgnoreDecision: true,
},
{
name: "Given suspension is enabled, " +
"And suspension includes 'identity1' for 'environment1', " +
"When SuspendGeneration is called with 'identity2' and 'environment1, " +
"Then, it should return false",
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
Enabled: true,
EnvironmentsByIdentity: map[string][]string{
"identity1": {"environment1"},
},
},
sourceIdentity: "identity2",
sourceEnvironment: "environment1",
expectedIgnoreDecision: false,
},
{
name: "Given suspension is enabled, " +
"And suspension includes 'identity1' for 'environment1'" +
"When ignoreProcessingIdentity is called with 'identity1' and 'environment1, " +
"Then, it should return true",
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
Enabled: true,
EnvironmentsByIdentity: map[string][]string{
"identity1": {"environment1"},
},
},
sourceIdentity: "identity1",
sourceEnvironment: "environment1",
expectedIgnoreDecision: true,
},
{
name: "Given suspension is enabled, " +
"And suspension includes 'identity1' for 'environment1'" +
"When ignoreProcessingIdentity is called with 'identity2' and 'environment2, " +
"Then, it should return false",
ignoredIdentityCache: &IgnoredIdentityCache{
RWLock: &sync.RWMutex{},
Enabled: true,
EnvironmentsByIdentity: map[string][]string{
"identity1": {"environment1"},
},
},
sourceIdentity: "identity2",
sourceEnvironment: "environment2",
expectedIgnoreDecision: false,
},
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
checker := &serviceEntrySuspender{
ignoredIdentityCache: c.ignoredIdentityCache,
}
ignore := checker.SuspendUpdate(c.sourceIdentity, c.sourceEnvironment)
if ignore != c.expectedIgnoreDecision {
t.Errorf("expected ignore decision to be: %v, got: %v", c.expectedIgnoreDecision, ignore)
}
})
}
}
14 changes: 6 additions & 8 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func modifyServiceEntryForNewServiceOrPod(
sourceIdentity string, remoteRegistry *RemoteRegistry) map[string]*networking.ServiceEntry {
defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()

if remoteRegistry.ServiceEntryUpdateSuspender.SuspendUpdate(sourceIdentity, env) {
log.Infof(LogFormat, event, env, sourceIdentity, "",
"skipping update because endpoint generation is suspended for identity '"+sourceIdentity+"' in environment '"+env+"'")
return nil
}

if CurrentAdmiralState.ReadOnly {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped as Admiral is in Read-only mode")
return nil
Expand Down Expand Up @@ -104,10 +110,6 @@ func modifyServiceEntryForNewServiceOrPod(
continue
}
if deployment != nil {
if len(remoteRegistry.ExcludedIdentityMap) > 0 && remoteRegistry.ExcludedIdentityMap[common.GetDeploymentGlobalIdentifier(deployment)] {
log.Infof(LogFormat, event, env, sourceIdentity, clusterId, "Processing skipped as identity is in the exclude list")
return nil
}
remoteRegistry.AdmiralCache.IdentityClusterCache.Put(sourceIdentity, rc.ClusterID, rc.ClusterID)
serviceInstance = getServiceForDeployment(rc, deployment)
if serviceInstance == nil {
Expand All @@ -120,10 +122,6 @@ func modifyServiceEntryForNewServiceOrPod(
sourceDeployments[rc.ClusterID] = deployment
createServiceEntryForDeployment(ctx, event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deployment, serviceEntries)
} else if rollout != nil {
if len(remoteRegistry.ExcludedIdentityMap) > 0 && remoteRegistry.ExcludedIdentityMap[common.GetRolloutGlobalIdentifier(rollout)] {
log.Infof(LogFormat, event, env, sourceIdentity, clusterId, "Processing skipped as identity is in the exclude list")
return nil
}
remoteRegistry.AdmiralCache.IdentityClusterCache.Put(sourceIdentity, rc.ClusterID, rc.ClusterID)
weightedServices = getServiceForRollout(ctx, rc, rollout)
if len(weightedServices) == 0 {
Expand Down
43 changes: 31 additions & 12 deletions admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ import (
k8s "k8s.io/client-go/kubernetes"
)

type ServiceEntrySuspender interface {
SuspendUpdate(identity string, environment string) bool
}

type IgnoredIdentityCache struct {
RWLock *sync.RWMutex
Enabled bool `json:"enabled"`
All bool `json:"all"`
ClusterEnvironment string `json:"clusterEnvironment"`
EnvironmentsByIdentity map[string][]string `json:"environmentsByIdentities"`
}

type RemoteController struct {
ClusterID string
ApiServer string
Expand Down Expand Up @@ -60,16 +72,18 @@ type AdmiralCache struct {

type RemoteRegistry struct {
sync.Mutex
remoteControllers map[string]*RemoteController
SecretController *secret.Controller
secretClient k8s.Interface
ctx context.Context
AdmiralCache *AdmiralCache
StartTime time.Time
ExcludedIdentityMap map[string]bool
remoteControllers map[string]*RemoteController
SecretController *secret.Controller
secretClient k8s.Interface
ctx context.Context
AdmiralCache *AdmiralCache
StartTime time.Time
ServiceEntryUpdateSuspender ServiceEntrySuspender
ExcludedIdentityMap map[string]bool
}

func NewRemoteRegistry(ctx context.Context, params common.AdmiralParams) *RemoteRegistry {
var serviceEntryUpdateSuspender ServiceEntrySuspender
gtpCache := &globalTrafficCache{}
gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy)
gtpCache.mutex = &sync.Mutex{}
Expand All @@ -96,12 +110,17 @@ func NewRemoteRegistry(ctx context.Context, params common.AdmiralParams) *Remote
SeClusterCache: common.NewMapOfMaps(),
argoRolloutsEnabled: params.ArgoRolloutsEnabled,
}
if common.GetSecretResolver() == "" {
serviceEntryUpdateSuspender = NewDefaultServiceEntrySuspender(params.ExcludedIdentityList)
} else {
serviceEntryUpdateSuspender = NewDummyServiceEntrySuspender()
}
return &RemoteRegistry{
ctx: ctx,
StartTime: time.Now(),
remoteControllers: make(map[string]*RemoteController),
AdmiralCache: admiralCache,
ExcludedIdentityMap: mapSliceToBool(params.ExcludedIdentityList, true),
ctx: ctx,
StartTime: time.Now(),
remoteControllers: make(map[string]*RemoteController),
AdmiralCache: admiralCache,
ServiceEntryUpdateSuspender: serviceEntryUpdateSuspender,
}
}

Expand Down
8 changes: 0 additions & 8 deletions admiral/pkg/clusters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,3 @@ func ValidateConfigmapBeforePutting(cm *k8sV1.ConfigMap) error {
func IsCacheWarmupTime(remoteRegistry *RemoteRegistry) bool {
return time.Since(remoteRegistry.StartTime) < common.GetAdmiralParams().CacheRefreshDuration
}

func mapSliceToBool(list []string, value bool) map[string]bool {
m := make(map[string]bool, len(list))
for _, item := range list {
m[item] = value
}
return m
}