Skip to content

Commit

Permalink
Reconcile ClusterIP services
Browse files Browse the repository at this point in the history
- add svc update tests fix: #114
  • Loading branch information
stefanprodan committed Mar 26, 2019
1 parent f5b862d commit ddd3a82
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 105 deletions.
176 changes: 72 additions & 104 deletions pkg/router/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package router

import (
"fmt"

"github.com/google/go-cmp/cmp"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
"go.uber.org/zap"
Expand All @@ -22,137 +22,105 @@ type KubernetesRouter struct {
}

// Sync creates or updates the primary and canary services
func (c *KubernetesRouter) Sync(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
func (c *KubernetesRouter) Sync(canary *flaggerv1.Canary) error {
targetName := canary.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)
portName := cd.Spec.Service.PortName
canaryName := fmt.Sprintf("%s-canary", targetName)

// main svc
err := c.syncService(canary, targetName, primaryName)
if err != nil {
return err
}

// canary svc
err = c.syncService(canary, canaryName, targetName)
if err != nil {
return err
}

// primary svc
err = c.syncService(canary, primaryName, primaryName)
if err != nil {
return err
}

return nil
}

func (c *KubernetesRouter) SetRoutes(canary *flaggerv1.Canary, primaryRoute int, canaryRoute int) error {
return nil
}

func (c *KubernetesRouter) GetRoutes(canary *flaggerv1.Canary) (primaryRoute int, canaryRoute int, err error) {
return 0, 0, nil
}

func (c *KubernetesRouter) syncService(canary *flaggerv1.Canary, name string, target string) error {
portName := canary.Spec.Service.PortName
if portName == "" {
portName = "http"
}

canaryService, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(targetName, metav1.GetOptions{})
if errors.IsNotFound(err) {
canaryService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: targetName,
Namespace: cd.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
Group: flaggerv1.SchemeGroupVersion.Group,
Version: flaggerv1.SchemeGroupVersion.Version,
Kind: flaggerv1.CanaryKind,
}),
svcSpec := corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{"app": target},
Ports: []corev1.ServicePort{
{
Name: portName,
Protocol: corev1.ProtocolTCP,
Port: canary.Spec.Service.Port,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: canary.Spec.Service.Port,
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{"app": primaryName},
Ports: []corev1.ServicePort{
{
Name: portName,
Protocol: corev1.ProtocolTCP,
Port: cd.Spec.Service.Port,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: cd.Spec.Service.Port,
},
},
},
},
}

_, err = c.kubeClient.CoreV1().Services(cd.Namespace).Create(canaryService)
if err != nil {
return err
}
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Service %s.%s created", canaryService.GetName(), cd.Namespace)
},
}

canaryTestServiceName := fmt.Sprintf("%s-canary", cd.Spec.TargetRef.Name)
canaryTestService, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(canaryTestServiceName, metav1.GetOptions{})
svc, err := c.kubeClient.CoreV1().Services(canary.Namespace).Get(name, metav1.GetOptions{})
if errors.IsNotFound(err) {
canaryTestService = &corev1.Service{
svc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: canaryTestServiceName,
Namespace: cd.Namespace,
Name: name,
Namespace: canary.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
*metav1.NewControllerRef(canary, schema.GroupVersionKind{
Group: flaggerv1.SchemeGroupVersion.Group,
Version: flaggerv1.SchemeGroupVersion.Version,
Kind: flaggerv1.CanaryKind,
}),
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{"app": targetName},
Ports: []corev1.ServicePort{
{
Name: portName,
Protocol: corev1.ProtocolTCP,
Port: cd.Spec.Service.Port,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: cd.Spec.Service.Port,
},
},
},
},
Spec: svcSpec,
}

_, err = c.kubeClient.CoreV1().Services(cd.Namespace).Create(canaryTestService)
_, err = c.kubeClient.CoreV1().Services(canary.Namespace).Create(svc)
if err != nil {
return err
}
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Service %s.%s created", canaryTestService.GetName(), cd.Namespace)

c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
Infof("Service %s.%s created", svc.GetName(), canary.Namespace)
return nil
}

primaryService, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if errors.IsNotFound(err) {
primaryService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: primaryName,
Namespace: cd.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
Group: flaggerv1.SchemeGroupVersion.Group,
Version: flaggerv1.SchemeGroupVersion.Version,
Kind: flaggerv1.CanaryKind,
}),
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{"app": primaryName},
Ports: []corev1.ServicePort{
{
Name: portName,
Protocol: corev1.ProtocolTCP,
Port: cd.Spec.Service.Port,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: cd.Spec.Service.Port,
},
},
},
},
}
if err != nil {
return fmt.Errorf("service %s query error %v", name, err)
}

_, err = c.kubeClient.CoreV1().Services(cd.Namespace).Create(primaryService)
if err != nil {
return err
if svc != nil {
if diff := cmp.Diff(svcSpec.Ports, svc.Spec.Ports); diff != "" {
svcClone := svc.DeepCopy()
svcClone.Spec = svcSpec
_, err = c.kubeClient.CoreV1().Services(canary.Namespace).Update(svcClone)
if err != nil {
return fmt.Errorf("service %s update error %v", name, err)
}
c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
Infof("Service %s updated", svc.GetName())
}

c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Service %s.%s created", primaryService.GetName(), cd.Namespace)
}

return nil
}

func (c *KubernetesRouter) SetRoutes(canary *flaggerv1.Canary, primaryRoute int, canaryRoute int) error {
return nil
}

func (c *KubernetesRouter) GetRoutes(canary *flaggerv1.Canary) (primaryRoute int, canaryRoute int, err error) {
return 0, 0, nil
}
91 changes: 90 additions & 1 deletion pkg/router/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
)

func TestServiceRouter_Sync(t *testing.T) {
func TestServiceRouter_Create(t *testing.T) {
mocks := setupfakeClients()
router := &KubernetesRouter{
kubeClient: mocks.kubeClient,
Expand Down Expand Up @@ -44,3 +44,92 @@ func TestServiceRouter_Sync(t *testing.T) {
t.Errorf("Got primary svc port %v wanted %v", primarySvc.Spec.Ports[0].Port, 9898)
}
}

func TestServiceRouter_Update(t *testing.T) {
mocks := setupfakeClients()
router := &KubernetesRouter{
kubeClient: mocks.kubeClient,
flaggerClient: mocks.flaggerClient,
logger: mocks.logger,
}

err := router.Sync(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}

canary, err := mocks.flaggerClient.FlaggerV1alpha3().Canaries("default").Get("podinfo", metav1.GetOptions{})
if err != nil {
t.Fatal(err.Error())
}

canaryClone := canary.DeepCopy()
canaryClone.Spec.Service.PortName = "grpc"

c, err := mocks.flaggerClient.FlaggerV1alpha3().Canaries("default").Update(canaryClone)
if err != nil {
t.Fatal(err.Error())
}

// apply changes
err = router.Sync(c)
if err != nil {
t.Fatal(err.Error())
}

canarySvc, err := mocks.kubeClient.CoreV1().Services("default").Get("podinfo-canary", metav1.GetOptions{})
if err != nil {
t.Fatal(err.Error())
}

if canarySvc.Spec.Ports[0].Name != "grpc" {
t.Errorf("Got svc port name %s wanted %s", canarySvc.Spec.Ports[0].Name, "grpc")
}
}

func TestServiceRouter_Undo(t *testing.T) {
mocks := setupfakeClients()
router := &KubernetesRouter{
kubeClient: mocks.kubeClient,
flaggerClient: mocks.flaggerClient,
logger: mocks.logger,
}

err := router.Sync(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}

canarySvc, err := mocks.kubeClient.CoreV1().Services("default").Get("podinfo-canary", metav1.GetOptions{})
if err != nil {
t.Fatal(err.Error())
}

svcClone := canarySvc.DeepCopy()
svcClone.Spec.Ports[0].Name = "http2-podinfo"
svcClone.Spec.Ports[0].Port = 8080

_, err = mocks.kubeClient.CoreV1().Services("default").Update(svcClone)
if err != nil {
t.Fatal(err.Error())
}

// undo changes
err = router.Sync(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}

canarySvc, err = mocks.kubeClient.CoreV1().Services("default").Get("podinfo-canary", metav1.GetOptions{})
if err != nil {
t.Fatal(err.Error())
}

if canarySvc.Spec.Ports[0].Name != "http" {
t.Errorf("Got svc port name %s wanted %s", canarySvc.Spec.Ports[0].Name, "http")
}

if canarySvc.Spec.Ports[0].Port != 9898 {
t.Errorf("Got svc port %v wanted %v", canarySvc.Spec.Ports[0].Port, 9898)
}
}

0 comments on commit ddd3a82

Please sign in to comment.