Skip to content

Commit

Permalink
MESH-1970 Handle k8s service events and trigger SE generation (istio-…
Browse files Browse the repository at this point in the history
  • Loading branch information
aattuluri authored and GitHub Enterprise committed Jul 5, 2022
1 parent 6f665b4 commit d11568d
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 258 deletions.
27 changes: 7 additions & 20 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,6 @@ import (
)

const (
ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
DefaultBaseEjectionTime int64 = 300
DefaultConsecutiveGatewayErrors uint32 = 50
DefaultInterval int64 = 60
Expand Down Expand Up @@ -774,13 +772,13 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return nil
}

cachedService := rc.ServiceController.Cache.Get(deployment.Namespace)
cachedServices := rc.ServiceController.Cache.Get(deployment.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
var matchedService *k8sV1.Service
for _, service := range cachedService.Service[deployment.Namespace] {
for _, service := range cachedServices {
var match = true
for lkey, lvalue := range service.Spec.Selector {
value, ok := deployment.Spec.Selector.MatchLabels[lkey]
Expand Down Expand Up @@ -846,9 +844,9 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
return nil
}

cachedService := rc.ServiceController.Cache.Get(rollout.Namespace)
cachedServices := rc.ServiceController.Cache.Get(rollout.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
rolloutStrategy := rollout.Spec.Strategy
Expand Down Expand Up @@ -926,18 +924,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)

//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

servicesOrdered := make([]string, 0, len(servicesInNamespace))
for k := range servicesInNamespace {
servicesOrdered = append(servicesOrdered, k)
}

sort.Strings(servicesOrdered)

for _, s := range servicesOrdered {
var service = servicesInNamespace[s]
for _, service := range cachedServices {
var match = true
//skip services that are not referenced in the rollout
if len(blueGreenActiveService) > 0 && service.ObjectMeta.Name != blueGreenActiveService && service.ObjectMeta.Name != blueGreenPreviewService {
Expand All @@ -948,7 +935,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
for lkey, lvalue := range service.Spec.Selector {
// Rollouts controller adds a dynamic label with name rollouts-pod-template-hash to both active and passive replicasets.
// This dynamic label is not available on the rollout template. Hence ignoring the label with name rollouts-pod-template-hash
if lkey == ROLLOUT_POD_HASH_LABEL {
if lkey == common.RolloutPodHashLabel {
continue
}
value, ok := rollout.Spec.Selector.MatchLabels[lkey]
Expand Down
81 changes: 40 additions & 41 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,10 @@ func TestHandleVirtualServiceEvent(t *testing.T) {

func TestGetServiceForRolloutCanary(t *testing.T) {
//Struct of test case info. Name is required.
const NAMESPACE = "namespace"
const SERVICENAME = "serviceName"
const STABLESERVICENAME = "stableserviceName"
const CANARYSERVICENAME = "canaryserviceName"
const Namespace = "namespace"
const ServiceName = "serviceName"
const StableServiceName = "stableserviceName"
const CanaryServiceName = "canaryserviceName"
const VS_NAME_1 = "virtualservice1"
const VS_NAME_2 = "virtualservice2"
const VS_NAME_3 = "virtualservice3"
Expand Down Expand Up @@ -694,12 +694,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
selectorMap["app"] = "test"

service := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: ServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now())},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
},
}
service.Name = SERVICENAME
service.Namespace = NAMESPACE
port1 := coreV1.ServicePort{
Port: 8080,
}
Expand All @@ -712,15 +711,15 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
service.Spec.Ports = ports

stableService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: STABLESERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: StableServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
},
}

canaryService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: CANARYSERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: CanaryServiceName, Namespace: Namespace, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
Expand Down Expand Up @@ -778,38 +777,38 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
rcTemp.ServiceController.Cache.Put(canaryService)

virtualService := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_1, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 20},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 20},
}}},
},
}

vsMutipleRoutesWithMatch := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_2, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_2, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Name: VS_ROUTE_PRIMARY, Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 20},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 80},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 20},
}}},
},
}

vsMutipleRoutesWithZeroWeight := &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_4, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: VS_NAME_4, Namespace: Namespace},
Spec: v1alpha3.VirtualService{
Http: []*v1alpha3.HTTPRoute{{Name: "random", Route: []*v1alpha3.HTTPRouteDestination{
{Destination: &v1alpha3.Destination{Host: STABLESERVICENAME}, Weight: 100},
{Destination: &v1alpha3.Destination{Host: CANARYSERVICENAME}, Weight: 0},
{Destination: &v1alpha3.Destination{Host: StableServiceName}, Weight: 100},
{Destination: &v1alpha3.Destination{Host: CanaryServiceName}, Weight: 0},
}}},
},
}

rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(virtualService)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(vsMutipleRoutesWithMatch)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(NAMESPACE).Create(vsMutipleRoutesWithZeroWeight)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(virtualService)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(vsMutipleRoutesWithMatch)
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(vsMutipleRoutesWithZeroWeight)

canaryRollout := argo.Rollout{
Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
Expand All @@ -823,7 +822,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}
canaryRollout.Spec.Selector = &labelSelector

canaryRollout.Namespace = NAMESPACE
canaryRollout.Namespace = Namespace
canaryRollout.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{},
}
Expand Down Expand Up @@ -873,11 +872,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVs.Spec.Selector = &labelSelector

canaryRolloutIstioVs.Namespace = NAMESPACE
canaryRolloutIstioVs.Namespace = Namespace
canaryRolloutIstioVs.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_1},
Expand All @@ -892,11 +891,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsRouteMatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsRouteMatch.Namespace = NAMESPACE
canaryRolloutIstioVsRouteMatch.Namespace = Namespace
canaryRolloutIstioVsRouteMatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_2, Routes: []string{VS_ROUTE_PRIMARY}},
Expand All @@ -911,11 +910,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsRouteMisMatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsRouteMisMatch.Namespace = NAMESPACE
canaryRolloutIstioVsRouteMisMatch.Namespace = Namespace
canaryRolloutIstioVsRouteMisMatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_3, Routes: []string{"random"}},
Expand All @@ -930,11 +929,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsZeroWeight.Spec.Selector = &labelSelector

canaryRolloutIstioVsZeroWeight.Namespace = NAMESPACE
canaryRolloutIstioVsZeroWeight.Namespace = Namespace
canaryRolloutIstioVsZeroWeight.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: VS_NAME_4},
Expand All @@ -949,11 +948,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
}}}
canaryRolloutIstioVsMimatch.Spec.Selector = &labelSelector

canaryRolloutIstioVsMimatch.Namespace = NAMESPACE
canaryRolloutIstioVsMimatch.Namespace = Namespace
canaryRolloutIstioVsMimatch.Spec.Strategy = argo.RolloutStrategy{
Canary: &argo.CanaryStrategy{
StableService: STABLESERVICENAME,
CanaryService: CANARYSERVICENAME,
StableService: StableServiceName,
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: argo.IstioVirtualService{Name: "random"},
Expand All @@ -964,14 +963,14 @@ func TestGetServiceForRolloutCanary(t *testing.T) {

resultForDummy := map[string]*WeightedService{"dummy": {Weight: 1, Service: service1}}

resultForRandomMatch := map[string]*WeightedService{CANARYSERVICENAME: {Weight: 1, Service: canaryService}}
resultForRandomMatch := map[string]*WeightedService{CanaryServiceName: {Weight: 1, Service: canaryService}}

resultForStableServiceOnly := map[string]*WeightedService{STABLESERVICENAME: {Weight: 1, Service: stableService}}
resultForStableServiceOnly := map[string]*WeightedService{StableServiceName: {Weight: 1, Service: stableService}}

resultForCanaryWithIstio := map[string]*WeightedService{STABLESERVICENAME: {Weight: 80, Service: stableService},
CANARYSERVICENAME: {Weight: 20, Service: canaryService}}
resultForCanaryWithIstio := map[string]*WeightedService{StableServiceName: {Weight: 80, Service: stableService},
CanaryServiceName: {Weight: 20, Service: canaryService}}

resultForCanaryWithStableService := map[string]*WeightedService{STABLESERVICENAME: {Weight: 100, Service: stableService}}
resultForCanaryWithStableService := map[string]*WeightedService{StableServiceName: {Weight: 100, Service: stableService}}

testCases := []struct {
name string
Expand Down
66 changes: 33 additions & 33 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const (
LogErrFormat = "op=%s type=%v name=%v cluster=%s, e=%v"
)


func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegistry, error) {

log.Infof("Initializing Admiral with params: %v", params)
Expand Down Expand Up @@ -127,71 +126,72 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

var err error

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
return fmt.Errorf("error with ServiceController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
return fmt.Errorf("error with GlobalTrafficController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)

if err != nil {
return fmt.Errorf(" Error with NodeController controller init: %v", err)
return fmt.Errorf("error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
return fmt.Errorf("error with ServiceEntryController init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with VirtualServiceController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with DeploymentController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf("error with Rollout controller init: %v", err)
}
}

r.Lock()
Expand Down
Loading

0 comments on commit d11568d

Please sign in to comment.