Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle k8s service events and trigger SE generation #238

Merged
merged 4 commits into from
Jul 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,6 @@ import (
)

const (
ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
DefaultBaseEjectionTime int64 = 300
DefaultConsecutiveGatewayErrors uint32 = 50
DefaultInterval int64 = 60
Expand Down Expand Up @@ -730,13 +728,13 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return nil
}

cachedService := rc.ServiceController.Cache.Get(deployment.Namespace)
cachedServices := rc.ServiceController.Cache.Get(deployment.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
var matchedService *k8sV1.Service
for _, service := range cachedService.Service[deployment.Namespace] {
for _, service := range cachedServices {
var match = true
for lkey, lvalue := range service.Spec.Selector {
value, ok := deployment.Spec.Selector.MatchLabels[lkey]
Expand Down Expand Up @@ -795,9 +793,9 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
if rollout == nil {
return nil
}
cachedService := rc.ServiceController.Cache.Get(rollout.Namespace)
cachedServices := rc.ServiceController.Cache.Get(rollout.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
rolloutStrategy := rollout.Spec.Strategy
Expand Down Expand Up @@ -875,18 +873,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)

//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

servicesOrdered := make([]string, 0, len(servicesInNamespace))
for k := range servicesInNamespace {
servicesOrdered = append(servicesOrdered, k)
}

sort.Strings(servicesOrdered)

for _, s := range servicesOrdered {
var service = servicesInNamespace[s]
for _, service := range cachedServices {
var match = true
//skip services that are not referenced in the rollout
if len(blueGreenActiveService) > 0 && service.ObjectMeta.Name != blueGreenActiveService && service.ObjectMeta.Name != blueGreenPreviewService {
Expand All @@ -897,7 +884,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
for lkey, lvalue := range service.Spec.Selector {
// Rollouts controller adds a dynamic label with name rollouts-pod-template-hash to both active and passive replicasets.
// This dynamic label is not available on the rollout template. Hence ignoring the label with name rollouts-pod-template-hash
if lkey == ROLLOUT_POD_HASH_LABEL {
if lkey == common.RolloutPodHashLabel {
continue
}
value, ok := rollout.Spec.Selector.MatchLabels[lkey]
Expand Down
81 changes: 40 additions & 41 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,10 @@ func TestHandleVirtualServiceEvent(t *testing.T) {

func TestGetServiceForRolloutCanary(t *testing.T) {
//Struct of test case info. Name is required.
const NAMESPACE = "namespace"
const SERVICENAME = "serviceName"
const STABLESERVICENAME = "stableserviceName"
const CANARYSERVICENAME = "canaryserviceName"
const Namespace = "namespace"
const ServiceName = "serviceName"
const StableServiceName = "stableserviceName"
const CanaryServiceName = "canaryserviceName"
const VS_NAME_1 = "virtualservice1"
const VS_NAME_2 = "virtualservice2"
const VS_NAME_3 = "virtualservice3"
Expand Down Expand Up @@ -693,12 +693,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
selectorMap["app"] = "test"

service := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: ServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now())},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
},
}
service.Name = SERVICENAME
service.Namespace = NAMESPACE
port1 := coreV1.ServicePort{
Port: 8080,
}
Expand All @@ -711,15 +710,15 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
service.Spec.Ports = ports

stableService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: STABLESERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: StableServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
},
}

canaryService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: CANARYSERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: CanaryServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
Expand Down Expand Up @@ -777,38 +776,38 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
rcTemp.ServiceController.Cache.Put(canaryService)

virtualService := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 20},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 20},
}}},
},
}

vsMutipleRoutesWithMatch := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_2, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_2, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Name: VS_ROUTE_PRIMARY, Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 20},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 20},
}}},
},
}

vsMutipleRoutesWithZeroWeight := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_4, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_4, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Name: "random", Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 100},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 0},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 100},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 0},
}}},
},
}

rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(virtualService)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(vsMutipleRoutesWithMatch)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(vsMutipleRoutesWithZeroWeight)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(virtualService)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(vsMutipleRoutesWithMatch)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(vsMutipleRoutesWithZeroWeight)

canaryRollout := argo.Rollout{
Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
Expand All @@ -822,7 +821,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}
canaryRollout.Spec.Selector = &labelSelector

canaryRollout.Namespace = NAMESPACE
canaryRollout.Namespace = Namespace
canaryRollout.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{},
}
Expand Down Expand Up @@ -872,11 +871,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVs.Spec.Selector = &labelSelector

canaryRolloutIstioVs.Namespace = NAMESPACE
canaryRolloutIstioVs.Namespace = Namespace
canaryRolloutIstioVs.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_1},
Expand All @@ -891,11 +890,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsRouteMatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsRouteMatch.Namespace = NAMESPACE
canaryRolloutIstioVsRouteMatch.Namespace = Namespace
canaryRolloutIstioVsRouteMatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_2, Routes: []string{VS_ROUTE_PRIMARY}},
Expand All @@ -910,11 +909,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsRouteMisMatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsRouteMisMatch.Namespace = NAMESPACE
canaryRolloutIstioVsRouteMisMatch.Namespace = Namespace
canaryRolloutIstioVsRouteMisMatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_3, Routes: []string{"random"}},
Expand All @@ -929,11 +928,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsZeroWeight.Spec.Selector = &labelSelector

canaryRolloutIstioVsZeroWeight.Namespace = NAMESPACE
canaryRolloutIstioVsZeroWeight.Namespace = Namespace
canaryRolloutIstioVsZeroWeight.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_4},
Expand All @@ -948,11 +947,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsMimatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsMimatch.Namespace = NAMESPACE
canaryRolloutIstioVsMimatch.Namespace = Namespace
canaryRolloutIstioVsMimatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: "random"},
Expand All @@ -963,14 +962,14 @@ func TestGetServiceForRolloutCanary(t *testing.T) {

resultForDummy := map[string]*WeightedService{"dummy": {Weight: 1, Service: service1}}

resultForRandomMatch := map[string]*WeightedService{CANARYSERVICENAME: {Weight: 1, Service: canaryService}}
resultForRandomMatch := map[string]*WeightedService{CanaryServiceName: {Weight: 1, Service: canaryService}}

resultForStableServiceOnly := map[string]*WeightedService{STABLESERVICENAME: {Weight: 1, Service: stableService}}
resultForStableServiceOnly := map[string]*WeightedService{StableServiceName: {Weight: 1, Service: stableService}}

resultForCanaryWithIstio := map[string]*WeightedService{STABLESERVICENAME: {Weight: 80, Service: stableService},
CANARYSERVICENAME: {Weight: 20, Service: canaryService}}
resultForCanaryWithIstio := map[string]*WeightedService{StableServiceName: {Weight: 80, Service: stableService},
CanaryServiceName: {Weight: 20, Service: canaryService}}

resultForCanaryWithStableService := map[string]*WeightedService{STABLESERVICENAME: {Weight: 100, Service: stableService}}
resultForCanaryWithStableService := map[string]*WeightedService{StableServiceName: {Weight: 100, Service: stableService}}

testCases := []struct {
name string
Expand Down
65 changes: 33 additions & 32 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,71 +120,72 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

var err error

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
return fmt.Errorf("error with ServiceController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
return fmt.Errorf("error with GlobalTrafficController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)

if err != nil {
return fmt.Errorf(" Error with NodeController controller init: %v", err)
return fmt.Errorf("error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
return fmt.Errorf("error with ServiceEntryController init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with VirtualServiceController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with DeploymentController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf("error with Rollout controller init: %v", err)
}
}

r.Lock()
Expand Down
Loading