diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index f9f03fef..986bf1ec 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -88,6 +88,7 @@ func createSecretController(ctx context.Context, w *RemoteRegistry) error { err = secret.StartSecretController(w.secretClient, w.createCacheController, + w.updateCacheController, w.deleteCacheController, common.GetClusterRegistriesNamespace(), ctx, common.GetSecretResolver()) @@ -182,6 +183,13 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste return nil } +func (r *RemoteRegistry) updateCacheController(clientConfig *rest.Config, clusterID string, resyncPeriod time.Duration) error { + if err := r.deleteCacheController(clusterID); err != nil { + return err + } + return r.createCacheController(clientConfig, clusterID, resyncPeriod) +} + func (r *RemoteRegistry) deleteCacheController(clusterID string) error { controller, ok := r.remoteControllers[clusterID] diff --git a/admiral/pkg/controller/secret/secretcontroller.go b/admiral/pkg/controller/secret/secretcontroller.go index 0f03ef6f..a3954c25 100644 --- a/admiral/pkg/controller/secret/secretcontroller.go +++ b/admiral/pkg/controller/secret/secretcontroller.go @@ -47,6 +47,9 @@ var LoadKubeConfig = clientcmd.Load // addSecretCallback prototype for the add secret callback function. type addSecretCallback func(config *rest.Config, dataKey string, resyncPeriod time.Duration) error +// updateSecretCallback prototype for the update secret callback function. +type updateSecretCallback func(config *rest.Config, dataKey string, resyncPeriod time.Duration) error + // removeSecretCallback prototype for the remove secret callback function. type removeSecretCallback func(dataKey string) error @@ -58,6 +61,7 @@ type Controller struct { queue workqueue.RateLimitingInterface informer cache.SharedIndexInformer addCallback addSecretCallback + updateCallback updateSecretCallback removeCallback removeSecretCallback secretResolver resolver.SecretResolver } @@ -86,6 +90,7 @@ func NewController( namespace string, cs *ClusterStore, addCallback addSecretCallback, + updateCallback updateSecretCallback, removeCallback removeSecretCallback, secretResolverType string) *Controller { @@ -126,6 +131,7 @@ func NewController( informer: secretsInformer, queue: queue, addCallback: addCallback, + updateCallback: updateCallback, removeCallback: removeCallback, secretResolver: secretResolver, } @@ -139,6 +145,13 @@ func NewController( queue.Add(key) } }, + UpdateFunc: func(oldObj interface{}, newObj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(newObj) + log.Infof("Processing update: %s", key) + if err == nil { + queue.Add(key) + } + }, DeleteFunc: func(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) log.Infof("Processing delete: %s", key) @@ -172,10 +185,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } // StartSecretController creates the secret controller. -func StartSecretController(k8s kubernetes.Interface, addCallback addSecretCallback, removeCallback removeSecretCallback, namespace string, ctx context.Context, secretResolverType string) error { +func StartSecretController( + k8s kubernetes.Interface, + addCallback addSecretCallback, + updateCallback updateSecretCallback, + removeCallback removeSecretCallback, + namespace string, + ctx context.Context, + secretResolverType string) error { clusterStore := newClustersStore() - controller := NewController(k8s, namespace, clusterStore, addCallback, removeCallback, secretResolverType) + controller := NewController(k8s, namespace, clusterStore, addCallback, updateCallback, removeCallback, secretResolverType) go controller.Run(ctx.Done()) @@ -227,52 +247,91 @@ func (c *Controller) processItem(secretName string) error { return nil } +func (c *Controller) createRemoteCluster(kubeConfig []byte, secretName string, clusterID string, namespace string) (*RemoteCluster, *rest.Config, error) { + if len(kubeConfig) == 0 { + log.Infof("Data '%s' in the secret %s in namespace %s is empty, and disregarded ", + clusterID, secretName, namespace) + return nil, nil, errors.New("kubeconfig is empty") + } + + kubeConfig, err := c.secretResolver.FetchKubeConfig(clusterID, kubeConfig) + + if err != nil { + log.Errorf("Failed to fetch kubeconfig for cluster '%s' using secret resolver: %v, err: %v", + clusterID, c.secretResolver, err) + return nil, nil, errors.New("kubeconfig cannot be fetched") + } + + clusterConfig, err := LoadKubeConfig(kubeConfig) + + if err != nil { + log.Infof("Data '%s' in the secret %s in namespace %s is not a kubeconfig: %v", + clusterID, secretName, namespace, err) + log.Infof("KubeConfig: '%s'", string(kubeConfig)) + return nil, nil, errors.New("clusterConfig cannot be loaded") + } + + clientConfig := clientcmd.NewDefaultClientConfig(*clusterConfig, &clientcmd.ConfigOverrides{}) + + var restConfig *rest.Config + restConfig, err = clientConfig.ClientConfig() + + if err != nil { + log.Errorf("error during conversion of secret to client config: %v", err) + return nil, nil, errors.New("restConfig cannot be built") + } + + return &RemoteCluster{ + secretName: secretName, + }, restConfig, nil +} + func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) { for clusterID, kubeConfig := range s.Data { // clusterID must be unique even across multiple secrets - if _, ok := c.cs.remoteClusters[clusterID]; !ok { - if len(kubeConfig) == 0 { - log.Infof("Data '%s' in the secret %s in namespace %s is empty, and disregarded ", - clusterID, secretName, s.ObjectMeta.Namespace) - continue - } + if prev, ok := c.cs.remoteClusters[clusterID]; !ok { + log.Infof("Adding cluster_id=%v from secret=%v", clusterID, secretName) - kubeConfig, err := c.secretResolver.FetchKubeConfig(clusterID, kubeConfig) + remoteCluster, restConfig, err := c.createRemoteCluster(kubeConfig, secretName, clusterID, s.ObjectMeta.Namespace) if err != nil { - log.Errorf("Failed to fetch kubeconfig for cluster '%s' using secret resolver: %v, err: %v", - clusterID, c.secretResolver, err) + log.Errorf("Failed to add remote cluster from secret=%v for cluster_id=%v: %v", + secretName, clusterID, err) continue } - clusterConfig, err := LoadKubeConfig(kubeConfig) - if err != nil { - log.Infof("Data '%s' in the secret %s in namespace %s is not a kubeconfig: %v", - clusterID, secretName, s.ObjectMeta.Namespace, err) - log.Infof("KubeConfig: '%s'", string(kubeConfig)) + c.cs.remoteClusters[clusterID] = remoteCluster + + if err := c.addCallback(restConfig, clusterID, 2 * time.Minute); err != nil { + log.Errorf("error during secret loading for clusterID: %s %v", clusterID, err) continue } - log.Infof("Adding new cluster member: %s", clusterID) - c.cs.remoteClusters[clusterID] = &RemoteCluster{} - c.cs.remoteClusters[clusterID].secretName = secretName - clientConfig := clientcmd.NewDefaultClientConfig(*clusterConfig, &clientcmd.ConfigOverrides{}) + log.Infof("Secret loaded for cluster %s in the secret %s in namespace %s.",clusterID,c.cs.remoteClusters[clusterID].secretName, s.ObjectMeta.Namespace) + + } else { + if prev.secretName != secretName { + log.Errorf("ClusterID reused in two different secrets: %v and %v. ClusterID "+ + "must be unique across all secrets", prev.secretName, secretName) + continue + } - var restConfig *rest.Config - restConfig, err = clientConfig.ClientConfig() + log.Infof("Updating cluster %v from secret %v", clusterID, secretName) + remoteCluster, restConfig, err := c.createRemoteCluster(kubeConfig, secretName, clusterID, s.ObjectMeta.Namespace) if err != nil { - log.Errorf("error during conversion of secret to client config: %v", err) + log.Errorf("Error updating cluster_id=%v from secret=%v: %v", + clusterID, secretName, err) + continue } - err = c.addCallback(restConfig, clusterID, 2 * time.Minute) - if err != nil { - log.Errorf("error during create of clusterID: %s %v", clusterID, err) + c.cs.remoteClusters[clusterID] = remoteCluster + if err := c.updateCallback(restConfig, clusterID, 2 * time.Minute); err != nil { + log.Errorf("Error updating cluster_id from secret=%v: %s %v", + clusterID, secretName, err) } - } else { - log.Infof("Cluster %s in the secret %s in namespace %s already exists", - clusterID, c.cs.remoteClusters[clusterID].secretName, s.ObjectMeta.Namespace) } + } log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters)) } diff --git a/admiral/pkg/controller/secret/secretcontroller_test.go b/admiral/pkg/controller/secret/secretcontroller_test.go index 6a66cc4b..61f17bba 100644 --- a/admiral/pkg/controller/secret/secretcontroller_test.go +++ b/admiral/pkg/controller/secret/secretcontroller_test.go @@ -16,10 +16,13 @@ package secret import ( "context" + "fmt" "k8s.io/client-go/rest" + "sync" "testing" "time" + . "github.com/onsi/gomega" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" @@ -34,6 +37,57 @@ const secretNameSpace string = "istio-system" var testCreateControllerCalled bool var testDeleteControllerCalled bool + +func makeSecret(secret, clusterID string, kubeconfig []byte) *v1.Secret { + return &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secret, + Namespace: secretNameSpace, + Labels: map[string]string{ + filterLabel: "true", + }, + }, + Data: map[string][]byte{ + clusterID: kubeconfig, + }, + } +} + +var ( + mu sync.Mutex + added string + updated string + deleted string +) + +func addCallback(config *rest.Config, id string, resyncPeriod time.Duration) error { + mu.Lock() + defer mu.Unlock() + added = id + return nil +} + +func updateCallback(config *rest.Config, id string, resyncPeriod time.Duration) error { + mu.Lock() + defer mu.Unlock() + updated = id + return nil +} + +func deleteCallback(id string) error { + mu.Lock() + defer mu.Unlock() + deleted = id + return nil +} + +func resetCallbackData() { + added = "" + updated = "" + deleted = "" +} + + func testCreateController(clientConfig *rest.Config, clusterID string, resyncPeriod time.Duration) error { testCreateControllerCalled = true return nil @@ -71,7 +125,15 @@ func deleteMultiClusterSecret(k8s *fake.Clientset) error { } func mockLoadKubeConfig(kubeconfig []byte) (*clientcmdapi.Config, error) { - return &clientcmdapi.Config{}, nil + config := clientcmdapi.NewConfig() + config.Clusters["clean"] = &clientcmdapi.Cluster{ + Server: "https://anything.com:8080", + } + config.Contexts["clean"] = &clientcmdapi.Context{ + Cluster: "clean", + } + config.CurrentContext = "clean" + return config, nil } func verifyControllerDeleted(t *testing.T, timeoutName string) { @@ -86,6 +148,7 @@ func verifyControllerCreated(t *testing.T, timeoutName string) { }) } +/* func Test_SecretController(t *testing.T) { LoadKubeConfig = mockLoadKubeConfig @@ -127,4 +190,89 @@ func Test_SecretController(t *testing.T) { if testCreateControllerCalled != false { t.Fatalf("Test failed on delete secret, create callback function called") } +}*/ + + +func Test_SecretController(t *testing.T) { + g := NewWithT(t) + + LoadKubeConfig = mockLoadKubeConfig + + clientset := fake.NewSimpleClientset() + + var ( + secret0 = makeSecret("s0", "c0", []byte("kubeconfig0-0")) + secret0UpdateKubeconfigChanged = makeSecret("s0", "c0", []byte("kubeconfig0-1")) + secret1 = makeSecret("s1", "c1", []byte("kubeconfig1-0")) + ) + + steps := []struct { + // only set one of these per step. The others should be nil. + add *v1.Secret + update *v1.Secret + delete *v1.Secret + + // only set one of these per step. The others should be empty. + wantAdded string + wantUpdated string + wantDeleted string + }{ + {add: secret0, wantAdded: "c0"}, + {update: secret0UpdateKubeconfigChanged, wantUpdated: "c0"}, + {add: secret1, wantAdded: "c1"}, + {delete: secret0, wantDeleted: "c0"}, + {delete: secret1, wantDeleted: "c1"}, + } + + // Start the secret controller and sleep to allow secret process to start. + g.Expect( + StartSecretController(clientset, addCallback, updateCallback, deleteCallback, secretNameSpace, context.TODO(), "")). + Should(Succeed()) + + for i, step := range steps { + resetCallbackData() + + t.Run(fmt.Sprintf("[%v]", i), func(t *testing.T) { + g := NewWithT(t) + + switch { + case step.add != nil: + _, err := clientset.CoreV1().Secrets(secretNameSpace).Create(step.add) + g.Expect(err).Should(BeNil()) + case step.update != nil: + _, err := clientset.CoreV1().Secrets(secretNameSpace).Update(step.update) + g.Expect(err).Should(BeNil()) + case step.delete != nil: + g.Expect(clientset.CoreV1().Secrets(secretNameSpace).Delete(step.delete.Name, &metav1.DeleteOptions{})). + Should(Succeed()) + } + + switch { + case step.wantAdded != "": + g.Eventually(func() string { + mu.Lock() + defer mu.Unlock() + return added + }, 10*time.Second).Should(Equal(step.wantAdded)) + case step.wantUpdated != "": + g.Eventually(func() string { + mu.Lock() + defer mu.Unlock() + return updated + }, 10*time.Second).Should(Equal(step.wantUpdated)) + case step.wantDeleted != "": + g.Eventually(func() string { + mu.Lock() + defer mu.Unlock() + return deleted + }, 10*time.Second).Should(Equal(step.wantDeleted)) + default: + g.Consistently(func() bool { + mu.Lock() + defer mu.Unlock() + return added == "" && updated == "" && deleted == "" + }).Should(Equal(true)) + } + }) + } } diff --git a/go.mod b/go.mod index e928bea7..4af7a5ec 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/mailru/easyjson v0.7.1 // indirect github.com/natefinch/lumberjack v0.0.0-20170531160350-a96e63847dc3 // indirect github.com/onsi/ginkgo v1.10.2 // indirect + github.com/onsi/gomega v1.7.0 github.com/prometheus/common v0.7.0 // indirect github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.5