From a9a435631c872ac0ba02e1ff900baa8af925486a Mon Sep 17 00:00:00 2001 From: aattuluri Date: Tue, 5 Jul 2022 17:39:38 -0700 Subject: [PATCH] Fix event processing during bootup (#240) (#139) Co-authored-by: aattuluri <44482891+aattuluri@users.noreply.github.com> --- admiral/pkg/apis/admiral/routes/handler_test.go | 2 ++ admiral/pkg/clusters/handler.go | 5 +++-- admiral/pkg/clusters/handler_test.go | 3 +++ admiral/pkg/clusters/registry.go | 1 + admiral/pkg/clusters/registry_test.go | 2 ++ admiral/pkg/clusters/serviceentry.go | 4 ++++ admiral/pkg/clusters/serviceentry_test.go | 4 ++++ admiral/pkg/clusters/types.go | 1 + admiral/pkg/clusters/util.go | 5 +++++ admiral/pkg/controller/admiral/controller.go | 4 ++-- admiral/pkg/controller/admiral/service.go | 2 +- 11 files changed, 28 insertions(+), 5 deletions(-) diff --git a/admiral/pkg/apis/admiral/routes/handler_test.go b/admiral/pkg/apis/admiral/routes/handler_test.go index 1b540f3f..97114ab6 100644 --- a/admiral/pkg/apis/admiral/routes/handler_test.go +++ b/admiral/pkg/apis/admiral/routes/handler_test.go @@ -18,6 +18,7 @@ import ( "istio.io/client-go/pkg/apis/networking/v1alpha3" istiofake "istio.io/client-go/pkg/clientset/versioned/fake" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "time" ) func TestReturnSuccessGET(t *testing.T) { @@ -51,6 +52,7 @@ func TestGetClusters(t *testing.T) { RemoteClusters: map[string]*secret.RemoteCluster{}, }, }, + StartTime: time.Now(), }, } testCases := []struct { diff --git a/admiral/pkg/clusters/handler.go b/admiral/pkg/clusters/handler.go index ca9bb694..d529c14a 100644 --- a/admiral/pkg/clusters/handler.go +++ b/admiral/pkg/clusters/handler.go @@ -573,7 +573,8 @@ Service Entry object is not added/updated if the current pod is in ReadOnly mode */ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController, admiralState *AdmiralState) { var err error - var op string + var op, diff string + var skipUpdate bool if obj.Annotations == nil { obj.Annotations = map[string]string{} } @@ -601,7 +602,7 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn exist.Labels = obj.Labels exist.Annotations = obj.Annotations op = "Update" - skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist) + skipUpdate, diff = skipDestructiveUpdate(rc, obj, exist) if diff != "" { log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff) } diff --git a/admiral/pkg/clusters/handler_test.go b/admiral/pkg/clusters/handler_test.go index 058ca239..0acaf938 100644 --- a/admiral/pkg/clusters/handler_test.go +++ b/admiral/pkg/clusters/handler_test.go @@ -528,6 +528,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) { CnameDependentClusterCache: cnameCache, SeClusterCache: common.NewMapOfMaps(), }, + StartTime: time.Now(), }, } @@ -547,6 +548,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) { CnameDependentClusterCache: goodCnameCache, SeClusterCache: common.NewMapOfMaps(), }, + StartTime: time.Now(), }, } @@ -573,6 +575,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) { CnameDependentClusterCache: goodCnameCache, SeClusterCache: common.NewMapOfMaps(), }, + StartTime: time.Now(), }, } diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index 01a07ab2..b8dcb5ae 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -33,6 +33,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis w := RemoteRegistry{ ctx: ctx, AdmiralState: &as, + StartTime: time.Now(), } wd := DependencyHandler{ diff --git a/admiral/pkg/clusters/registry_test.go b/admiral/pkg/clusters/registry_test.go index de222c20..1076eeab 100644 --- a/admiral/pkg/clusters/registry_test.go +++ b/admiral/pkg/clusters/registry_test.go @@ -48,6 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) { w := RemoteRegistry{ RemoteControllers: make(map[string]*RemoteController), + StartTime: time.Now(), } err := w.deleteCacheController("I don't exit") @@ -61,6 +62,7 @@ func TestDeleteCacheController(t *testing.T) { w := RemoteRegistry{ RemoteControllers: make(map[string]*RemoteController), + StartTime: time.Now(), } r := rest.Config{ diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index 5476f7dc..9c4f22b8 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -58,6 +58,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")() + if IsCacheWarmupTime(remoteRegistry) { + log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped during cache warm up state") + return nil + } //create a service entry, destination rule and virtual service in the local cluster sourceServices := make(map[string]*k8sV1.Service) sourceWeightedServices := make(map[string]map[string]*WeightedService) diff --git a/admiral/pkg/clusters/serviceentry_test.go b/admiral/pkg/clusters/serviceentry_test.go index 8a5cd8bb..a54a8aa1 100644 --- a/admiral/pkg/clusters/serviceentry_test.go +++ b/admiral/pkg/clusters/serviceentry_test.go @@ -289,6 +289,7 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) { KubeconfigPath: "testdata/fake.config", } rr, _ := InitAdmiral(context.Background(), p) + rr.StartTime = time.Now().Add(-60*time.Second) config := rest.Config{ Host: "localhost", @@ -875,6 +876,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) { rr, _ := InitAdmiral(context.Background(), p) + rr.StartTime = time.Now().Add(-60*time.Second) + config := rest.Config{ Host: "localhost", } @@ -1013,6 +1016,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) { config := rest.Config{ Host: "localhost", } + rr.StartTime = time.Now().Add(-60*time.Second) d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300)) diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index a874b0c1..a0d99d48 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -61,6 +61,7 @@ type RemoteRegistry struct { ctx context.Context AdmiralCache *AdmiralCache AdmiralState *AdmiralState + StartTime time.Time } func (r *RemoteRegistry) shutdown() { diff --git a/admiral/pkg/clusters/util.go b/admiral/pkg/clusters/util.go index 76099661..90e3bad4 100644 --- a/admiral/pkg/clusters/util.go +++ b/admiral/pkg/clusters/util.go @@ -10,6 +10,7 @@ import ( k8sV1 "k8s.io/api/core/v1" "strconv" "strings" + "time" ) func GetMeshPorts(clusterName string, destService *k8sV1.Service, @@ -150,3 +151,7 @@ func ValidateConfigmapBeforePutting(cm *k8sV1.ConfigMap) error { } return nil } + +func IsCacheWarmupTime(remoteRegistry *RemoteRegistry) bool { + return time.Since(remoteRegistry.StartTime) < common.GetAdmiralParams().CacheRefreshDuration +} diff --git a/admiral/pkg/controller/admiral/controller.go b/admiral/pkg/controller/admiral/controller.go index 5275c106..33fc427e 100644 --- a/admiral/pkg/controller/admiral/controller.go +++ b/admiral/pkg/controller/admiral/controller.go @@ -100,8 +100,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys()) - //wait for 30 seconds for the first time (for all caches to sync) - wait.Until(c.runWorker, 30 * time.Second, stopCh) + + wait.Until(c.runWorker, 5 * time.Second, stopCh) } func (c *Controller) runWorker() { diff --git a/admiral/pkg/controller/admiral/service.go b/admiral/pkg/controller/admiral/service.go index 7c050640..d47f2082 100644 --- a/admiral/pkg/controller/admiral/service.go +++ b/admiral/pkg/controller/admiral/service.go @@ -60,7 +60,6 @@ func (s *serviceCache) Put(service *k8sV1.Service) { Service: make(map[string]map[string]*k8sV1.Service), Identity: s.getKey(service), } - s.cache[identity] = existing } namespaceServices := existing.Service[service.Namespace] if namespaceServices == nil { @@ -68,6 +67,7 @@ func (s *serviceCache) Put(service *k8sV1.Service) { } namespaceServices[service.Name] = service existing.Service[service.Namespace] = namespaceServices + s.cache[identity] = existing }