Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle k8s service events and trigger SE generation #238

Merged
merged 4 commits into from
Jul 2, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,6 @@ import (
)

const (
ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
DefaultBaseEjectionTime int64 = 300
DefaultConsecutiveGatewayErrors uint32 = 50
DefaultInterval int64 = 60
Expand Down Expand Up @@ -730,13 +728,13 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return nil
}

cachedService := rc.ServiceController.Cache.Get(deployment.Namespace)
cachedServices := rc.ServiceController.Cache.Get(deployment.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
var matchedService *k8sV1.Service
for _, service := range cachedService.Service[deployment.Namespace] {
for _, service := range cachedServices {
var match = true
for lkey, lvalue := range service.Spec.Selector {
value, ok := deployment.Spec.Selector.MatchLabels[lkey]
Expand Down Expand Up @@ -795,9 +793,9 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
if rollout == nil {
return nil
}
cachedService := rc.ServiceController.Cache.Get(rollout.Namespace)
cachedServices := rc.ServiceController.Cache.Get(rollout.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
rolloutStrategy := rollout.Spec.Strategy
Expand Down Expand Up @@ -875,18 +873,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)

//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

servicesOrdered := make([]string, 0, len(servicesInNamespace))
for k := range servicesInNamespace {
servicesOrdered = append(servicesOrdered, k)
}

sort.Strings(servicesOrdered)

for _, s := range servicesOrdered {
var service = servicesInNamespace[s]
for _, service := range cachedServices {
var match = true
//skip services that are not referenced in the rollout
if len(blueGreenActiveService) > 0 && service.ObjectMeta.Name != blueGreenActiveService && service.ObjectMeta.Name != blueGreenPreviewService {
Expand All @@ -897,7 +884,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
for lkey, lvalue := range service.Spec.Selector {
// Rollouts controller adds a dynamic label with name rollouts-pod-template-hash to both active and passive replicasets.
// This dynamic label is not available on the rollout template. Hence ignoring the label with name rollouts-pod-template-hash
if lkey == ROLLOUT_POD_HASH_LABEL {
if lkey == common.RolloutPodHashLabel {
continue
}
value, ok := rollout.Spec.Selector.MatchLabels[lkey]
Expand Down
7 changes: 3 additions & 4 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,11 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
selectorMap["app"] = "test"

service := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: SERVICENAME, Namespace: NAMESPACE, CreationTimestamp: v12.NewTime(time.Now())},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: ServiceName, Namespace

Spec: coreV1.ServiceSpec{
Selector: selectorMap,
},
}
service.Name = SERVICENAME
service.Namespace = NAMESPACE
port1 := coreV1.ServicePort{
Port: 8080,
}
Expand All @@ -711,15 +710,15 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
service.Spec.Ports = ports

stableService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: STABLESERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: STABLESERVICENAME, Namespace: NAMESPACE, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: StableServiceName

Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
},
}

canaryService := &coreV1.Service{
ObjectMeta: v12.ObjectMeta{Name: CANARYSERVICENAME, Namespace: NAMESPACE},
ObjectMeta: v12.ObjectMeta{Name: CANARYSERVICENAME, Namespace: NAMESPACE, CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15)))},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: CanaryServiceName

Spec: coreV1.ServiceSpec{
Selector: selectorMap,
Ports: ports,
Expand Down
65 changes: 33 additions & 32 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,71 +120,72 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

var err error

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
return fmt.Errorf("error with ServiceController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
return fmt.Errorf("error with GlobalTrafficController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)

if err != nil {
return fmt.Errorf(" Error with NodeController controller init: %v", err)
return fmt.Errorf("error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
return fmt.Errorf("error with ServiceEntryController init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with VirtualServiceController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
return fmt.Errorf("error with DestinationRuleController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
return fmt.Errorf("error with DeploymentController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf("error with Rollout controller init: %v", err)
}
}

r.Lock()
Expand Down
48 changes: 47 additions & 1 deletion admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret"
log "github.com/sirupsen/logrus"
k8sAppsV1 "k8s.io/api/apps/v1"
k8sV1 "k8s.io/api/core/v1"
k8s "k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -145,6 +146,52 @@ type ServiceHandler struct {
ClusterID string
}

func (sh *ServiceHandler) Added(obj *k8sV1.Service) {
log.Infof(LogFormat, "Added", "service", obj.Name, sh.ClusterID, "received")
err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID)
if err != nil {
log.Errorf(LogErrFormat, "Error", "service", obj.Name, sh.ClusterID, err)
}
}

func (sh *ServiceHandler) Updated(obj *k8sV1.Service) {
log.Infof(LogFormat, "Updated", "service", obj.Name, sh.ClusterID, "received")
err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID)
if err != nil {
log.Errorf(LogErrFormat, "Error", "service", obj.Name, sh.ClusterID, err)
}
}

func (sh *ServiceHandler) Deleted(obj *k8sV1.Service) {
log.Infof(LogFormat, "Deleted", "service", obj.Name, sh.ClusterID, "received")
err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID)
if err != nil {
log.Errorf(LogErrFormat, "Error", "service", obj.Name, sh.ClusterID, err)
}
}

func HandleEventForService(svc *k8sV1.Service, remoteRegistry *RemoteRegistry, clusterName string) error {
if svc.Spec.Selector == nil {
return errors.New("selector missing on service");
}
matchingDeployements := remoteRegistry.RemoteControllers[clusterName].DeploymentController.GetDeploymentBySelectorInNamespace(svc.Spec.Selector, svc.Namespace)
nirvanagit marked this conversation as resolved.
Show resolved Hide resolved
if len(matchingDeployements) > 0 {
for _, deployment := range matchingDeployements {
HandleEventForDeployment(admiral.Update, &deployment, remoteRegistry, clusterName)
}
}
if common.GetAdmiralParams().ArgoRolloutsEnabled {
matchingRollouts := remoteRegistry.RemoteControllers[clusterName].RolloutController.GetRolloutBySelectorInNamespace(svc.Spec.Selector, svc.Namespace)

if len(matchingRollouts) > 0 {
for _, rollout := range matchingRollouts {
HandleEventForRollout(admiral.Update, &rollout, remoteRegistry, clusterName)
}
}
}
return nil
}

func (dh *DependencyHandler) Added(obj *v1.Dependency) {

log.Infof(LogFormat, "Add", "dependency-record", obj.Name, "", "Received=true namespace="+obj.Namespace)
Expand Down Expand Up @@ -243,7 +290,6 @@ func HandleEventForRollout(event admiral.EventType, obj *argo.Rollout, remoteReg

// helper function to handle add and delete for DeploymentHandler
func HandleEventForDeployment(event admiral.EventType, obj *k8sAppsV1.Deployment, remoteRegistry *RemoteRegistry, clusterName string) {
log.Infof(LogFormat, event, "deployment", obj.Name, clusterName, "Received")
nirvanagit marked this conversation as resolved.
Show resolved Hide resolved

globalIdentifier := common.GetDeploymentGlobalIdentifier(obj)

Expand Down
3 changes: 2 additions & 1 deletion admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys())
wait.Until(c.runWorker, 5*time.Second, stopCh)
//wait for 30 seconds for the first time (for all caches to sync)
wait.Until(c.runWorker, 30 * time.Second, stopCh)
nirvanagit marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *Controller) runWorker() {
Expand Down
59 changes: 16 additions & 43 deletions admiral/pkg/controller/admiral/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
k8sAppsV1 "k8s.io/api/apps/v1"
k8sAppsinformers "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/rest"
"reflect"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -78,45 +79,6 @@ func (p *deploymentCache) DeleteFromDeploymentClusterCache(key string, deploymen
}
}

func (d *DeploymentController) GetDeployments() ([]*k8sAppsV1.Deployment, error) {

ns := d.K8sClient.CoreV1().Namespaces()

namespaceSidecarInjectionLabelFilter := d.labelSet.NamespaceSidecarInjectionLabel + "=" + d.labelSet.NamespaceSidecarInjectionLabelValue
istioEnabledNs, err := ns.List(meta_v1.ListOptions{LabelSelector: namespaceSidecarInjectionLabelFilter})

if err != nil {
return nil, fmt.Errorf("error getting istio labled namespaces: %v", err)
}

var res []*k8sAppsV1.Deployment

for _, v := range istioEnabledNs.Items {

deployments := d.K8sClient.AppsV1().Deployments(v.Name)
deploymentsList, err := deployments.List(meta_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing deployments: %v", err)
}
var admiralDeployments []k8sAppsV1.Deployment
for _, deployment := range deploymentsList.Items {
if !d.shouldIgnoreBasedOnLabels(&deployment) {
admiralDeployments = append(admiralDeployments, deployment)
}
}

if err != nil {
return nil, fmt.Errorf("error getting istio labled namespaces: %v", err)
}

for _, pi := range admiralDeployments {
res = append(res, &pi)
}
}

return res, nil
}

func NewDeploymentController(clusterID string, stopCh <-chan struct{}, handler DeploymentHandler, config *rest.Config, resyncPeriod time.Duration) (*DeploymentController, error) {

deploymentController := DeploymentController{}
Expand Down Expand Up @@ -211,10 +173,10 @@ func (d *DeploymentController) shouldIgnoreBasedOnLabels(deployment *k8sAppsV1.D
return false //labels are fine, we should not ignore
}

func (d *DeploymentController) GetDeploymentByLabel(labelValue string, namespace string) []k8sAppsV1.Deployment {
matchLabel := common.GetGlobalTrafficDeploymentLabel()
func (d *DeploymentController) GetDeploymentBySelectorInNamespace(serviceSelector map[string]string, namespace string) []k8sAppsV1.Deployment {

labelOptions := meta_v1.ListOptions{}
labelOptions.LabelSelector = fmt.Sprintf("%s=%s", matchLabel, labelValue)

matchedDeployments, err := d.K8sClient.AppsV1().Deployments(namespace).List(labelOptions)
nirvanagit marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
Expand All @@ -226,5 +188,16 @@ func (d *DeploymentController) GetDeploymentByLabel(labelValue string, namespace
return []k8sAppsV1.Deployment{}
}

return matchedDeployments.Items
var filteredDeployments = make([]k8sAppsV1.Deployment, 0)

for _, deployment := range matchedDeployments.Items {
if deployment.Spec.Selector == nil || deployment.Spec.Selector.MatchLabels == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check for nil values for .Selector and .MatchLabels

continue
}
if reflect.DeepEqual(deployment.Spec.Selector.MatchLabels, serviceSelector) {
nirvanagit marked this conversation as resolved.
Show resolved Hide resolved
filteredDeployments = append(filteredDeployments, deployment)
}
}

return filteredDeployments
}
Loading