From c8f14da2920fdcc190b4aba749ea995496e46d2f Mon Sep 17 00:00:00 2001 From: aattuluri <44482891+aattuluri@users.noreply.github.com> Date: Thu, 10 Feb 2022 15:29:18 -0800 Subject: [PATCH] Do not do destructive updates during bootup (#193) --- admiral/pkg/clusters/handler.go | 70 ++++++- admiral/pkg/clusters/handler_test.go | 262 +++++++++++++++++++++++++++ admiral/pkg/clusters/registry.go | 1 + admiral/pkg/clusters/types.go | 2 + 4 files changed, 333 insertions(+), 2 deletions(-) diff --git a/admiral/pkg/clusters/handler.go b/admiral/pkg/clusters/handler.go index 8aabc30a..158eff1f 100644 --- a/admiral/pkg/clusters/handler.go +++ b/admiral/pkg/clusters/handler.go @@ -1,6 +1,7 @@ package clusters import ( + "bytes" "fmt" argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/gogo/protobuf/types" @@ -15,8 +16,10 @@ import ( k8sAppsV1 "k8s.io/api/apps/v1" k8sV1 "k8s.io/api/core/v1" v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "reflect" "sort" "strings" + "time" ) const ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash" @@ -540,12 +543,23 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn obj.ResourceVersion = "" _, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(obj) op = "Add" + log.Infof(LogFormat + " SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String()) } else { exist.Labels = obj.Labels exist.Annotations = obj.Annotations - exist.Spec = obj.Spec op = "Update" - _, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Update(exist) + skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist) + if diff != "" { + log.Infof(LogFormat + " diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff) + } + if skipUpdate { + log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Update skipped as it was destructive during Admiral's bootup phase") + return + } else { + exist.Spec = obj.Spec + _, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Update(exist) + } + } if err != nil { @@ -555,6 +569,58 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn } } +func skipDestructiveUpdate(rc *RemoteController, new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry) (skipDestructive bool, diff string) { + skipDestructive = false + destructive, diff := getServiceEntryDiff(new, old) + //do not update SEs during bootup phase if they are destructive + if time.Since(rc.StartTime) < (2 * common.GetAdmiralParams().CacheRefreshDuration) && destructive { + skipDestructive = true + } + + return skipDestructive, diff +} + +//Diffs only endpoints +func getServiceEntryDiff(new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry) (destructive bool, diff string) { + + //we diff only if both objects exist + if old == nil || new == nil { + return false, "" + } + destructive = false + format := "%s %s before: %v, after: %v;" + var buffer bytes.Buffer + seNew := new.Spec + seOld := old.Spec + + oldEndpointMap := make(map[string]*v1alpha32.ServiceEntry_Endpoint) + found := make(map[string]string) + for _, oEndpoint := range seOld.Endpoints { + oldEndpointMap[oEndpoint.Address] = oEndpoint + } + for _, nEndpoint := range seNew.Endpoints { + if val, ok := oldEndpointMap[nEndpoint.Address]; ok { + found[nEndpoint.Address] = "1" + if !reflect.DeepEqual(val, nEndpoint) { + destructive = true + buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String())) + } + } else { + buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String())) + } + } + + for key := range oldEndpointMap { + if _, ok := found[key]; !ok { + destructive = true + buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), "")) + } + } + + diff = buffer.String() + return destructive, diff +} + func deleteServiceEntry(exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController) { if exist != nil { err := rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Delete(exist.Name, &v12.DeleteOptions{}) diff --git a/admiral/pkg/clusters/handler_test.go b/admiral/pkg/clusters/handler_test.go index 1038f228..8c97e573 100644 --- a/admiral/pkg/clusters/handler_test.go +++ b/admiral/pkg/clusters/handler_test.go @@ -16,6 +16,7 @@ import ( k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1" v12 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" + "strings" "testing" "time" ) @@ -993,4 +994,265 @@ func TestGetServiceForRolloutBlueGreen(t *testing.T) { } }) } +} + +func TestSkipDestructiveUpdate(t *testing.T) { + + twoEndpointSe := v1alpha3.ServiceEntry{ + Hosts: []string{"e2e.my-first-service.mesh"}, + Addresses: []string{"240.10.1.1"}, + Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort), + Name: "http", Protocol: "http"}}, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + SubjectAltNames: []string{"spiffe://prefix/my-first-service"}, + Endpoints: []*v1alpha3.ServiceEntry_Endpoint{ + {Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"}, + {Address: "dummy.admiral.global-east", Ports: map[string]uint32{"http": 0}, Locality: "us-east-2"}, + }, + } + + twoEndpointSeUpdated := v1alpha3.ServiceEntry{ + Hosts: []string{"e2e.my-first-service.mesh"}, + Addresses: []string{"240.10.1.1"}, + Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort), + Name: "http", Protocol: "http"}}, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + SubjectAltNames: []string{"spiffe://prefix/my-first-service"}, + Endpoints: []*v1alpha3.ServiceEntry_Endpoint{ + {Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 90}, Locality: "us-west-2"}, + {Address: "dummy.admiral.global-east", Ports: map[string]uint32{"http": 0}, Locality: "us-east-2"}, + }, + } + + oneEndpointSe := v1alpha3.ServiceEntry{ + Hosts: []string{"e2e.my-first-service.mesh"}, + Addresses: []string{"240.10.1.1"}, + Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort), + Name: "http", Protocol: "http"}}, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + SubjectAltNames: []string{"spiffe://prefix/my-first-service"}, + Endpoints: []*v1alpha3.ServiceEntry_Endpoint{ + {Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"}, + }, + } + + newSeTwoEndpoints := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"}, + Spec: twoEndpointSe, + } + + newSeTwoEndpointsUpdated := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"}, + Spec: twoEndpointSeUpdated, + } + + newSeOneEndpoint := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"}, + Spec: oneEndpointSe, + } + + oldSeTwoEndpoints := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"}, + Spec: twoEndpointSe, + } + + oldSeOneEndpoint := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"}, + Spec: oneEndpointSe, + } + + + rcWarmupPhase := &RemoteController{ + StartTime: time.Now(), + } + + rcNotinWarmupPhase := &RemoteController{ + StartTime: time.Now().Add(time.Duration(-21) * time.Minute), + } + + //Struct of test case info. Name is required. + testCases := []struct { + name string + rc *RemoteController + newSe *v1alpha32.ServiceEntry + oldSe *v1alpha32.ServiceEntry + skipDestructive bool + diff string + }{ + { + name: "Should return false when in warm up phase but not destructive", + rc: rcWarmupPhase, + newSe: newSeOneEndpoint, + oldSe: oldSeOneEndpoint, + skipDestructive: false, + diff: "", + }, + { + name: "Should return true when in warm up phase but is destructive", + rc: rcWarmupPhase, + newSe: newSeOneEndpoint, + oldSe: oldSeTwoEndpoints, + skipDestructive: true, + diff: "Delete", + }, + { + name: "Should return false when not in warm up phase but is destructive", + rc: rcNotinWarmupPhase, + newSe: newSeOneEndpoint, + oldSe: oldSeTwoEndpoints, + skipDestructive: false, + diff: "Delete", + }, + { + name: "Should return false when in warm up phase but is constructive", + rc: rcWarmupPhase, + newSe: newSeTwoEndpoints, + oldSe: oldSeOneEndpoint, + skipDestructive: false, + diff: "Add", + }, + { + name: "Should return false when not in warm up phase but endpoints updated", + rc: rcNotinWarmupPhase, + newSe: newSeTwoEndpointsUpdated, + oldSe: oldSeTwoEndpoints, + skipDestructive: false, + diff: "Update", + }, + { + name: "Should return true when in warm up phase but endpoints are updated (destructive)", + rc: rcWarmupPhase, + newSe: newSeTwoEndpointsUpdated, + oldSe: oldSeTwoEndpoints, + skipDestructive: true, + diff: "Update", + }, + + } + + //Run the test for every provided case + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + skipDestructive, diff := skipDestructiveUpdate(c.rc, c.newSe, c.oldSe) + if skipDestructive == c.skipDestructive { + //perfect + } else { + t.Errorf("Result Failed. Got %v, expected %v", skipDestructive, c.skipDestructive) + } + if c.diff == "" || (c.diff != "" && strings.Contains(diff, c.diff)) { + //perfect + } else { + t.Errorf("Diff Failed. Got %v, expected %v", diff, c.diff) + } + }) + } +} + +func TestAddUpdateServiceEntry(t *testing.T) { + + + fakeIstioClient := istiofake.NewSimpleClientset() + + seCtrl := &istio.ServiceEntryController{ + IstioClient: fakeIstioClient, + } + + twoEndpointSe := v1alpha3.ServiceEntry{ + Hosts: []string{"e2e.my-first-service.mesh"}, + Addresses: []string{"240.10.1.1"}, + Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort), + Name: "http", Protocol: "http"}}, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + SubjectAltNames: []string{"spiffe://prefix/my-first-service"}, + Endpoints: []*v1alpha3.ServiceEntry_Endpoint{ + {Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"}, + {Address: "dummy.admiral.global-east", Ports: map[string]uint32{"http": 0}, Locality: "us-east-2"}, + }, + } + + oneEndpointSe := v1alpha3.ServiceEntry{ + Hosts: []string{"e2e.my-first-service.mesh"}, + Addresses: []string{"240.10.1.1"}, + Ports: []*v1alpha3.Port{{Number: uint32(common.DefaultServiceEntryPort), + Name: "http", Protocol: "http"}}, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + SubjectAltNames: []string{"spiffe://prefix/my-first-service"}, + Endpoints: []*v1alpha3.ServiceEntry_Endpoint{ + {Address: "dummy.admiral.global-west", Ports: map[string]uint32{"http": 0}, Locality: "us-west-2"}, + }, + } + + newSeOneEndpoint := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "namespace"}, + Spec: oneEndpointSe, + } + + oldSeTwoEndpoints := &v1alpha32.ServiceEntry{ + ObjectMeta: v12.ObjectMeta{Name: "se2", Namespace: "namespace"}, + Spec: twoEndpointSe, + } + + seCtrl.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Create(oldSeTwoEndpoints) + + rcWarmupPhase := &RemoteController{ + ServiceEntryController: seCtrl, + StartTime: time.Now(), + } + + rcNotinWarmupPhase := &RemoteController{ + ServiceEntryController: seCtrl, + StartTime: time.Now().Add(time.Duration(-21) * time.Minute), + } + + //Struct of test case info. Name is required. + testCases := []struct { + name string + rc *RemoteController + newSe *v1alpha32.ServiceEntry + oldSe *v1alpha32.ServiceEntry + skipDestructive bool + }{ + { + name: "Should add a new SE", + rc: rcWarmupPhase, + newSe: newSeOneEndpoint, + oldSe: nil, + skipDestructive: false, + }, + { + name: "Should not update SE when in warm up mode and the update is destructive", + rc: rcWarmupPhase, + newSe: newSeOneEndpoint, + oldSe: oldSeTwoEndpoints, + skipDestructive: true, + }, + { + name: "Should update an SE", + rc: rcNotinWarmupPhase, + newSe: newSeOneEndpoint, + oldSe: oldSeTwoEndpoints, + skipDestructive: false, + }, + + } + + //Run the test for every provided case + for _, c := range testCases { + t.Run(c.name, func(t *testing.T) { + addUpdateServiceEntry(c.newSe, c.oldSe, "namespace", c.rc) + if c.skipDestructive { + //verify the update did not go through + se, _ := c.rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Get(c.oldSe.Name, v12.GetOptions{}) + _, diff := getServiceEntryDiff(c.oldSe, se) + if diff != "" { + t.Errorf("Failed. Got %v, expected %v", se.Spec.String(), c.oldSe.Spec.String()) + } + } + }) + } } \ No newline at end of file diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index 7a926862..6cad0d53 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -120,6 +120,7 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste stop: stop, ClusterID: clusterID, ApiServer: clientConfig.Host, + StartTime: time.Now(), } var err error diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index 37ebd2e3..72fabbc3 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -15,11 +15,13 @@ import ( k8sV1 "k8s.io/api/core/v1" k8s "k8s.io/client-go/kubernetes" "sync" + "time" ) type RemoteController struct { ClusterID string ApiServer string + StartTime time.Time GlobalTraffic *admiral.GlobalTrafficController DeploymentController *admiral.DeploymentController ServiceController *admiral.ServiceController