From 5b30512a75ac771dbaf327cdcc98c55d8fb68000 Mon Sep 17 00:00:00 2001 From: Vinay Date: Tue, 20 Sep 2022 00:04:35 -0700 Subject: [PATCH] Updating with fixes (#258) * [MESH-2199] - Removing duplicate controller declarations Adding readonly mode for routing policy following DR Signed-off-by: vgonuguntla --- admiral/pkg/clusters/envoyfilter.go | 41 ++++++++++-- admiral/pkg/clusters/envoyfilter_test.go | 34 +++++++++- admiral/pkg/clusters/registry.go | 41 ------------ admiral/pkg/clusters/types.go | 12 ++++ admiral/pkg/clusters/types_test.go | 85 +++++++++++++++++++++++- 5 files changed, 161 insertions(+), 52 deletions(-) diff --git a/admiral/pkg/clusters/envoyfilter.go b/admiral/pkg/clusters/envoyfilter.go index d3b5f2c2..eaf5525e 100644 --- a/admiral/pkg/clusters/envoyfilter.go +++ b/admiral/pkg/clusters/envoyfilter.go @@ -21,14 +21,19 @@ var ( ) const hostsKey = "hosts: " +const pluginKey = "plugin: " func createOrUpdateEnvoyFilter(ctx context.Context, rc *RemoteController, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType, workloadIdentityKey string, admiralCache *AdmiralCache, workloadSelectorMap map[string]string) (*networking.EnvoyFilter, error) { - envoyfilterSpec := constructEnvoyFilterStruct(routingPolicy, workloadSelectorMap) + envoyfilterSpec, err := constructEnvoyFilterStruct(routingPolicy, workloadSelectorMap) + if err != nil { + log.Error("error occurred while constructing envoy filter struct") + return nil, err + } selectorLabelsSha, err := getSha1(workloadIdentityKey + common.GetRoutingPolicyEnv(routingPolicy)) if err != nil { - log.Error("error ocurred while computing workload labels sha1") + log.Error("error occurred while computing workload labels sha1") return nil, err } if len(common.GetEnvoyFilterVersion()) == 0 { @@ -74,7 +79,7 @@ func createOrUpdateEnvoyFilter(ctx context.Context, rc *RemoteController, routin return filter, err } -func constructEnvoyFilterStruct(routingPolicy *v1.RoutingPolicy, workloadSelectorLabels map[string]string) *v1alpha3.EnvoyFilter { +func constructEnvoyFilterStruct(routingPolicy *v1.RoutingPolicy, workloadSelectorLabels map[string]string) (*v1alpha3.EnvoyFilter, error) { var envoyFilterStringConfig string var wasmPath string for key, val := range routingPolicy.Spec.Config { @@ -87,7 +92,16 @@ func constructEnvoyFilterStruct(routingPolicy *v1.RoutingPolicy, workloadSelecto if len(common.GetEnvoyFilterAdditionalConfig()) != 0 { envoyFilterStringConfig += common.GetEnvoyFilterAdditionalConfig() + "\n" } - envoyFilterStringConfig += getHosts(routingPolicy) + hosts, err := getHosts(routingPolicy) + if err != nil { + return nil, err + } + envoyFilterStringConfig += hosts + "\n" + plugin, err := getPlugin(routingPolicy) + if err != nil { + return nil, err + } + envoyFilterStringConfig += plugin configuration := structpb.Struct{ Fields: map[string]*structpb.Value{ @@ -131,7 +145,7 @@ func constructEnvoyFilterStruct(routingPolicy *v1.RoutingPolicy, workloadSelecto } envoyfilterSpec := getEnvoyFilterSpec(workloadSelectorLabels, typedConfig) - return envoyfilterSpec + return envoyfilterSpec, nil } func getEnvoyFilterSpec(workloadSelectorLabels map[string]string, typedConfig *structpb.Struct) *v1alpha3.EnvoyFilter { @@ -176,11 +190,24 @@ func getEnvoyFilterSpec(workloadSelectorLabels map[string]string, typedConfig *s } } -func getHosts(routingPolicy *v1.RoutingPolicy) string { +func getHosts(routingPolicy *v1.RoutingPolicy) (string, error) { hosts := "" for _, host := range routingPolicy.Spec.Hosts { hosts += host + "," } + if len(hosts) == 0 { + log.Error("routing policy hosts cannot be empty") + return "", errors.New("routing policy hosts cannot be empty") + } hosts = strings.TrimSuffix(hosts, ",") - return hostsKey + hosts + return hostsKey + hosts, nil +} + +func getPlugin(routingPolicy *v1.RoutingPolicy) (string, error) { + plugin := routingPolicy.Spec.Plugin + if len(plugin) == 0 { + log.Error("routing policy plugin cannot be empty") + return "", errors.New("routing policy plugin cannot be empty") + } + return pluginKey + plugin, nil } diff --git a/admiral/pkg/clusters/envoyfilter_test.go b/admiral/pkg/clusters/envoyfilter_test.go index 4737062a..a189e935 100644 --- a/admiral/pkg/clusters/envoyfilter_test.go +++ b/admiral/pkg/clusters/envoyfilter_test.go @@ -141,6 +141,38 @@ func TestGetHosts(t *testing.T) { Status: v1.RoutingPolicyStatus{}, } - hosts := getHosts(routingPolicyFoo) + hosts, err := getHosts(routingPolicyFoo) + if err != nil { + assert.Fail(t, err.Error()) + } assert.Equal(t, "hosts: e2e.testservice.mesh,e2e2.testservice.mesh", hosts) } + +func TestGetPlugin(t *testing.T) { + routingPolicyFoo := &v1.RoutingPolicy{ + TypeMeta: time2.TypeMeta{}, + ObjectMeta: time2.ObjectMeta{ + Labels: map[string]string{ + "identity": "foo", + "admiral.io/env": "stage", + }, + }, + Spec: model.RoutingPolicy{ + Plugin: "test", + Hosts: []string{"e2e.testservice.mesh,e2e2.testservice.mesh"}, + Config: map[string]string{ + "cachePrefix": "cache-v1", + "cachettlSec": "86400", + "routingServiceUrl": "e2e.test.routing.service.mesh", + "pathPrefix": "/sayhello,/v1/company/{id}/", + }, + }, + Status: v1.RoutingPolicyStatus{}, + } + + plugin, err := getPlugin(routingPolicyFoo) + if err != nil { + assert.Fail(t, err.Error()) + } + assert.Equal(t, "plugin: test", plugin) +} diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index 4cbe79b5..3b0d2443 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -194,47 +194,6 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste return fmt.Errorf("error with virtualServiceController init: %v", err) } - log.Infof("starting node controller clusterID: %v", clusterID) - rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig) - - if err != nil { - return fmt.Errorf("error with NodeController controller init: %v", err) - } - - log.Infof("starting service controller clusterID: %v", clusterID) - rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod) - - if err != nil { - return fmt.Errorf("error with ServiceController controller init: %v", err) - } - - log.Infof("starting service entry controller for custerID: %v", clusterID) - rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod) - - if err != nil { - return fmt.Errorf("error with ServiceEntryController init: %v", err) - } - - log.Infof("starting destination rule controller for custerID: %v", clusterID) - rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod) - - if err != nil { - return fmt.Errorf("error with DestinationRuleController init: %v", err) - } - - log.Infof("starting virtual service controller for custerID: %v", clusterID) - rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod) - - if err != nil { - return fmt.Errorf("error with VirtualServiceController init: %v", err) - } - - rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod) - - if err != nil { - return fmt.Errorf("error with DestinationRuleController init: %v", err) - } - r.PutRemoteController(clusterID, &rc) log.Infof("Create Controller %s", clusterID) diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index 9ec59299..e59b1a73 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -282,6 +282,10 @@ func (r *routingPolicyFilterCache) Put(identityEnvKey string, clusterId string, } func (r *routingPolicyFilterCache) Delete(identityEnvKey string) { + if CurrentAdmiralState.ReadOnly { + log.Infof(LogFormat, admiral.Delete, "routingpolicy", identityEnvKey, "", "skipping read-only mode") + return + } if common.GetEnableRoutingPolicy() { defer r.mutex.Unlock() r.mutex.Lock() @@ -292,6 +296,10 @@ func (r *routingPolicyFilterCache) Delete(identityEnvKey string) { } } func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy) { + if CurrentAdmiralState.ReadOnly { + log.Infof(LogFormat, admiral.Add, "routingpolicy", "", "", "skipping read-only mode") + return + } if common.GetEnableRoutingPolicy() { if common.ShouldIgnoreResource(obj.ObjectMeta) { log.Infof(LogFormat, "success", "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation") @@ -334,6 +342,10 @@ func (r RoutingPolicyHandler) processroutingPolicy(ctx context.Context, dependen } func (r RoutingPolicyHandler) Updated(ctx context.Context, obj *v1.RoutingPolicy) { + if CurrentAdmiralState.ReadOnly { + log.Infof(LogFormat, admiral.Update, "routingpolicy", "", "", "skipping read-only mode") + return + } if common.GetEnableRoutingPolicy() { if common.ShouldIgnoreResource(obj.ObjectMeta) { log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation") diff --git a/admiral/pkg/clusters/types_test.go b/admiral/pkg/clusters/types_test.go index 85856716..faa3cc58 100644 --- a/admiral/pkg/clusters/types_test.go +++ b/admiral/pkg/clusters/types_test.go @@ -1,6 +1,7 @@ package clusters import ( + "bytes" "context" "fmt" "strings" @@ -8,19 +9,20 @@ import ( "testing" "time" - "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model" - istiofake "istio.io/client-go/pkg/clientset/versioned/fake" - argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model" v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1" admiralFake "github.com/istio-ecosystem/admiral/admiral/pkg/client/clientset/versioned/fake" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + istiofake "istio.io/client-go/pkg/clientset/versioned/fake" v12 "k8s.io/api/apps/v1" v13 "k8s.io/api/core/v1" time2 "k8s.io/apimachinery/pkg/apis/meta/v1" + "os" ) var ignoreUnexported = cmpopts.IgnoreUnexported(v1.GlobalTrafficPolicy{}.Status) @@ -454,3 +456,80 @@ func TestRoutingPolicyHandler(t *testing.T) { assert.Nil(t, registry.AdmiralCache.RoutingPolicyFilterCache.Get("bar3stage")) } + +func TestRoutingPolicyReadOnly(t *testing.T) { + p := common.AdmiralParams{ + KubeconfigPath: "testdata/fake.config", + LabelSet: &common.LabelSet{}, + EnableSAN: true, + SANPrefix: "prefix", + HostnameSuffix: "mesh", + SyncNamespace: "ns", + CacheRefreshDuration: time.Minute, + ClusterRegistriesNamespace: "default", + DependenciesNamespace: "default", + SecretResolver: "", + EnableRoutingPolicy: true, + EnvoyFilterVersion: "1.13", + } + + p.LabelSet.WorkloadIdentityKey = "identity" + p.LabelSet.EnvKey = "admiral.io/env" + p.LabelSet.GlobalTrafficDeploymentLabel = "identity" + + handler := RoutingPolicyHandler{} + + testcases := []struct { + name string + rp *v1.RoutingPolicy + readOnly bool + doesError bool + }{ + { + name: "Readonly test for DR scenario - Routing Policy", + rp: &v1.RoutingPolicy{}, + readOnly: true, + doesError: true, + }, + { + name: "Readonly false test for DR scenario - Routing Policy", + rp: &v1.RoutingPolicy{}, + readOnly: false, + doesError: false, + }, + } + + ctx := context.Background() + + for _, c := range testcases { + t.Run(c.name, func(t *testing.T) { + if c.readOnly { + CurrentAdmiralState.ReadOnly = true + } else { + CurrentAdmiralState.ReadOnly = false + } + var buf bytes.Buffer + log.SetOutput(&buf) + defer func() { + log.SetOutput(os.Stderr) + }() + // Add routing policy test + handler.Added(ctx, c.rp) + t.Log(buf.String()) + val := strings.Contains(buf.String(), "skipping read-only mode") + assert.Equal(t, c.doesError, val) + + // Update routing policy test + handler.Updated(ctx, c.rp) + t.Log(buf.String()) + val = strings.Contains(buf.String(), "skipping read-only mode") + assert.Equal(t, c.doesError, val) + + // Delete routing policy test + handler.Deleted(ctx, c.rp) + t.Log(buf.String()) + val = strings.Contains(buf.String(), "skipping read-only mode") + assert.Equal(t, c.doesError, val) + }) + } +}