Skip to content

Commit

Permalink
Fix event processing during bootup (istio-ecosystem#240) (istio-ecosy…
Browse files Browse the repository at this point in the history
…stem#139)

Co-authored-by: aattuluri <44482891+aattuluri@users.noreply.github.com>
  • Loading branch information
2 people authored and GitHub Enterprise committed Jul 6, 2022
1 parent 5bb5e5b commit a9a4356
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 5 deletions.
2 changes: 2 additions & 0 deletions admiral/pkg/apis/admiral/routes/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"istio.io/client-go/pkg/apis/networking/v1alpha3"
istiofake "istio.io/client-go/pkg/clientset/versioned/fake"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)

func TestReturnSuccessGET(t *testing.T) {
Expand Down Expand Up @@ -51,6 +52,7 @@ func TestGetClusters(t *testing.T) {
RemoteClusters: map[string]*secret.RemoteCluster{},
},
},
StartTime: time.Now(),
},
}
testCases := []struct {
Expand Down
5 changes: 3 additions & 2 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,8 @@ Service Entry object is not added/updated if the current pod is in ReadOnly mode
*/
func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController, admiralState *AdmiralState) {
var err error
var op string
var op, diff string
var skipUpdate bool
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
Expand Down Expand Up @@ -601,7 +602,7 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
exist.Labels = obj.Labels
exist.Annotations = obj.Annotations
op = "Update"
skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist)
skipUpdate, diff = skipDestructiveUpdate(rc, obj, exist)
if diff != "" {
log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
}
Expand Down
3 changes: 3 additions & 0 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: cnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand All @@ -547,6 +548,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand All @@ -573,6 +575,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
w := RemoteRegistry{
ctx: ctx,
AdmiralState: &as,
StartTime: time.Now(),
}

wd := DependencyHandler{
Expand Down
2 changes: 2 additions & 0 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
}

err := w.deleteCacheController("I don't exit")
Expand All @@ -61,6 +62,7 @@ func TestDeleteCacheController(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
}

r := rest.Config{
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s

defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()

if IsCacheWarmupTime(remoteRegistry) {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped during cache warm up state")
return nil
}
//create a service entry, destination rule and virtual service in the local cluster
sourceServices := make(map[string]*k8sV1.Service)
sourceWeightedServices := make(map[string]map[string]*WeightedService)
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) {
KubeconfigPath: "testdata/fake.config",
}
rr, _ := InitAdmiral(context.Background(), p)
rr.StartTime = time.Now().Add(-60*time.Second)

config := rest.Config{
Host: "localhost",
Expand Down Expand Up @@ -875,6 +876,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {

rr, _ := InitAdmiral(context.Background(), p)

rr.StartTime = time.Now().Add(-60*time.Second)

config := rest.Config{
Host: "localhost",
}
Expand Down Expand Up @@ -1013,6 +1016,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
config := rest.Config{
Host: "localhost",
}
rr.StartTime = time.Now().Add(-60*time.Second)

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type RemoteRegistry struct {
ctx context.Context
AdmiralCache *AdmiralCache
AdmiralState *AdmiralState
StartTime time.Time
}

func (r *RemoteRegistry) shutdown() {
Expand Down
5 changes: 5 additions & 0 deletions admiral/pkg/clusters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
k8sV1 "k8s.io/api/core/v1"
"strconv"
"strings"
"time"
)

func GetMeshPorts(clusterName string, destService *k8sV1.Service,
Expand Down Expand Up @@ -150,3 +151,7 @@ func ValidateConfigmapBeforePutting(cm *k8sV1.ConfigMap) error {
}
return nil
}

func IsCacheWarmupTime(remoteRegistry *RemoteRegistry) bool {
return time.Since(remoteRegistry.StartTime) < common.GetAdmiralParams().CacheRefreshDuration
}
4 changes: 2 additions & 2 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys())
//wait for 30 seconds for the first time (for all caches to sync)
wait.Until(c.runWorker, 30 * time.Second, stopCh)

wait.Until(c.runWorker, 5 * time.Second, stopCh)
}

func (c *Controller) runWorker() {
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ func (s *serviceCache) Put(service *k8sV1.Service) {
Service: make(map[string]map[string]*k8sV1.Service),
Identity: s.getKey(service),
}
s.cache[identity] = existing
}
namespaceServices := existing.Service[service.Namespace]
if namespaceServices == nil {
namespaceServices = make(map[string]*k8sV1.Service)
}
namespaceServices[service.Name] = service
existing.Service[service.Namespace] = namespaceServices
s.cache[identity] = existing

}

Expand Down

0 comments on commit a9a4356

Please sign in to comment.