Skip to content

Commit

Permalink
Prom counters (#211)
Browse files Browse the repository at this point in the history
* Prometheus Counters

- add counter. rework tests
- add support for labels
- move metrics to metrics.go
- update metrics tests with labels
- use a dedicated delegator to capture metrics
- renamed to MonitoredDelegator

Signed-off-by: Adil Fulara <adil.fulara@gmail.com>
Signed-off-by: sa <sushanth_a@intuit.com>
  • Loading branch information
adilfulara authored and sa committed Jul 21, 2022
1 parent 44adf71 commit a486de8
Show file tree
Hide file tree
Showing 34 changed files with 589 additions and 549 deletions.
402 changes: 99 additions & 303 deletions admiral/pkg/clusters/handler.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,8 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}
stop := make(chan struct{})

s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

fakeIstioClient := istiofake.NewSimpleClientset()

Expand Down Expand Up @@ -1057,10 +1057,10 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
}
stop := make(chan struct{})

s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

emptyCacheService, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
emptyCacheService, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fatalf("Inititalization failed")
Expand Down
76 changes: 37 additions & 39 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
log.Infof("Initializing Admiral with params: %v", params)

common.InitializeConfig(params)

AdmiralCurrentState = AdmiralState{ReadOnlyEnabled}
startAdmiralStateChecker(ctx,params)

w := RemoteRegistry{
ctx: ctx,
StartTime: time.Now(),
}

wd := DependencyHandler{
Expand Down Expand Up @@ -69,7 +66,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
log.Info("argo rollouts disabled")
}

configMapController, err := admiral.NewConfigMapController(params.ServiceEntryIPPrefix)
configMapController, err := admiral.NewConfigMapController()
if err != nil {
return nil, fmt.Errorf(" Error with configmap controller init: %v", err)
}
Expand Down Expand Up @@ -124,71 +121,72 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

var err error

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
return fmt.Errorf("error with ServiceController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
return fmt.Errorf("error with GlobalTrafficController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)
rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)

if err != nil {
return fmt.Errorf(" Error with NodeController controller init: %v", err)
return fmt.Errorf("error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
return fmt.Errorf("error with ServiceEntryController init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with VirtualServiceController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with DeploymentController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf("error with Rollout controller init: %v", err)
}
}

r.Lock()
Expand Down Expand Up @@ -232,4 +230,4 @@ func (r *RemoteRegistry) deleteCacheController(clusterID string) error {

log.Infof(LogFormat, "Delete", "remote-controller", clusterID, clusterID, "success")
return nil
}
}
12 changes: 6 additions & 6 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestCreateDestinationRuleForLocalNoDeployLabel(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
Expand Down Expand Up @@ -184,10 +184,10 @@ func createMockRemoteController(f func(interface{})) (*RemoteController, error)
Host: "localhost",
}
stop := make(chan struct{})
d, e := admiral.NewDeploymentController(stop, &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
n, e := admiral.NewNodeController(stop, &test.MockNodeHandler{}, &config)
r, e := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", stop, &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
n, e := admiral.NewNodeController("", stop, &test.MockNodeHandler{}, &config)
r, e := admiral.NewRolloutsController("test", stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
return nil, e
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestUpdateCacheController(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
hook := logTest.NewGlobal()
rr.RemoteControllers[c.clusterId].ApiServer = c.oldConfig.Host
d, err := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, c.oldConfig, time.Second*time.Duration(300))
d, err := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, c.oldConfig, time.Second*time.Duration(300))
if err != nil {
t.Fatalf("Unexpected error creating controller %v", err)
}
Expand Down
83 changes: 41 additions & 42 deletions admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

r, e := admiral.NewRolloutsController(make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestCreateServiceEntry(t *testing.T) {
Host: "localhost",
}
stop := make(chan struct{})
s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fatalf("%v", e)
Expand Down Expand Up @@ -879,17 +879,17 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

r, e := admiral.NewRolloutsController(make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController(make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController("", make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
}
s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController("", make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))

cacheWithEntry := ServiceEntryAddressStore{
EntryAddresses: map[string]string{"test.test.mesh-se": common.LocalAddressPrefix + ".10.1"},
Expand All @@ -913,7 +913,7 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {
RolloutController: r,
ServiceController: s,
VirtualServiceController: v,
GlobalTraffic: gtpc,
GlobalTraffic: gtpc,
}
rc.ClusterID = "test.cluster"
rr.RemoteControllers["test.cluster"] = rc
Expand Down Expand Up @@ -1012,16 +1012,16 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

r, e := admiral.NewRolloutsController(make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController(make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController("", make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
}
s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController("", make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))

cacheWithEntry := ServiceEntryAddressStore{
EntryAddresses: map[string]string{
Expand All @@ -1048,7 +1048,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
RolloutController: r,
ServiceController: s,
VirtualServiceController: v,
GlobalTraffic: gtpc,
GlobalTraffic: gtpc,
}
rc.ClusterID = "test.cluster"
rr.RemoteControllers["test.cluster"] = rc
Expand Down Expand Up @@ -1345,17 +1345,16 @@ func TestUpdateGlobalGtpCache(t *testing.T) {
env_stage = "stage"

gtp = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-30))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hello"}},
},}
Policy: []*model.TrafficPolicy{{DnsPrefix: "hello"}},
}}

gtp2 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp2", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp2"}},
},}

gtp3 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", CreationTimestamp: v12.NewTime(time.Now()), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp3"}},
},}
Policy: []*model.TrafficPolicy{{DnsPrefix: "hellogtp2"}},
}}

gtp3 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", CreationTimestamp: v12.NewTime(time.Now()), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy{{DnsPrefix: "hellogtp3"}},
}}
)

testCases := []struct {
Expand All @@ -1364,33 +1363,33 @@ func TestUpdateGlobalGtpCache(t *testing.T) {
env string
gtps map[string][]*v13.GlobalTrafficPolicy
expectedGtp *v13.GlobalTrafficPolicy
}{ {
name: "Should return nil when no GTP present",
gtps: map[string][]*v13.GlobalTrafficPolicy{},
identity: identity1,
env: env_stage,
expectedGtp: nil,
},
}{{
name: "Should return nil when no GTP present",
gtps: map[string][]*v13.GlobalTrafficPolicy{},
identity: identity1,
env: env_stage,
expectedGtp: nil,
},
{
name: "Should return the only existing gtp",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp}},
identity: identity1,
env: env_stage,
expectedGtp: gtp,
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp}},
identity: identity1,
env: env_stage,
expectedGtp: gtp,
},
{
name: "Should return the gtp recently created within the cluster",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}},
identity: identity1,
env: env_stage,
expectedGtp: gtp2,
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}},
identity: identity1,
env: env_stage,
expectedGtp: gtp2,
},
{
name: "Should return the gtp recently created from another cluster",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}, "c2": {gtp3}},
identity: identity1,
env: env_stage,
expectedGtp: gtp3,
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}, "c2": {gtp3}},
identity: identity1,
env: env_stage,
expectedGtp: gtp3,
},
}

Expand Down
Loading

0 comments on commit a486de8

Please sign in to comment.