Skip to content

Commit

Permalink
Added cluster secret update function (istio-ecosystem#55)
Browse files Browse the repository at this point in the history
Signed-off-by: sa <sushanth_a@intuit.com>
  • Loading branch information
gaopan233 authored and sa committed Apr 28, 2020
1 parent b099baa commit df9eca5
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 30 deletions.
8 changes: 8 additions & 0 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func createSecretController(ctx context.Context, w *RemoteRegistry) error {

err = secret.StartSecretController(w.secretClient,
w.createCacheController,
w.updateCacheController,
w.deleteCacheController,
common.GetClusterRegistriesNamespace(),
ctx, common.GetSecretResolver())
Expand Down Expand Up @@ -191,6 +192,13 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
return nil
}

func (r *RemoteRegistry) updateCacheController(clientConfig *rest.Config, clusterID string, resyncPeriod time.Duration) error {
if err := r.deleteCacheController(clusterID); err != nil {
return err
}
return r.createCacheController(clientConfig, clusterID, resyncPeriod)
}

func (r *RemoteRegistry) deleteCacheController(clusterID string) error {

controller, ok := r.remoteControllers[clusterID]
Expand Down
117 changes: 88 additions & 29 deletions admiral/pkg/controller/secret/secretcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ var LoadKubeConfig = clientcmd.Load
// addSecretCallback prototype for the add secret callback function.
type addSecretCallback func(config *rest.Config, dataKey string, resyncPeriod time.Duration) error

// updateSecretCallback prototype for the update secret callback function.
type updateSecretCallback func(config *rest.Config, dataKey string, resyncPeriod time.Duration) error

// removeSecretCallback prototype for the remove secret callback function.
type removeSecretCallback func(dataKey string) error

Expand All @@ -58,6 +61,7 @@ type Controller struct {
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
addCallback addSecretCallback
updateCallback updateSecretCallback
removeCallback removeSecretCallback
secretResolver resolver.SecretResolver
}
Expand Down Expand Up @@ -86,6 +90,7 @@ func NewController(
namespace string,
cs *ClusterStore,
addCallback addSecretCallback,
updateCallback updateSecretCallback,
removeCallback removeSecretCallback,
secretResolverType string) *Controller {

Expand Down Expand Up @@ -126,6 +131,7 @@ func NewController(
informer: secretsInformer,
queue: queue,
addCallback: addCallback,
updateCallback: updateCallback,
removeCallback: removeCallback,
secretResolver: secretResolver,
}
Expand All @@ -139,6 +145,13 @@ func NewController(
queue.Add(key)
}
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
log.Infof("Processing update: %s", key)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
log.Infof("Processing delete: %s", key)
Expand Down Expand Up @@ -172,10 +185,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

// StartSecretController creates the secret controller.
func StartSecretController(k8s kubernetes.Interface, addCallback addSecretCallback, removeCallback removeSecretCallback, namespace string, ctx context.Context, secretResolverType string) error {
func StartSecretController(
k8s kubernetes.Interface,
addCallback addSecretCallback,
updateCallback updateSecretCallback,
removeCallback removeSecretCallback,
namespace string,
ctx context.Context,
secretResolverType string) error {

clusterStore := newClustersStore()
controller := NewController(k8s, namespace, clusterStore, addCallback, removeCallback, secretResolverType)
controller := NewController(k8s, namespace, clusterStore, addCallback, updateCallback, removeCallback, secretResolverType)

go controller.Run(ctx.Done())

Expand Down Expand Up @@ -227,52 +247,91 @@ func (c *Controller) processItem(secretName string) error {
return nil
}

func (c *Controller) createRemoteCluster(kubeConfig []byte, secretName string, clusterID string, namespace string) (*RemoteCluster, *rest.Config, error) {
if len(kubeConfig) == 0 {
log.Infof("Data '%s' in the secret %s in namespace %s is empty, and disregarded ",
clusterID, secretName, namespace)
return nil, nil, errors.New("kubeconfig is empty")
}

kubeConfig, err := c.secretResolver.FetchKubeConfig(clusterID, kubeConfig)

if err != nil {
log.Errorf("Failed to fetch kubeconfig for cluster '%s' using secret resolver: %v, err: %v",
clusterID, c.secretResolver, err)
return nil, nil, errors.New("kubeconfig cannot be fetched")
}

clusterConfig, err := LoadKubeConfig(kubeConfig)

if err != nil {
log.Infof("Data '%s' in the secret %s in namespace %s is not a kubeconfig: %v",
clusterID, secretName, namespace, err)
log.Infof("KubeConfig: '%s'", string(kubeConfig))
return nil, nil, errors.New("clusterConfig cannot be loaded")
}

clientConfig := clientcmd.NewDefaultClientConfig(*clusterConfig, &clientcmd.ConfigOverrides{})

var restConfig *rest.Config
restConfig, err = clientConfig.ClientConfig()

if err != nil {
log.Errorf("error during conversion of secret to client config: %v", err)
return nil, nil, errors.New("restConfig cannot be built")
}

return &RemoteCluster{
secretName: secretName,
}, restConfig, nil
}

func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
for clusterID, kubeConfig := range s.Data {
// clusterID must be unique even across multiple secrets
if _, ok := c.cs.remoteClusters[clusterID]; !ok {
if len(kubeConfig) == 0 {
log.Infof("Data '%s' in the secret %s in namespace %s is empty, and disregarded ",
clusterID, secretName, s.ObjectMeta.Namespace)
continue
}
if prev, ok := c.cs.remoteClusters[clusterID]; !ok {
log.Infof("Adding cluster_id=%v from secret=%v", clusterID, secretName)

kubeConfig, err := c.secretResolver.FetchKubeConfig(clusterID, kubeConfig)
remoteCluster, restConfig, err := c.createRemoteCluster(kubeConfig, secretName, clusterID, s.ObjectMeta.Namespace)

if err != nil {
log.Errorf("Failed to fetch kubeconfig for cluster '%s' using secret resolver: %v, err: %v",
clusterID, c.secretResolver, err)
log.Errorf("Failed to add remote cluster from secret=%v for cluster_id=%v: %v",
secretName, clusterID, err)
continue
}

clusterConfig, err := LoadKubeConfig(kubeConfig)
if err != nil {
log.Infof("Data '%s' in the secret %s in namespace %s is not a kubeconfig: %v",
clusterID, secretName, s.ObjectMeta.Namespace, err)
log.Infof("KubeConfig: '%s'", string(kubeConfig))
c.cs.remoteClusters[clusterID] = remoteCluster

if err := c.addCallback(restConfig, clusterID, 2 * time.Minute); err != nil {
log.Errorf("error during secret loading for clusterID: %s %v", clusterID, err)
continue
}

log.Infof("Adding new cluster member: %s", clusterID)
c.cs.remoteClusters[clusterID] = &RemoteCluster{}
c.cs.remoteClusters[clusterID].secretName = secretName
clientConfig := clientcmd.NewDefaultClientConfig(*clusterConfig, &clientcmd.ConfigOverrides{})
log.Infof("Secret loaded for cluster %s in the secret %s in namespace %s.",clusterID,c.cs.remoteClusters[clusterID].secretName, s.ObjectMeta.Namespace)

} else {
if prev.secretName != secretName {
log.Errorf("ClusterID reused in two different secrets: %v and %v. ClusterID "+
"must be unique across all secrets", prev.secretName, secretName)
continue
}

var restConfig *rest.Config
restConfig, err = clientConfig.ClientConfig()
log.Infof("Updating cluster %v from secret %v", clusterID, secretName)

remoteCluster, restConfig, err := c.createRemoteCluster(kubeConfig, secretName, clusterID, s.ObjectMeta.Namespace)
if err != nil {
log.Errorf("error during conversion of secret to client config: %v", err)
log.Errorf("Error updating cluster_id=%v from secret=%v: %v",
clusterID, secretName, err)
continue
}

err = c.addCallback(restConfig, clusterID, 2 * time.Minute)
if err != nil {
log.Errorf("error during create of clusterID: %s %v", clusterID, err)
c.cs.remoteClusters[clusterID] = remoteCluster
if err := c.updateCallback(restConfig, clusterID, 2 * time.Minute); err != nil {
log.Errorf("Error updating cluster_id from secret=%v: %s %v",
clusterID, secretName, err)
}
} else {
log.Infof("Cluster %s in the secret %s in namespace %s already exists",
clusterID, c.cs.remoteClusters[clusterID].secretName, s.ObjectMeta.Namespace)
}

}
log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters))
}
Expand Down
150 changes: 149 additions & 1 deletion admiral/pkg/controller/secret/secretcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package secret

import (
"context"
"fmt"
"k8s.io/client-go/rest"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
Expand All @@ -34,6 +37,57 @@ const secretNameSpace string = "istio-system"
var testCreateControllerCalled bool
var testDeleteControllerCalled bool


func makeSecret(secret, clusterID string, kubeconfig []byte) *v1.Secret {
return &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secret,
Namespace: secretNameSpace,
Labels: map[string]string{
filterLabel: "true",
},
},
Data: map[string][]byte{
clusterID: kubeconfig,
},
}
}

var (
mu sync.Mutex
added string
updated string
deleted string
)

func addCallback(config *rest.Config, id string, resyncPeriod time.Duration) error {
mu.Lock()
defer mu.Unlock()
added = id
return nil
}

func updateCallback(config *rest.Config, id string, resyncPeriod time.Duration) error {
mu.Lock()
defer mu.Unlock()
updated = id
return nil
}

func deleteCallback(id string) error {
mu.Lock()
defer mu.Unlock()
deleted = id
return nil
}

func resetCallbackData() {
added = ""
updated = ""
deleted = ""
}


func testCreateController(clientConfig *rest.Config, clusterID string, resyncPeriod time.Duration) error {
testCreateControllerCalled = true
return nil
Expand Down Expand Up @@ -71,7 +125,15 @@ func deleteMultiClusterSecret(k8s *fake.Clientset) error {
}

func mockLoadKubeConfig(kubeconfig []byte) (*clientcmdapi.Config, error) {
return &clientcmdapi.Config{}, nil
config := clientcmdapi.NewConfig()
config.Clusters["clean"] = &clientcmdapi.Cluster{
Server: "https://anything.com:8080",
}
config.Contexts["clean"] = &clientcmdapi.Context{
Cluster: "clean",
}
config.CurrentContext = "clean"
return config, nil
}

func verifyControllerDeleted(t *testing.T, timeoutName string) {
Expand All @@ -86,6 +148,7 @@ func verifyControllerCreated(t *testing.T, timeoutName string) {
})
}

/*
func Test_SecretController(t *testing.T) {
LoadKubeConfig = mockLoadKubeConfig
Expand Down Expand Up @@ -127,4 +190,89 @@ func Test_SecretController(t *testing.T) {
if testCreateControllerCalled != false {
t.Fatalf("Test failed on delete secret, create callback function called")
}
}*/


func Test_SecretController(t *testing.T) {
g := NewWithT(t)

LoadKubeConfig = mockLoadKubeConfig

clientset := fake.NewSimpleClientset()

var (
secret0 = makeSecret("s0", "c0", []byte("kubeconfig0-0"))
secret0UpdateKubeconfigChanged = makeSecret("s0", "c0", []byte("kubeconfig0-1"))
secret1 = makeSecret("s1", "c1", []byte("kubeconfig1-0"))
)

steps := []struct {
// only set one of these per step. The others should be nil.
add *v1.Secret
update *v1.Secret
delete *v1.Secret

// only set one of these per step. The others should be empty.
wantAdded string
wantUpdated string
wantDeleted string
}{
{add: secret0, wantAdded: "c0"},
{update: secret0UpdateKubeconfigChanged, wantUpdated: "c0"},
{add: secret1, wantAdded: "c1"},
{delete: secret0, wantDeleted: "c0"},
{delete: secret1, wantDeleted: "c1"},
}

// Start the secret controller and sleep to allow secret process to start.
g.Expect(
StartSecretController(clientset, addCallback, updateCallback, deleteCallback, secretNameSpace, context.TODO(), "")).
Should(Succeed())

for i, step := range steps {
resetCallbackData()

t.Run(fmt.Sprintf("[%v]", i), func(t *testing.T) {
g := NewWithT(t)

switch {
case step.add != nil:
_, err := clientset.CoreV1().Secrets(secretNameSpace).Create(step.add)
g.Expect(err).Should(BeNil())
case step.update != nil:
_, err := clientset.CoreV1().Secrets(secretNameSpace).Update(step.update)
g.Expect(err).Should(BeNil())
case step.delete != nil:
g.Expect(clientset.CoreV1().Secrets(secretNameSpace).Delete(step.delete.Name, &metav1.DeleteOptions{})).
Should(Succeed())
}

switch {
case step.wantAdded != "":
g.Eventually(func() string {
mu.Lock()
defer mu.Unlock()
return added
}, 10*time.Second).Should(Equal(step.wantAdded))
case step.wantUpdated != "":
g.Eventually(func() string {
mu.Lock()
defer mu.Unlock()
return updated
}, 10*time.Second).Should(Equal(step.wantUpdated))
case step.wantDeleted != "":
g.Eventually(func() string {
mu.Lock()
defer mu.Unlock()
return deleted
}, 10*time.Second).Should(Equal(step.wantDeleted))
default:
g.Consistently(func() bool {
mu.Lock()
defer mu.Unlock()
return added == "" && updated == "" && deleted == ""
}).Should(Equal(true))
}
})
}
}
Loading

0 comments on commit df9eca5

Please sign in to comment.