Skip to content

Commit

Permalink
Handle k8s service events and trigger SE generation (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
aattuluri committed Jul 2, 2022
1 parent 3cd7ac8 commit eeefffa
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 234 deletions.
27 changes: 7 additions & 20 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,6 @@ import (
)

const (
ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
DefaultBaseEjectionTime int64 = 300
DefaultConsecutiveGatewayErrors uint32 = 50
DefaultInterval int64 = 60
Expand Down Expand Up @@ -730,13 +728,13 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return nil
}

cachedService := rc.ServiceController.Cache.Get(deployment.Namespace)
cachedServices := rc.ServiceController.Cache.Get(deployment.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
var matchedService *k8sV1.Service
for _, service := range cachedService.Service[deployment.Namespace] {
for _, service := range cachedServices {
var match = true
for lkey, lvalue := range service.Spec.Selector {
value, ok := deployment.Spec.Selector.MatchLabels[lkey]
Expand Down Expand Up @@ -795,9 +793,9 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
if rollout == nil {
return nil
}
cachedService := rc.ServiceController.Cache.Get(rollout.Namespace)
cachedServices := rc.ServiceController.Cache.Get(rollout.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
rolloutStrategy := rollout.Spec.Strategy
Expand Down Expand Up @@ -875,18 +873,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)

//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

servicesOrdered := make([]string, 0, len(servicesInNamespace))
for k := range servicesInNamespace {
servicesOrdered = append(servicesOrdered, k)
}

sort.Strings(servicesOrdered)

for _, s := range servicesOrdered {
var service = servicesInNamespace[s]
for _, service := range cachedServices {
var match = true
//skip services that are not referenced in the rollout
if len(blueGreenActiveService) > 0 && service.ObjectMeta.Name != blueGreenActiveService && service.ObjectMeta.Name != blueGreenPreviewService {
Expand All @@ -897,7 +884,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
for lkey, lvalue := range service.Spec.Selector {
// Rollouts controller adds a dynamic label with name rollouts-pod-template-hash to both active and passive replicasets.
// This dynamic label is not available on the rollout template. Hence ignoring the label with name rollouts-pod-template-hash
if lkey == ROLLOUT_POD_HASH_LABEL {
if lkey == common.RolloutPodHashLabel {
continue
}
value, ok := rollout.Spec.Selector.MatchLabels[lkey]
Expand Down
81 changes: 40 additions & 41 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,10 @@ func TestHandleVirtualServiceEvent(t *testing.T) {

func TestGetServiceForRolloutCanary(t *testing.T) {
//Struct of test case info. Name is required.
const NAMESPACE = "namespace"
const SERVICENAME = "serviceName"
const STABLESERVICENAME = "stableserviceName"
const CANARYSERVICENAME = "canaryserviceName"
const Namespace = "namespace"
const ServiceName = "serviceName"
const StableServiceName = "stableserviceName"
const CanaryServiceName = "canaryserviceName"
const VS_NAME_1 = "virtualservice1"
const VS_NAME_2 = "virtualservice2"
const VS_NAME_3 = "virtualservice3"
Expand Down Expand Up @@ -693,12 +693,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
selectorMap["app"] = "test"

service := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: ServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now())},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
},
}
service.Name = SERVICENAME
service.Namespace = NAMESPACE
port1 := coreV1.ServicePort{
Port: 8080,
}
Expand All @@ -711,15 +710,15 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
service.Spec.Ports = ports

stableService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: STABLESERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: StableServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
},
}

canaryService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: CANARYSERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: CanaryServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
Expand Down Expand Up @@ -777,38 +776,38 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
rcTemp.ServiceController.Cache.Put(canaryService)

virtualService := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 20},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 20},
}}},
},
}

vsMutipleRoutesWithMatch := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_2, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_2, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Name: VS_ROUTE_PRIMARY, Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 20},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 20},
}}},
},
}

vsMutipleRoutesWithZeroWeight := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_4, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_4, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Name: "random", Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 100},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 0},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 100},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 0},
}}},
},
}

rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(virtualService)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(vsMutipleRoutesWithMatch)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(vsMutipleRoutesWithZeroWeight)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(virtualService)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(vsMutipleRoutesWithMatch)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(vsMutipleRoutesWithZeroWeight)

canaryRollout := argo.Rollout{
Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
Expand All @@ -822,7 +821,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}
canaryRollout.Spec.Selector = &labelSelector

canaryRollout.Namespace = NAMESPACE
canaryRollout.Namespace = Namespace
canaryRollout.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{},
}
Expand Down Expand Up @@ -872,11 +871,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVs.Spec.Selector = &labelSelector

canaryRolloutIstioVs.Namespace = NAMESPACE
canaryRolloutIstioVs.Namespace = Namespace
canaryRolloutIstioVs.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_1},
Expand All @@ -891,11 +890,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsRouteMatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsRouteMatch.Namespace = NAMESPACE
canaryRolloutIstioVsRouteMatch.Namespace = Namespace
canaryRolloutIstioVsRouteMatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_2, Routes: []string{VS_ROUTE_PRIMARY}},
Expand All @@ -910,11 +909,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsRouteMisMatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsRouteMisMatch.Namespace = NAMESPACE
canaryRolloutIstioVsRouteMisMatch.Namespace = Namespace
canaryRolloutIstioVsRouteMisMatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_3, Routes: []string{"random"}},
Expand All @@ -929,11 +928,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsZeroWeight.Spec.Selector = &labelSelector

canaryRolloutIstioVsZeroWeight.Namespace = NAMESPACE
canaryRolloutIstioVsZeroWeight.Namespace = Namespace
canaryRolloutIstioVsZeroWeight.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_4},
Expand All @@ -948,11 +947,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsMimatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsMimatch.Namespace = NAMESPACE
canaryRolloutIstioVsMimatch.Namespace = Namespace
canaryRolloutIstioVsMimatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: "random"},
Expand All @@ -963,14 +962,14 @@ func TestGetServiceForRolloutCanary(t *testing.T) {

resultForDummy := map[string]*WeightedService{"dummy": {Weight: 1, Service: service1}}

resultForRandomMatch := map[string]*WeightedService{CANARYSERVICENAME: {Weight: 1, Service: canaryService}}
resultForRandomMatch := map[string]*WeightedService{CanaryServiceName: {Weight: 1, Service: canaryService}}

resultForStableServiceOnly := map[string]*WeightedService{STABLESERVICENAME: {Weight: 1, Service: stableService}}
resultForStableServiceOnly := map[string]*WeightedService{StableServiceName: {Weight: 1, Service: stableService}}

resultForCanaryWithIstio := map[string]*WeightedService{STABLESERVICENAME: {Weight: 80, Service: stableService},
CANARYSERVICENAME: {Weight: 20, Service: canaryService}}
resultForCanaryWithIstio := map[string]*WeightedService{StableServiceName: {Weight: 80, Service: stableService},
CanaryServiceName: {Weight: 20, Service: canaryService}}

resultForCanaryWithStableService := map[string]*WeightedService{STABLESERVICENAME: {Weight: 100, Service: stableService}}
resultForCanaryWithStableService := map[string]*WeightedService{StableServiceName: {Weight: 100, Service: stableService}}

testCases := []struct {
name string
Expand Down
65 changes: 33 additions & 32 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,71 +120,72 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

var err error

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
return fmt.Errorf("error with ServiceController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
return fmt.Errorf("error with GlobalTrafficController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)

if err != nil {
return fmt.Errorf(" Error with NodeController controller init: %v", err)
return fmt.Errorf("error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
return fmt.Errorf("error with ServiceEntryController init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with VirtualServiceController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with DeploymentController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf("error with Rollout controller init: %v", err)
}
}

r.Lock()
Expand Down
Loading

0 comments on commit eeefffa

Please sign in to comment.