Skip to content

Commit

Permalink
Reorder ServiceEntry writes. Add execution time and informer event lo…
Browse files Browse the repository at this point in the history
…gs. (istio-ecosystem#202)

Signed-off-by: psikka1 <pankaj_sikka@intuit.com>
  • Loading branch information
aattuluri authored and psikka1 committed Jun 15, 2022
1 parent 1877751 commit 74b9c09
Show file tree
Hide file tree
Showing 14 changed files with 67 additions and 33 deletions.
42 changes: 31 additions & 11 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func createServiceEntry(event admiral.EventType, rc *RemoteController, admiralCa
}

func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, sourceIdentity string, remoteRegistry *RemoteRegistry) map[string]*networking.ServiceEntry {

defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()

//create a service entry, destination rule and virtual service in the local cluster
sourceServices := make(map[string]*k8sV1.Service)
sourceDeployments := make(map[string]*k8sAppsV1.Deployment)
Expand All @@ -67,6 +70,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
var weightedServices map[string]*WeightedService
var rollout *admiral.RolloutClusterEntry

start := time.Now()
for _, rc := range remoteRegistry.RemoteControllers {

deployment := rc.DeploymentController.Cache.Get(sourceIdentity)
Expand Down Expand Up @@ -118,18 +122,15 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
sourceServices[rc.ClusterID] = serviceInstance
}

dependents := remoteRegistry.AdmiralCache.IdentityDependencyCache.Get(sourceIdentity).Copy()
util.LogElapsedTimeSince("BuildServiceEntry", sourceIdentity, env, "", start)

dependentClusters := getDependentClusters(dependents, remoteRegistry.AdmiralCache.IdentityClusterCache, sourceServices)
dependents := remoteRegistry.AdmiralCache.IdentityDependencyCache.Get(sourceIdentity).Copy()

//update cname dependent cluster cache
for clusterId := range dependentClusters {
remoteRegistry.AdmiralCache.CnameDependentClusterCache.Put(cname, clusterId, clusterId)
}
//handle local updates (source clusters first)
//update the address to local fqdn for service entry in a cluster local to the service instance

AddServiceEntriesWithDr(remoteRegistry.AdmiralCache, dependentClusters, remoteRegistry.RemoteControllers, serviceEntries)
start = time.Now()

//update the address to local fqdn for service entry in a cluster local to the service instance
for sourceCluster, serviceInstance := range sourceServices {
localFqdn := serviceInstance.Name + common.Sep + serviceInstance.Namespace + common.DotLocalDomainSuffix
rc := remoteRegistry.RemoteControllers[sourceCluster]
Expand Down Expand Up @@ -186,14 +187,33 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
}
}

if common.GetWorkloadSidecarUpdate() == "enabled" {
modifySidecarForLocalClusterCommunication(serviceInstance.Namespace, remoteRegistry.AdmiralCache.DependencyNamespaceCache.Get(sourceIdentity), rc)
}

for _, val := range dependents {
remoteRegistry.AdmiralCache.DependencyNamespaceCache.Put(val, serviceInstance.Namespace, localFqdn, cnames)
}

if common.GetWorkloadSidecarUpdate() == "enabled" {
modifySidecarForLocalClusterCommunication(serviceInstance.Namespace, remoteRegistry.AdmiralCache.DependencyNamespaceCache.Get(sourceIdentity), rc)
}
}

util.LogElapsedTimeSince("WriteServiceEntryToSourceClusters", sourceIdentity, env, "", start)

//Write to dependent clusters

start = time.Now()

dependentClusters := getDependentClusters(dependents, remoteRegistry.AdmiralCache.IdentityClusterCache, sourceServices)

//update cname dependent cluster cache
for clusterId := range dependentClusters {
remoteRegistry.AdmiralCache.CnameDependentClusterCache.Put(cname, clusterId, clusterId)
}

AddServiceEntriesWithDr(remoteRegistry.AdmiralCache, dependentClusters, remoteRegistry.RemoteControllers, serviceEntries)

util.LogElapsedTimeSince("WriteServiceEntryToDependentClusters", sourceIdentity, env, "", start)

return serviceEntries
}

Expand Down
19 changes: 10 additions & 9 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,41 @@ type InformerCacheObj struct {
}

type Controller struct {
name string
delegator Delegator
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
}

func NewController(stopCh <-chan struct{}, delegator Delegator, informer cache.SharedIndexInformer) Controller {
func NewController(name string, stopCh <-chan struct{}, delegator Delegator, informer cache.SharedIndexInformer) Controller {

controller := Controller{
name: name,
informer: informer,
delegator: delegator,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debugf("Informer: add : %v", obj)
key, err := cache.MetaNamespaceKeyFunc(obj)

if err == nil {
log.Infof("Informer Add controller=%v obj=%v", controller.name, key)
controller.queue.Add(InformerCacheObj{key: key, eventType: Add, obj: obj})
}

},
UpdateFunc: func(oldObj, newObj interface{}) {
log.Debugf("Informer Update: %v", newObj)
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
log.Infof("Informer Update controller=%v obj=%v", controller.name, key)
controller.queue.Add(InformerCacheObj{key: key, eventType: Update, obj: newObj, oldObj: oldObj})
}
},
DeleteFunc: func(obj interface{}) {
log.Debugf("Informer Delete: %v", obj)
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
log.Infof("Informer Delete controller=%v obj=%v", controller.name, key)
controller.queue.Add(InformerCacheObj{key: key, eventType: Delete, obj: obj})
}
},
Expand All @@ -88,18 +89,18 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

log.Info("Starting controller")
log.Infof("Starting controller=%v", c.name)

go c.informer.Run(stopCh)

// Wait for the caches to be synced before starting workers
log.Info(" Waiting for informer caches to sync")
log.Infof(" Waiting for informer caches to sync for controller=%v", c.name)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf(" timed out waiting for caches to sync"))
utilruntime.HandleError(fmt.Errorf(" timed out waiting for caches to sync for controller=%v", c.name))
return
}

log.Info("informer caches synced")
log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys())
wait.Until(c.runWorker, 5*time.Second, stopCh)
}

Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewDependencyController(stopCh <-chan struct{}, handler DepHandler, configP
cache.Indexers{},
)

NewController(stopCh, &depController, depController.informer)
NewController("dependency-ctrl-" + namespace, stopCh, &depController, depController.informer)

return &depController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func NewDeploymentController(stopCh <-chan struct{}, handler DeploymentHandler,
cache.Indexers{},
)

NewController(stopCh, &deploymentController, deploymentController.informer)
NewController("deployment-ctrl-" + config.Host, stopCh, &deploymentController, deploymentController.informer)

return &deploymentController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/globaltraffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewGlobalTrafficController(stopCh <-chan struct{}, handler GlobalTrafficHan
cache.Indexers{},
)

NewController(stopCh, &globalTrafficController, globalTrafficController.informer)
NewController("gtp-ctrl-" + configPath.Host, stopCh, &globalTrafficController, globalTrafficController.informer)

return &globalTrafficController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewNodeController(stopCh <-chan struct{}, handler NodeHandler, config *rest
cache.Indexers{},
)

NewController(stopCh, &nodeController, nodeController.informer)
NewController("node-ctrl-" + config.Host, stopCh, &nodeController, nodeController.informer)

return &nodeController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/rollouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func NewRolloutsController(stopCh <-chan struct{}, handler RolloutHandler, confi
//Initialize informer
roController.informer = argoRolloutsInformerFactory.Argoproj().V1alpha1().Rollouts().Informer()

NewController(stopCh, &roController, roController.informer)
NewController("rollouts-ctrl-" + config.Host , stopCh, &roController, roController.informer)
return &roController, nil
}

Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewServiceController(stopCh <-chan struct{}, handler ServiceHandler, config
&k8sV1.Service{}, resyncPeriod, cache.Indexers{},
)

NewController(stopCh, &serviceController, serviceController.informer)
NewController("service-ctrl-" + config.Host , stopCh, &serviceController, serviceController.informer)

return &serviceController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/istio/destinationrule.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewDestinationRuleController(stopCh <-chan struct{}, handler DestinationRul

drController.informer = informers.NewDestinationRuleInformer(ic, k8sV1.NamespaceAll, resyncPeriod, cache.Indexers{})

admiral.NewController(stopCh, &drController, drController.informer)
admiral.NewController("destinationrule-ctrl-" + config.Host, stopCh, &drController, drController.informer)

return &drController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/istio/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewServiceEntryController(stopCh <-chan struct{}, handler ServiceEntryHandl

seController.informer = informers.NewServiceEntryInformer(ic, k8sV1.NamespaceAll, resyncPeriod, cache.Indexers{})

admiral.NewController(stopCh, &seController, seController.informer)
admiral.NewController("serviceentry-ctrl-" + config.Host, stopCh, &seController, seController.informer)

return &seController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/istio/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewSidecarController(stopCh <-chan struct{}, handler SidecarHandler, config

sidecarController.informer = informers.NewSidecarInformer(ic, k8sV1.NamespaceAll, resyncPeriod, cache.Indexers{})

admiral.NewController(stopCh, &sidecarController, sidecarController.informer)
admiral.NewController("sidecar-ctrl-" + config.Host, stopCh, &sidecarController, sidecarController.informer)

return &sidecarController, nil
}
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/istio/virtualservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewVirtualServiceController(stopCh <-chan struct{}, handler VirtualServiceH

drController.informer = informers.NewVirtualServiceInformer(ic, k8sV1.NamespaceAll, resyncPeriod, cache.Indexers{})

admiral.NewController(stopCh, &drController, drController.informer)
admiral.NewController("virtualservice-ctrl-" + config.Host, stopCh, &drController, drController.informer)

return &drController, nil
}
Expand Down
6 changes: 3 additions & 3 deletions admiral/pkg/controller/secret/secretcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,21 @@ func NewController(
secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
log.Infof("Processing add: %s", key)
log.Infof("Processing cluster add: %s", key)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
log.Infof("Processing update: %s", key)
log.Infof("Processing cluster update: %s", key)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
log.Infof("Processing delete: %s", key)
log.Infof("Processing cluster delete: %s", key)
if err == nil {
queue.Add(key)
}
Expand Down
13 changes: 13 additions & 0 deletions admiral/pkg/controller/util/util.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package util

import (
log "github.com/sirupsen/logrus"
"reflect"
"time"
)

func MapCopy(dst, src interface{}) {
Expand Down Expand Up @@ -36,3 +38,14 @@ func Contains(vs []string, t string) bool {
}
return false
}

func LogElapsedTime(op, identity, env, clusterId string) func() {
start := time.Now()
return func() {
LogElapsedTimeSince(op, identity, env, clusterId, start)
}
}

func LogElapsedTimeSince(op, identity, env, clusterId string, start time.Time) {
log.Infof("op=%s identity=%s env=%s cluster=%s time=%v\n", op, identity, env, clusterId, time.Since(start).Milliseconds())
}

0 comments on commit 74b9c09

Please sign in to comment.