From 890202f2e4f7321932c03bb2cfeb1c62f2a186ed Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Fri, 7 Sep 2018 10:42:57 -0400 Subject: [PATCH] Wait for PV/namespace to delete before restore If a PV already exists, wait for it, its associated PVC, and associated namespace to be deleted before attempting to restore it. If a namespace already exists, wait for it to be deleted before attempting to restore it. Signed-off-by: Nolan Brubaker --- changelogs/unreleased/826-nrb | 1 + pkg/cmd/server/server.go | 25 +-- pkg/restore/restore.go | 211 ++++++++++++++++------ pkg/restore/restore_test.go | 308 +++++++++++++++++++++++++++++--- pkg/util/kube/utils.go | 58 +++++- pkg/util/kube/utils_test.go | 97 +++++++++- pkg/util/test/fake_namespace.go | 77 ++++++++ 7 files changed, 677 insertions(+), 100 deletions(-) create mode 100644 changelogs/unreleased/826-nrb create mode 100644 pkg/util/test/fake_namespace.go diff --git a/changelogs/unreleased/826-nrb b/changelogs/unreleased/826-nrb new file mode 100644 index 0000000000..6d5356c681 --- /dev/null +++ b/changelogs/unreleased/826-nrb @@ -0,0 +1 @@ +Wait for PVs and namespaces to delete before attempting to restore them. diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 867a763ecd..e2e543bf4e 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -74,8 +74,9 @@ const ( // the port where prometheus metrics are exposed defaultMetricsAddress = ":8085" - defaultBackupSyncPeriod = time.Minute - defaultPodVolumeOperationTimeout = 60 * time.Minute + defaultBackupSyncPeriod = time.Minute + defaultPodVolumeOperationTimeout = 60 * time.Minute + defaultResourceTerminatingTimeout = 10 * time.Minute // server's client default qps and burst defaultClientQPS float32 = 20.0 @@ -85,14 +86,14 @@ const ( ) type serverConfig struct { - pluginDir, metricsAddress, defaultBackupLocation string - backupSyncPeriod, podVolumeOperationTimeout time.Duration - restoreResourcePriorities []string - defaultVolumeSnapshotLocations map[string]string - restoreOnly bool - clientQPS float32 - clientBurst int - profilerAddress string + pluginDir, metricsAddress, defaultBackupLocation string + backupSyncPeriod, podVolumeOperationTimeout, resourceTerminatingTimeout time.Duration + restoreResourcePriorities []string + defaultVolumeSnapshotLocations map[string]string + restoreOnly bool + clientQPS float32 + clientBurst int + profilerAddress string } func NewCommand() *cobra.Command { @@ -110,6 +111,7 @@ func NewCommand() *cobra.Command { clientQPS: defaultClientQPS, clientBurst: defaultClientBurst, profilerAddress: defaultProfilerAddress, + resourceTerminatingTimeout: defaultResourceTerminatingTimeout, } ) @@ -168,6 +170,7 @@ func NewCommand() *cobra.Command { command.Flags().Float32Var(&config.clientQPS, "client-qps", config.clientQPS, "maximum number of requests per second by the server to the Kubernetes API once the burst limit has been reached") command.Flags().IntVar(&config.clientBurst, "client-burst", config.clientBurst, "maximum number of requests by the server to the Kubernetes API in a short period of time") command.Flags().StringVar(&config.profilerAddress, "profiler-address", config.profilerAddress, "the address to expose the pprof profiler") + command.Flags().DurationVar(&config.resourceTerminatingTimeout, "terminating-resource-timeout", config.resourceTerminatingTimeout, "how long to wait on persistent volumes and namespaces to terminate during a restore before timing out") return command } @@ -615,7 +618,6 @@ func (s *server) 
runControllers(defaultVolumeSnapshotLocations map[string]string backupDeletionController.Run(ctx, 1) wg.Done() }() - } restorer, err := restore.NewKubernetesRestorer( @@ -625,6 +627,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.kubeClient.CoreV1().Namespaces(), s.resticManager, s.config.podVolumeOperationTimeout, + s.config.resourceTerminatingTimeout, s.logger, ) cmd.CheckError(err) diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 3f0ce759e9..c939ad9373 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" kubeerrs "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -83,14 +84,15 @@ type kindString string // kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster. type kubernetesRestorer struct { - discoveryHelper discovery.Helper - dynamicFactory client.DynamicFactory - namespaceClient corev1.NamespaceInterface - resticRestorerFactory restic.RestorerFactory - resticTimeout time.Duration - resourcePriorities []string - fileSystem filesystem.Interface - logger logrus.FieldLogger + discoveryHelper discovery.Helper + dynamicFactory client.DynamicFactory + namespaceClient corev1.NamespaceInterface + resticRestorerFactory restic.RestorerFactory + resticTimeout time.Duration + resourceTerminatingTimeout time.Duration + resourcePriorities []string + fileSystem filesystem.Interface + logger logrus.FieldLogger } // prioritizeResources returns an ordered, fully-resolved list of resources to restore based on @@ -159,17 +161,19 @@ func NewKubernetesRestorer( namespaceClient corev1.NamespaceInterface, resticRestorerFactory restic.RestorerFactory, resticTimeout time.Duration, + resourceTerminatingTimeout time.Duration, logger logrus.FieldLogger, ) (Restorer, error) { return &kubernetesRestorer{ - discoveryHelper: discoveryHelper, - dynamicFactory: dynamicFactory, - namespaceClient: namespaceClient, - resticRestorerFactory: resticRestorerFactory, - resticTimeout: resticTimeout, - resourcePriorities: resourcePriorities, - logger: logger, - fileSystem: filesystem.NewFileSystem(), + discoveryHelper: discoveryHelper, + dynamicFactory: dynamicFactory, + namespaceClient: namespaceClient, + resticRestorerFactory: resticRestorerFactory, + resticTimeout: resticTimeout, + resourceTerminatingTimeout: resourceTerminatingTimeout, + resourcePriorities: resourcePriorities, + logger: logger, + fileSystem: filesystem.NewFileSystem(), }, nil } @@ -245,21 +249,22 @@ func (kr *kubernetesRestorer) Restore( } restoreCtx := &context{ - backup: backup, - backupReader: backupReader, - restore: restore, - prioritizedResources: prioritizedResources, - selector: selector, - log: log, - dynamicFactory: kr.dynamicFactory, - fileSystem: kr.fileSystem, - namespaceClient: kr.namespaceClient, - actions: resolvedActions, - blockStoreGetter: blockStoreGetter, - resticRestorer: resticRestorer, - pvsToProvision: sets.NewString(), - pvRestorer: pvRestorer, - volumeSnapshots: volumeSnapshots, + backup: backup, + backupReader: backupReader, + restore: restore, + prioritizedResources: prioritizedResources, + selector: selector, + log: log, + dynamicFactory: kr.dynamicFactory, + fileSystem: kr.fileSystem, + namespaceClient: kr.namespaceClient, + actions: resolvedActions, + blockStoreGetter: blockStoreGetter, + resticRestorer: resticRestorer, + 
pvsToProvision: sets.NewString(), + pvRestorer: pvRestorer, + volumeSnapshots: volumeSnapshots, + resourceTerminatingTimeout: kr.resourceTerminatingTimeout, } return restoreCtx.execute() @@ -327,24 +332,25 @@ func resolveActions(actions []ItemAction, helper discovery.Helper) ([]resolvedAc } type context struct { - backup *api.Backup - backupReader io.Reader - restore *api.Restore - prioritizedResources []schema.GroupResource - selector labels.Selector - log logrus.FieldLogger - dynamicFactory client.DynamicFactory - fileSystem filesystem.Interface - namespaceClient corev1.NamespaceInterface - actions []resolvedAction - blockStoreGetter BlockStoreGetter - resticRestorer restic.Restorer - globalWaitGroup velerosync.ErrorGroup - resourceWaitGroup sync.WaitGroup - resourceWatches []watch.Interface - pvsToProvision sets.String - pvRestorer PVRestorer - volumeSnapshots []*volume.Snapshot + backup *api.Backup + backupReader io.Reader + restore *api.Restore + prioritizedResources []schema.GroupResource + selector labels.Selector + log logrus.FieldLogger + dynamicFactory client.DynamicFactory + fileSystem filesystem.Interface + namespaceClient corev1.NamespaceInterface + actions []resolvedAction + blockStoreGetter BlockStoreGetter + resticRestorer restic.Restorer + globalWaitGroup velerosync.ErrorGroup + resourceWaitGroup sync.WaitGroup + resourceWatches []watch.Interface + pvsToProvision sets.String + pvRestorer PVRestorer + volumeSnapshots []*volume.Snapshot + resourceTerminatingTimeout time.Duration } func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) { @@ -474,7 +480,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe if !existingNamespaces.Has(mappedNsName) { logger := ctx.log.WithField("namespace", nsName) ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName) - if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil { + if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil { addVeleroError(&errs, err) continue } @@ -578,6 +584,102 @@ func addToResult(r *api.RestoreResult, ns string, e error) { } } +func (ctx *context) shouldRestore(name string, pvClient client.Dynamic) (bool, error) { + pvLogger := ctx.log.WithField("pvName", name) + + var shouldRestore bool + err := wait.PollImmediate(time.Second, ctx.resourceTerminatingTimeout, func() (bool, error) { + clusterPV, err := pvClient.Get(name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + pvLogger.Debug("PV not found, safe to restore") + // PV not found, can safely exit loop and proceed with restore. + shouldRestore = true + return true, nil + } + if err != nil { + return false, errors.Wrapf(err, "could not retrieve in-cluster copy of PV %s", name) + + } + phase, err := collections.GetString(clusterPV.UnstructuredContent(), "status.phase") + if err != nil { + // Break the loop since we couldn't read the phase + return false, errors.Wrapf(err, "error getting phase for in-cluster PV %s", name) + } + + if phase == string(v1.VolumeReleased) || clusterPV.GetDeletionTimestamp() != nil { + // PV was found and marked for deletion, or it was released; wait for it to go away. + pvLogger.Debugf("PV found, but marked for deletion, waiting") + return false, nil + } + + // Check for the namespace and PVC to see if anything that's referencing the PV is deleting. 
+ // If either the namespace or PVC is in a deleting/terminating state, wait for them to finish before + // trying to restore the PV + // Not doing so may result in the underlying PV disappearing but not restoring due to timing issues, + // then the PVC getting restored and showing as lost. + namespace, err := collections.GetString(clusterPV.UnstructuredContent(), "spec.claimRef.namespace") + if err != nil { + return false, errors.Wrapf(err, "error looking up namespace name for in-cluster PV %s", name) + } + pvcName, err := collections.GetString(clusterPV.UnstructuredContent(), "spec.claimRef.name") + if err != nil { + return false, errors.Wrapf(err, "error looking up persistentvolumeclaim for in-cluster PV %s", name) + } + + // Have to create the PVC client here because we don't know what namespace we're using til we get to this point. + // Using a dynamic client since it's easier to mock for testing + pvcResource := metav1.APIResource{Name: "persistentvolumeclaims", Namespaced: true} + pvcClient, err := ctx.dynamicFactory.ClientForGroupVersionResource(schema.GroupVersion{Group: "", Version: "v1"}, pvcResource, namespace) + if err != nil { + return false, errors.Wrapf(err, "error getting pvc client") + } + + pvc, err := pvcClient.Get(pvcName, metav1.GetOptions{}) + + if apierrors.IsNotFound(err) { + pvLogger.Debugf("PVC %s for PV not found, waiting", pvcName) + // PVC wasn't found, but the PV still exists, so continue to wait. + return false, nil + } + if err != nil { + return false, errors.Wrapf(err, "error getting claim %s for persistent volume", pvcName) + } + + if pvc != nil && pvc.GetDeletionTimestamp() != nil { + pvLogger.Debugf("PVC for PV marked for deletion, waiting") + // PVC is still deleting, continue to wait. + return false, nil + } + + // Check the namespace associated with the claimRef to see if it's deleting/terminating before proceeding + ns, err := ctx.namespaceClient.Get(namespace, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + pvLogger.Debugf("namespace %s for PV not found, waiting", namespace) + // namespace not found but the PV still exists, so continue to wait + return false, nil + } + if err != nil { + return false, errors.Wrapf(err, "error getting namespace %s associated with PV %s", namespace, name) + } + + if ns != nil && (ns.GetDeletionTimestamp() != nil || ns.Status.Phase == v1.NamespaceTerminating) { + pvLogger.Debugf("namespace %s associated with PV is deleting, waiting", namespace) + // namespace is in the process of deleting, keep looping + return false, nil + } + + // None of the PV, PVC, or NS are marked for deletion, break the loop. + pvLogger.Debug("PV, associated PVC and namespace are not marked for deletion") + return true, nil + }) + + if err == wait.ErrWaitTimeout { + pvLogger.Debug("timeout reached waiting for persistent volume to delete") + } + + return shouldRestore, err +} + // restoreResource restores the specified cluster or namespace scoped resource. If namespace is // empty we are restoring a cluster level resource, otherwise into the specified namespace. 
func (ctx *context) restoreResource(resource, namespace, resourcePath string) (api.RestoreResult, api.RestoreResult) { @@ -696,10 +798,15 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a // Check if the PV exists in the cluster before attempting to create // a volume from the snapshot, in order to avoid orphaned volumes (GH #609) - _, err := resourceClient.Get(name, metav1.GetOptions{}) + shouldRestoreSnapshot, err := ctx.shouldRestore(name, resourceClient) + + if err != nil { + addToResult(&errs, namespace, errors.Wrapf(err, "error waiting on in-cluster persistentvolume %s", name)) + continue + } // PV's existence will be recorded later. Just skip the volume restore logic. - if apierrors.IsNotFound(err) { + if shouldRestoreSnapshot { // restore the PV from snapshot (if applicable) updatedObj, err := ctx.pvRestorer.executePVAction(obj) if err != nil { diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index c8f8eb2dae..475365b9bf 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -188,14 +188,18 @@ func TestRestoreNamespaceFiltering(t *testing.T) { t.Run(test.name, func(t *testing.T) { log := velerotest.NewLogger() + nsClient := &velerotest.FakeNamespaceClient{} + ctx := &context{ restore: test.restore, - namespaceClient: &fakeNamespaceClient{}, + namespaceClient: nsClient, fileSystem: test.fileSystem, log: log, prioritizedResources: test.prioritizedResources, } + nsClient.On("Get", mock.Anything, metav1.GetOptions{}).Return(&v1.Namespace{}, nil) + warnings, errors := ctx.restoreFromDir(test.baseDir) assert.Empty(t, warnings.Velero) @@ -280,14 +284,18 @@ func TestRestorePriority(t *testing.T) { t.Run(test.name, func(t *testing.T) { log := velerotest.NewLogger() + nsClient := &velerotest.FakeNamespaceClient{} + ctx := &context{ restore: test.restore, - namespaceClient: &fakeNamespaceClient{}, + namespaceClient: nsClient, fileSystem: test.fileSystem, prioritizedResources: test.prioritizedResources, log: log, } + nsClient.On("Get", mock.Anything, metav1.GetOptions{}).Return(&v1.Namespace{}, nil) + warnings, errors := ctx.restoreFromDir(test.baseDir) assert.Empty(t, warnings.Velero) @@ -324,19 +332,23 @@ func TestNamespaceRemapping(t *testing.T) { gv := schema.GroupVersion{Group: "", Version: "v1"} dynamicFactory.On("ClientForGroupVersionResource", gv, resource, expectedNS).Return(resourceClient, nil) - namespaceClient := &fakeNamespaceClient{} + nsClient := &velerotest.FakeNamespaceClient{} ctx := &context{ dynamicFactory: dynamicFactory, fileSystem: fileSystem, selector: labelSelector, - namespaceClient: namespaceClient, + namespaceClient: nsClient, prioritizedResources: prioritizedResources, restore: restore, backup: &api.Backup{}, log: velerotest.NewLogger(), } + nsClient.On("Get", "ns-2", metav1.GetOptions{}).Return(&v1.Namespace{}, k8serrors.NewNotFound(schema.GroupResource{Resource: "namespaces"}, "ns-2")) + ns := newTestNamespace("ns-2").Namespace + nsClient.On("Create", ns).Return(ns, nil) + warnings, errors := ctx.restoreFromDir(baseDir) assert.Empty(t, warnings.Velero) @@ -347,8 +359,7 @@ func TestNamespaceRemapping(t *testing.T) { assert.Empty(t, errors.Namespaces) // ensure the remapped NS (only) was created via the namespaceClient - assert.Equal(t, 1, len(namespaceClient.createdNamespaces)) - assert.Equal(t, "ns-2", namespaceClient.createdNamespaces[0].Name) + nsClient.AssertExpectations(t) // ensure that we did not try to create namespaces via dynamic client dynamicFactory.AssertNotCalled(t, 
"ClientForGroupVersionResource", gv, metav1.APIResource{Name: "namespaces", Namespaced: true}, "") @@ -606,11 +617,11 @@ func TestRestoreResourceForNamespace(t *testing.T) { pvResource := metav1.APIResource{Name: "persistentvolumes", Namespaced: false} dynamicFactory.On("ClientForGroupVersionResource", gv, pvResource, test.namespace).Return(resourceClient, nil) resourceClient.On("Watch", metav1.ListOptions{}).Return(&fakeWatch{}, nil) + if test.resourcePath == "persistentvolumes" { + resourceClient.On("Get", mock.Anything, metav1.GetOptions{}).Return(&unstructured.Unstructured{}, k8serrors.NewNotFound(schema.GroupResource{Resource: "persistentvolumes"}, "")) + } // Assume the persistentvolume doesn't already exist in the cluster. - var empty *unstructured.Unstructured - resourceClient.On("Get", newTestPV().PersistentVolume.Name, metav1.GetOptions{}).Return(empty, nil) - saResource := metav1.APIResource{Name: "serviceaccounts", Namespaced: true} dynamicFactory.On("ClientForGroupVersionResource", gv, saResource, test.namespace).Return(resourceClient, nil) @@ -947,6 +958,14 @@ status: pvcBytes, err := json.Marshal(pvcObj) require.NoError(t, err) + unstructuredPVCMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvcObj) + require.NoError(t, err) + unstructuredPVC := &unstructured.Unstructured{Object: unstructuredPVCMap} + + nsClient := &velerotest.FakeNamespaceClient{} + ns := newTestNamespace(pvcObj.Namespace).Namespace + nsClient.On("Get", pvcObj.Namespace, mock.Anything).Return(ns, nil) + backup := &api.Backup{} if test.haveSnapshot && test.legacyBackup { backup.Status.VolumeBackups = map[string]*api.VolumeBackupInfo{ @@ -976,10 +995,11 @@ status: Name: "my-restore", }, }, - backup: backup, - log: velerotest.NewLogger(), - pvsToProvision: sets.NewString(), - pvRestorer: pvRestorer, + backup: backup, + log: velerotest.NewLogger(), + pvsToProvision: sets.NewString(), + pvRestorer: pvRestorer, + namespaceClient: nsClient, } if test.haveSnapshot && !test.legacyBackup { @@ -1001,8 +1021,12 @@ status: unstructuredPV := &unstructured.Unstructured{Object: unstructuredPVMap} if test.expectPVFound { - pvClient.On("Get", unstructuredPV.GetName(), metav1.GetOptions{}).Return(unstructuredPV, nil) - pvClient.On("Create", mock.Anything).Return(unstructuredPV, k8serrors.NewAlreadyExists(kuberesource.PersistentVolumes, unstructuredPV.GetName())) + // Copy the PV so that later modifcations don't affect what's returned by our faked calls. + inClusterPV := unstructuredPV.DeepCopy() + pvClient.On("Get", inClusterPV.GetName(), metav1.GetOptions{}).Return(inClusterPV, nil) + pvClient.On("Create", mock.Anything).Return(inClusterPV, k8serrors.NewAlreadyExists(kuberesource.PersistentVolumes, inClusterPV.GetName())) + inClusterPVC := unstructuredPVC.DeepCopy() + pvcClient.On("Get", pvcObj.Name, mock.Anything).Return(inClusterPVC, nil) } // Only set up the client expectation if the test has the proper prerequisites @@ -1046,12 +1070,7 @@ status: assert.Empty(t, warnings.Velero) assert.Empty(t, warnings.Namespaces) assert.Equal(t, api.RestoreResult{}, errors) - - if test.expectPVFound { - assert.Equal(t, 1, len(warnings.Cluster)) - } else { - assert.Empty(t, warnings.Cluster) - } + assert.Empty(t, warnings.Cluster) // Prep PVC restore // Handle expectations @@ -1062,9 +1081,10 @@ status: delete(pvcObj.Annotations, key) } - unstructuredPVCMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvcObj) + // Recreate the unstructured PVC since the object was edited. 
+ unstructuredPVCMap, err = runtime.DefaultUnstructuredConverter.ToUnstructured(pvcObj) require.NoError(t, err) - unstructuredPVC := &unstructured.Unstructured{Object: unstructuredPVCMap} + unstructuredPVC = &unstructured.Unstructured{Object: unstructuredPVCMap} resetMetadataAndStatus(unstructuredPVC) addRestoreLabels(unstructuredPVC, ctx.restore.Name, ctx.restore.Spec.BackupName) @@ -1576,6 +1596,244 @@ func TestIsPVReady(t *testing.T) { } } +func TestShouldRestore(t *testing.T) { + pv := `apiVersion: v1 +kind: PersistentVolume +metadata: + annotations: + EXPORT_block: "\nEXPORT\n{\n\tExport_Id = 1;\n\tPath = /export/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce;\n\tPseudo + = /export/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce;\n\tAccess_Type = RW;\n\tSquash + = no_root_squash;\n\tSecType = sys;\n\tFilesystem_id = 1.1;\n\tFSAL {\n\t\tName + = VFS;\n\t}\n}\n" + Export_Id: "1" + Project_Id: "0" + Project_block: "" + Provisioner_Id: 5fdf4025-78a5-11e8-9ece-0242ac110004 + kubernetes.io/createdby: nfs-dynamic-provisioner + pv.kubernetes.io/provisioned-by: example.com/nfs + volume.beta.kubernetes.io/mount-options: vers=4.1 + creationTimestamp: 2018-06-25T18:27:35Z + finalizers: + - kubernetes.io/pv-protection + name: pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce + resourceVersion: "2576" + selfLink: /api/v1/persistentvolumes/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce + uid: 6ecd24e4-78a5-11e8-a0d8-e2ad1e9734ce +spec: + accessModes: + - ReadWriteMany + capacity: + storage: 1Mi + claimRef: + apiVersion: v1 + kind: PersistentVolumeClaim + name: nfs + namespace: default + resourceVersion: "2565" + uid: 6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce + nfs: + path: /export/pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce + server: 10.103.235.254 + storageClassName: example-nfs +status: + phase: Bound` + + pvc := `apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + annotations: + control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"5fdf5572-78a5-11e8-9ece-0242ac110004","leaseDurationSeconds":15,"acquireTime":"2018-06-25T18:27:35Z","renewTime":"2018-06-25T18:27:37Z","leaderTransitions":0}' + kubectl.kubernetes.io/last-applied-configuration: | + {"apiVersion":"v1","kind":"PersistentVolumeClaim","metadata":{"annotations":{},"name":"nfs","namespace":"default"},"spec":{"accessModes":["ReadWriteMany"],"resources":{"requests":{"storage":"1Mi"}},"storageClassName":"example-nfs"}} + pv.kubernetes.io/bind-completed: "yes" + pv.kubernetes.io/bound-by-controller: "yes" + volume.beta.kubernetes.io/storage-provisioner: example.com/nfs + creationTimestamp: 2018-06-25T18:27:28Z + finalizers: + - kubernetes.io/pvc-protection + name: nfs + namespace: default + resourceVersion: "2578" + selfLink: /api/v1/namespaces/default/persistentvolumeclaims/nfs + uid: 6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 1Mi + storageClassName: example-nfs + volumeName: pvc-6a74b5af-78a5-11e8-a0d8-e2ad1e9734ce +status: + accessModes: + - ReadWriteMany + capacity: + storage: 1Mi + phase: Bound` + + tests := []struct { + name string + expectNSFound bool + expectPVFound bool + pvPhase string + expectPVCFound bool + expectPVCGet bool + expectPVCDeleting bool + expectNSGet bool + expectNSDeleting bool + nsPhase v1.NamespacePhase + expectedResult bool + }{ + { + name: "pv not found, no associated pvc or namespace", + expectedResult: true, + }, + { + name: "pv found, phase released", + pvPhase: string(v1.VolumeReleased), + expectPVFound: true, + expectedResult: false, + }, + { + name: "pv found, 
has associated pvc and namespace that aren't deleting", + expectPVFound: true, + expectPVCGet: true, + expectNSGet: true, + expectPVCFound: true, + expectedResult: false, + }, + { + name: "pv found, has associated pvc that's deleting, don't look up namespace", + expectPVFound: true, + expectPVCGet: true, + expectPVCFound: true, + expectPVCDeleting: true, + expectedResult: false, + }, + { + name: "pv found, has associated pvc that's not deleting, has associated namespace that's terminating", + expectPVFound: true, + expectPVCGet: true, + expectPVCFound: true, + expectNSGet: true, + expectNSFound: true, + nsPhase: v1.NamespaceTerminating, + expectedResult: false, + }, + { + name: "pv found, has associated pvc that's not deleting, has associated namespace that has deletion timestamp", + expectPVFound: true, + expectPVCGet: true, + expectPVCFound: true, + expectNSGet: true, + expectNSFound: true, + expectNSDeleting: true, + expectedResult: false, + }, + { + name: "pv found, associated pvc not found, namespace not queried", + expectPVFound: true, + expectPVCGet: true, + expectedResult: false, + }, + { + name: "pv found, associated pvc found, namespace not found", + expectPVFound: true, + expectPVCGet: true, + expectPVCFound: true, + expectNSGet: true, + expectedResult: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dynamicFactory := &velerotest.FakeDynamicFactory{} + gv := schema.GroupVersion{Group: "", Version: "v1"} + + pvClient := &velerotest.FakeDynamicClient{} + defer pvClient.AssertExpectations(t) + + pvResource := metav1.APIResource{Name: "persistentvolumes", Namespaced: false} + dynamicFactory.On("ClientForGroupVersionResource", gv, pvResource, "").Return(pvClient, nil) + + pvcClient := &velerotest.FakeDynamicClient{} + defer pvcClient.AssertExpectations(t) + + pvcResource := metav1.APIResource{Name: "persistentvolumeclaims", Namespaced: true} + dynamicFactory.On("ClientForGroupVersionResource", gv, pvcResource, "default").Return(pvcClient, nil) + + obj, _, err := scheme.Codecs.UniversalDecoder(v1.SchemeGroupVersion).Decode([]byte(pv), nil, &unstructured.Unstructured{}) + pvObj := obj.(*unstructured.Unstructured) + require.NoError(t, err) + + obj, _, err = scheme.Codecs.UniversalDecoder(v1.SchemeGroupVersion).Decode([]byte(pvc), nil, &unstructured.Unstructured{}) + pvcObj := obj.(*unstructured.Unstructured) + require.NoError(t, err) + + nsClient := &velerotest.FakeNamespaceClient{} + defer nsClient.AssertExpectations(t) + ns := newTestNamespace(pvcObj.GetNamespace()).Namespace + + // Set up test expectations + if test.pvPhase != "" { + status, err := collections.GetMap(pvObj.UnstructuredContent(), "status") + require.NoError(t, err) + status["phase"] = test.pvPhase + } + + if test.expectPVFound { + pvClient.On("Get", pvObj.GetName(), metav1.GetOptions{}).Return(pvObj, nil) + } else { + pvClient.On("Get", pvObj.GetName(), metav1.GetOptions{}).Return(&unstructured.Unstructured{}, k8serrors.NewNotFound(schema.GroupResource{Resource: "persistentvolumes"}, pvObj.GetName())) + } + + if test.expectPVCDeleting { + pvcObj.SetDeletionTimestamp(&metav1.Time{Time: time.Now()}) + } + + // the pv needs to be found before moving on to look for pvc/namespace + // however, even if the pv is found, we may be testing the PV's phase and not expecting + // the pvc/namespace to be looked up + if test.expectPVCGet { + if test.expectPVCFound { + pvcClient.On("Get", pvcObj.GetName(), metav1.GetOptions{}).Return(pvcObj, nil) + } else { + pvcClient.On("Get", 
pvcObj.GetName(), metav1.GetOptions{}).Return(&unstructured.Unstructured{}, k8serrors.NewNotFound(schema.GroupResource{Resource: "persistentvolumeclaims"}, pvcObj.GetName())) + } + } + + if test.nsPhase != "" { + ns.Status.Phase = test.nsPhase + } + + if test.expectNSDeleting { + ns.SetDeletionTimestamp(&metav1.Time{Time: time.Now()}) + } + + if test.expectNSGet { + if test.expectNSFound { + nsClient.On("Get", pvcObj.GetNamespace(), mock.Anything).Return(ns, nil) + } else { + nsClient.On("Get", pvcObj.GetNamespace(), metav1.GetOptions{}).Return(&v1.Namespace{}, k8serrors.NewNotFound(schema.GroupResource{Resource: "namespaces"}, pvcObj.GetNamespace())) + } + } + + ctx := &context{ + dynamicFactory: dynamicFactory, + log: velerotest.NewLogger(), + namespaceClient: nsClient, + resourceTerminatingTimeout: 1 * time.Millisecond, + } + + result, err := ctx.shouldRestore(pvObj.GetName(), pvClient) + + assert.Equal(t, test.expectedResult, result) + }) + + } +} + type testUnstructured struct { *unstructured.Unstructured } @@ -1784,10 +2042,6 @@ type testNamespace struct { func newTestNamespace(name string) *testNamespace { return &testNamespace{ Namespace: &v1.Namespace{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Namespace", - }, ObjectMeta: metav1.ObjectMeta{ Name: name, }, diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index 0e26437290..95ea61d94e 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -18,11 +18,13 @@ package kube import ( "fmt" + "time" "github.com/pkg/errors" corev1api "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" ) @@ -35,18 +37,58 @@ func NamespaceAndName(objMeta metav1.Object) string { return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName()) } -// EnsureNamespaceExists attempts to create the provided Kubernetes namespace. It returns two values: -// a bool indicating whether or not the namespace was created, and an error if the create failed +// EnsureNamespaceExistsAndIsReady attempts to create the provided Kubernetes namespace. It returns two values: +// a bool indicating whether or not the namespace is ready, and an error if the create failed // for a reason other than that the namespace already exists. Note that in the case where the -// namespace already exists, this function will return (false, nil). -func EnsureNamespaceExists(namespace *corev1api.Namespace, client corev1client.NamespaceInterface) (bool, error) { - if _, err := client.Create(namespace); err == nil { +// namespace already exists and is not ready, this function will return (false, nil). +// If the namespace exists and is marked for deletion, this function will wait up to the timeout for it to fully delete. +func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (bool, error) { + var ready bool + err := wait.PollImmediate(time.Second, timeout, func() (bool, error) { + clusterNS, err := client.Get(namespace.Name, metav1.GetOptions{}) + + if apierrors.IsNotFound(err) { + // Namespace isn't in cluster, we're good to create. + return true, nil + } + + if err != nil { + // Return the err and exit the loop. 
+ return true, err + } + + if clusterNS != nil && (clusterNS.GetDeletionTimestamp() != nil || clusterNS.Status.Phase == corev1api.NamespaceTerminating) { + // Marked for deletion, keep waiting + return false, nil + } + + // clusterNS found, is not nil, and not marked for deletion, therefore we shouldn't create it. + ready = true + return true, nil + }) + + // err will be set if we timed out or encountered issues retrieving the namespace, + if err != nil { + return false, errors.Wrapf(err, "error getting namespace %s", namespace.Name) + } + + // In the case the namespace already exists and isn't marked for deletion, assume it's ready for use. + if ready { return true, nil - } else if apierrors.IsAlreadyExists(err) { - return false, nil - } else { + } + + clusterNS, err := client.Create(namespace) + if apierrors.IsAlreadyExists(err) { + if clusterNS != nil && (clusterNS.GetDeletionTimestamp() != nil || clusterNS.Status.Phase == corev1api.NamespaceTerminating) { + // Somehow created after all our polling and marked for deletion, return an error + return false, errors.Errorf("namespace %s created and marked for termination after timeout", namespace.Name) + } + } else if err != nil { return false, errors.Wrapf(err, "error creating namespace %s", namespace.Name) } + + // The namespace created successfully + return true, nil } // GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods//volumes/, diff --git a/pkg/util/kube/utils_test.go b/pkg/util/kube/utils_test.go index a7761f61f3..c50f7d0b6e 100644 --- a/pkg/util/kube/utils_test.go +++ b/pkg/util/kube/utils_test.go @@ -18,12 +18,105 @@ package kube import ( "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + + velerotest "github.com/heptio/velero/pkg/util/test" ) func TestNamespaceAndName(t *testing.T) { //TODO } -func TestEnsureNamespaceExists(t *testing.T) { - //TODO +func TestEnsureNamespaceExistsAndIsReady(t *testing.T) { + tests := []struct { + name string + expectNSFound bool + nsPhase v1.NamespacePhase + nsDeleting bool + expectCreate bool + alreadyExists bool + expectedResult bool + }{ + { + name: "namespace found, not deleting", + expectNSFound: true, + expectedResult: true, + }, + { + name: "namespace found, terminating phase", + expectNSFound: true, + nsPhase: v1.NamespaceTerminating, + expectedResult: false, + }, + { + name: "namespace found, deletiontimestamp set", + expectNSFound: true, + nsDeleting: true, + expectedResult: false, + }, + { + name: "namespace not found, successfully created", + expectCreate: true, + expectedResult: true, + }, + { + name: "namespace not found initially, create returns already exists error, returned namespace is ready", + alreadyExists: true, + expectedResult: true, + }, + { + name: "namespace not found initially, create returns already exists error, returned namespace is terminating", + alreadyExists: true, + nsPhase: v1.NamespaceTerminating, + expectedResult: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + namespace := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + } + + if test.nsPhase != "" { + namespace.Status.Phase = test.nsPhase + } + + if test.nsDeleting { + namespace.SetDeletionTimestamp(&metav1.Time{Time: time.Now()}) + } + + timeout := time.Millisecond + + nsClient := &velerotest.FakeNamespaceClient{} + defer 
nsClient.AssertExpectations(t) + + if test.expectNSFound { + nsClient.On("Get", "test", metav1.GetOptions{}).Return(namespace, nil) + } else { + nsClient.On("Get", "test", metav1.GetOptions{}).Return(&v1.Namespace{}, k8serrors.NewNotFound(schema.GroupResource{Resource: "namespaces"}, "test")) + } + + if test.alreadyExists { + nsClient.On("Create", namespace).Return(namespace, k8serrors.NewAlreadyExists(schema.GroupResource{Resource: "namespaces"}, "test")) + } + + if test.expectCreate { + nsClient.On("Create", namespace).Return(namespace, nil) + } + + result, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout) + + assert.Equal(t, test.expectedResult, result) + }) + } + } diff --git a/pkg/util/test/fake_namespace.go b/pkg/util/test/fake_namespace.go new file mode 100644 index 0000000000..94e9c3af35 --- /dev/null +++ b/pkg/util/test/fake_namespace.go @@ -0,0 +1,77 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "github.com/stretchr/testify/mock" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +type FakeNamespaceClient struct { + mock.Mock +} + +var _ corev1.NamespaceInterface = &FakeNamespaceClient{} + +func (c *FakeNamespaceClient) List(options metav1.ListOptions) (*v1.NamespaceList, error) { + args := c.Called(options) + return args.Get(0).(*v1.NamespaceList), args.Error(1) +} + +func (c *FakeNamespaceClient) Create(obj *v1.Namespace) (*v1.Namespace, error) { + args := c.Called(obj) + return args.Get(0).(*v1.Namespace), args.Error(1) +} + +func (c *FakeNamespaceClient) Watch(options metav1.ListOptions) (watch.Interface, error) { + args := c.Called(options) + return args.Get(0).(watch.Interface), args.Error(1) +} + +func (c *FakeNamespaceClient) Get(name string, opts metav1.GetOptions) (*v1.Namespace, error) { + args := c.Called(name, opts) + return args.Get(0).(*v1.Namespace), args.Error(1) +} + +func (c *FakeNamespaceClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*v1.Namespace, error) { + args := c.Called(name, pt, data, subresources) + return args.Get(0).(*v1.Namespace), args.Error(1) +} + +func (c *FakeNamespaceClient) Delete(name string, opts *metav1.DeleteOptions) error { + args := c.Called(name, opts) + return args.Error(1) +} + +func (c *FakeNamespaceClient) Finalize(item *v1.Namespace) (*v1.Namespace, error) { + args := c.Called(item) + return args.Get(0).(*v1.Namespace), args.Error(1) +} + +func (c *FakeNamespaceClient) Update(namespace *v1.Namespace) (*v1.Namespace, error) { + args := c.Called(namespace) + return args.Get(0).(*v1.Namespace), args.Error(1) +} + +func (c *FakeNamespaceClient) UpdateStatus(namespace *v1.Namespace) (*v1.Namespace, error) { + args := c.Called(namespace) + return args.Get(0).(*v1.Namespace), args.Error(1) +}
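
Both shouldRestore and EnsureNamespaceExistsAndIsReady in this patch rely on the same polling pattern: query the live object once per second via wait.PollImmediate, treat a NotFound error as "fully deleted, safe to restore", treat a deletion timestamp or Terminating phase as "still going away, keep waiting", and give up once the --terminating-resource-timeout (10 minutes by default) elapses. Below is a minimal, self-contained Go sketch of that pattern; the waitForTermination helper and its isTerminating callback are illustrative stand-ins, not code from the patch.

package example

import (
    "time"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/wait"
)

// waitForTermination mirrors the polling done by shouldRestore and
// EnsureNamespaceExistsAndIsReady. isTerminating should fetch the live object
// and report whether it is marked for deletion; a NotFound error from it means
// the object is fully gone.
func waitForTermination(isTerminating func() (bool, error), timeout time.Duration) (bool, error) {
    var safeToRestore bool
    err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
        terminating, err := isTerminating()
        if apierrors.IsNotFound(err) {
            safeToRestore = true // object fully deleted; the restore may recreate it
            return true, nil
        }
        if err != nil {
            return false, err // unexpected API error: stop polling and surface it
        }
        if terminating {
            return false, nil // still being deleted: poll again after the interval
        }
        return true, nil // object exists and is healthy: leave it in place
    })
    // err is wait.ErrWaitTimeout if the object never finished terminating
    // within the configured timeout.
    return safeToRestore, err
}

In the patch itself the per-poll check is richer: shouldRestore also follows the PV's spec.claimRef to its PVC and namespace and waits on those, while EnsureNamespaceExistsAndIsReady goes on to create the namespace once the existing one is confirmed gone or was never present.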