Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prom counters #211

Merged
merged 3 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}
stop := make(chan struct{})

s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

fakeIstioClient := istiofake.NewSimpleClientset()

Expand Down Expand Up @@ -1058,10 +1058,10 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) {
}
stop := make(chan struct{})

s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

emptyCacheService, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
emptyCacheService, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fatalf("Inititalization failed")
Expand Down
19 changes: 9 additions & 10 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
log.Infof("Initializing Admiral with params: %v", params)

common.InitializeConfig(params)

w := RemoteRegistry{
ctx: ctx,
}
Expand Down Expand Up @@ -123,14 +122,14 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
Expand All @@ -140,49 +139,49 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)
rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)

if err != nil {
return fmt.Errorf(" Error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestCreateDestinationRuleForLocalNoDeployLabel(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
Expand Down Expand Up @@ -184,10 +184,10 @@ func createMockRemoteController(f func(interface{})) (*RemoteController, error)
Host: "localhost",
}
stop := make(chan struct{})
d, e := admiral.NewDeploymentController(stop, &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
n, e := admiral.NewNodeController(stop, &test.MockNodeHandler{}, &config)
r, e := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", stop, &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
n, e := admiral.NewNodeController("", stop, &test.MockNodeHandler{}, &config)
r, e := admiral.NewRolloutsController("test", stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
return nil, e
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestUpdateCacheController(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
hook := logTest.NewGlobal()
rr.RemoteControllers[c.clusterId].ApiServer = c.oldConfig.Host
d, err := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, c.oldConfig, time.Second*time.Duration(300))
d, err := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, c.oldConfig, time.Second*time.Duration(300))
if err != nil {
t.Fatalf("Unexpected error creating controller %v", err)
}
Expand Down
83 changes: 41 additions & 42 deletions admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

r, e := admiral.NewRolloutsController(make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestCreateServiceEntry(t *testing.T) {
Host: "localhost",
}
stop := make(chan struct{})
s, e := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fatalf("%v", e)
Expand Down Expand Up @@ -879,17 +879,17 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

r, e := admiral.NewRolloutsController(make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController(make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController("", make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
}
s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController("", make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))

cacheWithEntry := ServiceEntryAddressStore{
EntryAddresses: map[string]string{"test.test.mesh-se": common.LocalAddressPrefix + ".10.1"},
Expand All @@ -913,7 +913,7 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {
RolloutController: r,
ServiceController: s,
VirtualServiceController: v,
GlobalTraffic: gtpc,
GlobalTraffic: gtpc,
}
rc.ClusterID = "test.cluster"
rr.RemoteControllers["test.cluster"] = rc
Expand Down Expand Up @@ -1012,16 +1012,16 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
Host: "localhost",
}

d, e := admiral.NewDeploymentController(make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))
d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

r, e := admiral.NewRolloutsController(make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController(make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))
r, e := admiral.NewRolloutsController("test", make(chan struct{}), &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300))
v, e := istio.NewVirtualServiceController("", make(chan struct{}), &test.MockVirtualServiceHandler{}, &config, time.Second*time.Duration(300))

if e != nil {
t.Fail()
}
s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))
s, e := admiral.NewServiceController("test", make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController("", make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))

cacheWithEntry := ServiceEntryAddressStore{
EntryAddresses: map[string]string{
Expand All @@ -1048,7 +1048,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
RolloutController: r,
ServiceController: s,
VirtualServiceController: v,
GlobalTraffic: gtpc,
GlobalTraffic: gtpc,
}
rc.ClusterID = "test.cluster"
rr.RemoteControllers["test.cluster"] = rc
Expand Down Expand Up @@ -1345,17 +1345,16 @@ func TestUpdateGlobalGtpCache(t *testing.T) {
env_stage = "stage"

gtp = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-30))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hello"}},
},}
Policy: []*model.TrafficPolicy{{DnsPrefix: "hello"}},
}}

gtp2 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp2", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp2"}},
},}

gtp3 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", CreationTimestamp: v12.NewTime(time.Now()), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp3"}},
},}
Policy: []*model.TrafficPolicy{{DnsPrefix: "hellogtp2"}},
}}

gtp3 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", CreationTimestamp: v12.NewTime(time.Now()), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy{{DnsPrefix: "hellogtp3"}},
}}
)

testCases := []struct {
Expand All @@ -1364,33 +1363,33 @@ func TestUpdateGlobalGtpCache(t *testing.T) {
env string
gtps map[string][]*v13.GlobalTrafficPolicy
expectedGtp *v13.GlobalTrafficPolicy
}{ {
name: "Should return nil when no GTP present",
gtps: map[string][]*v13.GlobalTrafficPolicy{},
identity: identity1,
env: env_stage,
expectedGtp: nil,
},
}{{
name: "Should return nil when no GTP present",
gtps: map[string][]*v13.GlobalTrafficPolicy{},
identity: identity1,
env: env_stage,
expectedGtp: nil,
},
{
name: "Should return the only existing gtp",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp}},
identity: identity1,
env: env_stage,
expectedGtp: gtp,
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp}},
identity: identity1,
env: env_stage,
expectedGtp: gtp,
},
{
name: "Should return the gtp recently created within the cluster",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}},
identity: identity1,
env: env_stage,
expectedGtp: gtp2,
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}},
identity: identity1,
env: env_stage,
expectedGtp: gtp2,
},
{
name: "Should return the gtp recently created from another cluster",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}, "c2": {gtp3}},
identity: identity1,
env: env_stage,
expectedGtp: gtp3,
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}, "c2": {gtp3}},
identity: identity1,
env: env_stage,
expectedGtp: gtp3,
},
}

Expand Down
34 changes: 32 additions & 2 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package admiral

import (
"fmt"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
log "github.com/sirupsen/logrus"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"time"
)

const (
Expand Down Expand Up @@ -48,7 +49,7 @@ type Controller struct {
func NewController(name string, stopCh <-chan struct{}, delegator Delegator, informer cache.SharedIndexInformer) Controller {

controller := Controller{
name: name,
name: name,
informer: informer,
delegator: delegator,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
Expand Down Expand Up @@ -145,3 +146,32 @@ func (c *Controller) processItem(informerCacheObj InformerCacheObj) error {
}
return nil
}

type MonitoredDelegator struct {
clusterID string
objectType string
d Delegator
}

func NewMonitoredDelegator(d Delegator, clusterID string, objectType string) *MonitoredDelegator {
return &MonitoredDelegator{
clusterID: clusterID,
objectType: objectType,
d: d,
}
}

func (s *MonitoredDelegator) Added(obj interface{}) {
common.EventsProcessed.With(s.clusterID, s.objectType, common.AddEventLabelValue).Inc()
s.d.Added(obj)
}

func (s *MonitoredDelegator) Updated(obj interface{}, oldObj interface{}) {
common.EventsProcessed.With(s.clusterID, s.objectType, common.UpdateEventLabelValue).Inc()
s.d.Updated(obj, oldObj)
}

func (s *MonitoredDelegator) Deleted(obj interface{}) {
common.EventsProcessed.With(s.clusterID, s.objectType, common.DeleteEventLabelValue).Inc()
s.d.Deleted(obj)
}
Loading