Skip to content

Commit

Permalink
Fix event processing during bootup (#240)
Browse files Browse the repository at this point in the history
Signed-off-by: sa <sushanth_a@intuit.com>
  • Loading branch information
aattuluri authored and sa committed Jul 21, 2022
1 parent 8dbcf47 commit ff96645
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 3 deletions.
2 changes: 2 additions & 0 deletions admiral/pkg/apis/admiral/routes/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)

func TestReturnSuccessGET(t *testing.T) {
Expand Down Expand Up @@ -48,6 +49,7 @@ func TestGetClusters(t *testing.T) {
RemoteClusters: map[string]*secret.RemoteCluster{},
},
},
StartTime: time.Now(),
},
}
testCases := []struct {
Expand Down
3 changes: 3 additions & 0 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: cnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand All @@ -548,6 +549,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand All @@ -574,6 +576,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand Down
2 changes: 2 additions & 0 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
}

err := w.deleteCacheController("I don't exit")
Expand All @@ -61,6 +62,7 @@ func TestDeleteCacheController(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
}

r := rest.Config{
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s

defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()

if IsCacheWarmupTime(remoteRegistry) {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped during cache warm up state")
return nil
}
//create a service entry, destination rule and virtual service in the local cluster
sourceServices := make(map[string]*k8sV1.Service)
sourceWeightedServices := make(map[string]map[string]*WeightedService)
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) {
KubeconfigPath: "testdata/fake.config",
}
rr, _ := InitAdmiral(context.Background(), p)
rr.StartTime = time.Now().Add(-60*time.Second)

config := rest.Config{
Host: "localhost",
Expand Down Expand Up @@ -875,6 +876,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {

rr, _ := InitAdmiral(context.Background(), p)

rr.StartTime = time.Now().Add(-60*time.Second)

config := rest.Config{
Host: "localhost",
}
Expand Down Expand Up @@ -1011,6 +1014,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
config := rest.Config{
Host: "localhost",
}
rr.StartTime = time.Now().Add(-60*time.Second)

d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

Expand Down
5 changes: 5 additions & 0 deletions admiral/pkg/clusters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
k8sV1 "k8s.io/api/core/v1"
"strconv"
"strings"
"time"
)

func GetMeshPorts(clusterName string, destService *k8sV1.Service,
Expand Down Expand Up @@ -127,3 +128,7 @@ func ValidateConfigmapBeforePutting(cm *k8sV1.ConfigMap) error {
}
return nil
}

func IsCacheWarmupTime(remoteRegistry *RemoteRegistry) bool {
return time.Since(remoteRegistry.StartTime) < common.GetAdmiralParams().CacheRefreshDuration
}
4 changes: 2 additions & 2 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys())
//wait for 30 seconds for the first time (for all caches to sync)
wait.Until(c.runWorker, 30 * time.Second, stopCh)

wait.Until(c.runWorker, 5 * time.Second, stopCh)
}

func (c *Controller) runWorker() {
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ func (s *serviceCache) Put(service *k8sV1.Service) {
Service: make(map[string]map[string]*k8sV1.Service),
Identity: s.getKey(service),
}
s.cache[identity] = existing
}
namespaceServices := existing.Service[service.Namespace]
if namespaceServices == nil {
namespaceServices = make(map[string]*k8sV1.Service)
}
namespaceServices[service.Name] = service
existing.Service[service.Namespace] = namespaceServices
s.cache[identity] = existing

}

Expand Down

0 comments on commit ff96645

Please sign in to comment.