Wait for PV/namespace to delete before restore
If a PV already exists, wait for it, its associated PVC, and associated
namespace to be deleted before attempting to restore it.

If a namespace already exists, wait for it to be deleted before
attempting to restore it.

Signed-off-by: Nolan Brubaker <brubakern@vmware.com>
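The mechanism behind this change is a poll-until-gone loop: before restoring an object that already exists in the cluster, the restorer polls until the in-cluster copy is actually deleted or a configurable timeout elapses. The sketch below illustrates that pattern only; it is not the commit's code (which uses a dynamic client and also inspects the PV's claimRef PVC and namespace), and `waitForPVGone` is a hypothetical helper name.

```go
package example

import (
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

// waitForPVGone polls until the named PersistentVolume no longer exists,
// returning true when it is safe to restore. Illustrative only; the commit's
// real logic also checks the PV's claimRef PVC and namespace before proceeding.
func waitForPVGone(pvs corev1client.PersistentVolumeInterface, name string, timeout time.Duration) (bool, error) {
	safe := false
	err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		_, err := pvs.Get(name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			safe = true // PV is gone; restoring won't collide with a terminating copy
			return true, nil
		}
		if err != nil {
			return false, err // unexpected API error: stop polling
		}
		return false, nil // PV still present; keep waiting
	})
	return safe, err
}
```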
Nolan Brubaker authored and nrb committed Feb 6, 2019
1 parent 3054a38 commit 890202f
Showing 7 changed files with 677 additions and 100 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/826-nrb
@@ -0,0 +1 @@
Wait for PVs and namespaces to delete before attempting to restore them.
25 changes: 14 additions & 11 deletions pkg/cmd/server/server.go
@@ -74,8 +74,9 @@ const (
// the port where prometheus metrics are exposed
defaultMetricsAddress = ":8085"

defaultBackupSyncPeriod = time.Minute
defaultPodVolumeOperationTimeout = 60 * time.Minute
defaultBackupSyncPeriod = time.Minute
defaultPodVolumeOperationTimeout = 60 * time.Minute
defaultResourceTerminatingTimeout = 10 * time.Minute

// server's client default qps and burst
defaultClientQPS float32 = 20.0
@@ -85,14 +86,14 @@ const (
)

type serverConfig struct {
pluginDir, metricsAddress, defaultBackupLocation string
backupSyncPeriod, podVolumeOperationTimeout time.Duration
restoreResourcePriorities []string
defaultVolumeSnapshotLocations map[string]string
restoreOnly bool
clientQPS float32
clientBurst int
profilerAddress string
pluginDir, metricsAddress, defaultBackupLocation string
backupSyncPeriod, podVolumeOperationTimeout, resourceTerminatingTimeout time.Duration
restoreResourcePriorities []string
defaultVolumeSnapshotLocations map[string]string
restoreOnly bool
clientQPS float32
clientBurst int
profilerAddress string
}

func NewCommand() *cobra.Command {
@@ -110,6 +111,7 @@ func NewCommand() *cobra.Command {
clientQPS: defaultClientQPS,
clientBurst: defaultClientBurst,
profilerAddress: defaultProfilerAddress,
resourceTerminatingTimeout: defaultResourceTerminatingTimeout,
}
)

@@ -168,6 +170,7 @@ func NewCommand() *cobra.Command {
command.Flags().Float32Var(&config.clientQPS, "client-qps", config.clientQPS, "maximum number of requests per second by the server to the Kubernetes API once the burst limit has been reached")
command.Flags().IntVar(&config.clientBurst, "client-burst", config.clientBurst, "maximum number of requests by the server to the Kubernetes API in a short period of time")
command.Flags().StringVar(&config.profilerAddress, "profiler-address", config.profilerAddress, "the address to expose the pprof profiler")
command.Flags().DurationVar(&config.resourceTerminatingTimeout, "terminating-resource-timeout", config.resourceTerminatingTimeout, "how long to wait on persistent volumes and namespaces to terminate during a restore before timing out")

return command
}
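For reference, the new option is bound with pflag's `DurationVar`, so it accepts Go duration syntax such as `10m` or `1h30m`. A minimal, standalone illustration of that binding pattern (not Velero/Ark code; the flag-set name and helper below are made up):

```go
package example

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

// exampleFlag demonstrates the DurationVar binding used for
// --terminating-resource-timeout and how a duration value parses.
func exampleFlag() {
	var resourceTerminatingTimeout time.Duration

	fs := pflag.NewFlagSet("server", pflag.ContinueOnError)
	fs.DurationVar(&resourceTerminatingTimeout, "terminating-resource-timeout", 10*time.Minute,
		"how long to wait on persistent volumes and namespaces to terminate during a restore before timing out")

	// e.g. passing --terminating-resource-timeout=30m on the command line
	_ = fs.Parse([]string{"--terminating-resource-timeout=30m"})
	fmt.Println(resourceTerminatingTimeout) // 30m0s
}
```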
@@ -615,7 +618,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupDeletionController.Run(ctx, 1)
wg.Done()
}()

}

restorer, err := restore.NewKubernetesRestorer(
Expand All @@ -625,6 +627,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.kubeClient.CoreV1().Namespaces(),
s.resticManager,
s.config.podVolumeOperationTimeout,
s.config.resourceTerminatingTimeout,
s.logger,
)
cmd.CheckError(err)
211 changes: 159 additions & 52 deletions pkg/restore/restore.go
@@ -42,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

@@ -83,14 +84,15 @@ type kindString string

// kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster.
type kubernetesRestorer struct {
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
namespaceClient corev1.NamespaceInterface
resticRestorerFactory restic.RestorerFactory
resticTimeout time.Duration
resourcePriorities []string
fileSystem filesystem.Interface
logger logrus.FieldLogger
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
namespaceClient corev1.NamespaceInterface
resticRestorerFactory restic.RestorerFactory
resticTimeout time.Duration
resourceTerminatingTimeout time.Duration
resourcePriorities []string
fileSystem filesystem.Interface
logger logrus.FieldLogger
}

// prioritizeResources returns an ordered, fully-resolved list of resources to restore based on
@@ -159,17 +161,19 @@ func NewKubernetesRestorer(
namespaceClient corev1.NamespaceInterface,
resticRestorerFactory restic.RestorerFactory,
resticTimeout time.Duration,
resourceTerminatingTimeout time.Duration,
logger logrus.FieldLogger,
) (Restorer, error) {
return &kubernetesRestorer{
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
namespaceClient: namespaceClient,
resticRestorerFactory: resticRestorerFactory,
resticTimeout: resticTimeout,
resourcePriorities: resourcePriorities,
logger: logger,
fileSystem: filesystem.NewFileSystem(),
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
namespaceClient: namespaceClient,
resticRestorerFactory: resticRestorerFactory,
resticTimeout: resticTimeout,
resourceTerminatingTimeout: resourceTerminatingTimeout,
resourcePriorities: resourcePriorities,
logger: logger,
fileSystem: filesystem.NewFileSystem(),
}, nil
}

@@ -245,21 +249,22 @@ func (kr *kubernetesRestorer) Restore(
}

restoreCtx := &context{
backup: backup,
backupReader: backupReader,
restore: restore,
prioritizedResources: prioritizedResources,
selector: selector,
log: log,
dynamicFactory: kr.dynamicFactory,
fileSystem: kr.fileSystem,
namespaceClient: kr.namespaceClient,
actions: resolvedActions,
blockStoreGetter: blockStoreGetter,
resticRestorer: resticRestorer,
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
volumeSnapshots: volumeSnapshots,
backup: backup,
backupReader: backupReader,
restore: restore,
prioritizedResources: prioritizedResources,
selector: selector,
log: log,
dynamicFactory: kr.dynamicFactory,
fileSystem: kr.fileSystem,
namespaceClient: kr.namespaceClient,
actions: resolvedActions,
blockStoreGetter: blockStoreGetter,
resticRestorer: resticRestorer,
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
volumeSnapshots: volumeSnapshots,
resourceTerminatingTimeout: kr.resourceTerminatingTimeout,
}

return restoreCtx.execute()
@@ -327,24 +332,25 @@ func resolveActions(actions []ItemAction, helper discovery.Helper) ([]resolvedAc
}

type context struct {
backup *api.Backup
backupReader io.Reader
restore *api.Restore
prioritizedResources []schema.GroupResource
selector labels.Selector
log logrus.FieldLogger
dynamicFactory client.DynamicFactory
fileSystem filesystem.Interface
namespaceClient corev1.NamespaceInterface
actions []resolvedAction
blockStoreGetter BlockStoreGetter
resticRestorer restic.Restorer
globalWaitGroup velerosync.ErrorGroup
resourceWaitGroup sync.WaitGroup
resourceWatches []watch.Interface
pvsToProvision sets.String
pvRestorer PVRestorer
volumeSnapshots []*volume.Snapshot
backup *api.Backup
backupReader io.Reader
restore *api.Restore
prioritizedResources []schema.GroupResource
selector labels.Selector
log logrus.FieldLogger
dynamicFactory client.DynamicFactory
fileSystem filesystem.Interface
namespaceClient corev1.NamespaceInterface
actions []resolvedAction
blockStoreGetter BlockStoreGetter
resticRestorer restic.Restorer
globalWaitGroup velerosync.ErrorGroup
resourceWaitGroup sync.WaitGroup
resourceWatches []watch.Interface
pvsToProvision sets.String
pvRestorer PVRestorer
volumeSnapshots []*volume.Snapshot
resourceTerminatingTimeout time.Duration
}

func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) {
@@ -474,7 +480,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
if !existingNamespaces.Has(mappedNsName) {
logger := ctx.log.WithField("namespace", nsName)
ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil {
if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
addVeleroError(&errs, err)
continue
}
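`EnsureNamespaceExistsAndIsReady` lives in `pkg/util/kube`, whose diff is not shown in this excerpt. Going only by the name and the commit message, here is a plausible sketch of what such a helper needs to do, with an assumed signature and the hypothetical name `ensureNamespaceReady`: wait out a terminating namespace (up to the timeout), then create it if it is absent.

```go
package example

import (
	"time"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

// ensureNamespaceReady is an illustrative stand-in for kube.EnsureNamespaceExistsAndIsReady:
// it waits out a terminating namespace (up to timeout) and (re)creates it when absent.
func ensureNamespaceReady(ns *v1.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) error {
	err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		existing, err := client.Get(ns.Name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil // gone (or never existed): safe to create
		}
		if err != nil {
			return false, err
		}
		if existing.GetDeletionTimestamp() != nil || existing.Status.Phase == v1.NamespaceTerminating {
			return false, nil // still terminating: keep waiting
		}
		return true, nil // already present and healthy: nothing to do
	})
	if err != nil {
		return err
	}
	if _, err := client.Create(ns); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
```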
@@ -578,6 +584,102 @@ func addToResult(r *api.RestoreResult, ns string, e error) {
}
}

func (ctx *context) shouldRestore(name string, pvClient client.Dynamic) (bool, error) {
pvLogger := ctx.log.WithField("pvName", name)

var shouldRestore bool
err := wait.PollImmediate(time.Second, ctx.resourceTerminatingTimeout, func() (bool, error) {
clusterPV, err := pvClient.Get(name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
pvLogger.Debug("PV not found, safe to restore")
// PV not found, can safely exit loop and proceed with restore.
shouldRestore = true
return true, nil
}
if err != nil {
return false, errors.Wrapf(err, "could not retrieve in-cluster copy of PV %s", name)
}
phase, err := collections.GetString(clusterPV.UnstructuredContent(), "status.phase")
if err != nil {
// Break the loop since we couldn't read the phase
return false, errors.Wrapf(err, "error getting phase for in-cluster PV %s", name)
}

if phase == string(v1.VolumeReleased) || clusterPV.GetDeletionTimestamp() != nil {
// PV was found and marked for deletion, or it was released; wait for it to go away.
pvLogger.Debugf("PV found, but marked for deletion, waiting")
return false, nil
}

// Check for the namespace and PVC to see if anything that's referencing the PV is deleting.
// If either the namespace or PVC is in a deleting/terminating state, wait for them to finish before
// trying to restore the PV
// Not doing so may result in the underlying PV disappearing but not restoring due to timing issues,
// then the PVC getting restored and showing as lost.
namespace, err := collections.GetString(clusterPV.UnstructuredContent(), "spec.claimRef.namespace")
if err != nil {
return false, errors.Wrapf(err, "error looking up namespace name for in-cluster PV %s", name)
}
pvcName, err := collections.GetString(clusterPV.UnstructuredContent(), "spec.claimRef.name")
if err != nil {
return false, errors.Wrapf(err, "error looking up persistentvolumeclaim for in-cluster PV %s", name)
}

// Have to create the PVC client here because we don't know what namespace we're using til we get to this point.
// Using a dynamic client since it's easier to mock for testing
pvcResource := metav1.APIResource{Name: "persistentvolumeclaims", Namespaced: true}
pvcClient, err := ctx.dynamicFactory.ClientForGroupVersionResource(schema.GroupVersion{Group: "", Version: "v1"}, pvcResource, namespace)
if err != nil {
return false, errors.Wrapf(err, "error getting pvc client")
}

pvc, err := pvcClient.Get(pvcName, metav1.GetOptions{})

if apierrors.IsNotFound(err) {
pvLogger.Debugf("PVC %s for PV not found, waiting", pvcName)
// PVC wasn't found, but the PV still exists, so continue to wait.
return false, nil
}
if err != nil {
return false, errors.Wrapf(err, "error getting claim %s for persistent volume", pvcName)
}

if pvc != nil && pvc.GetDeletionTimestamp() != nil {
pvLogger.Debugf("PVC for PV marked for deletion, waiting")
// PVC is still deleting, continue to wait.
return false, nil
}

// Check the namespace associated with the claimRef to see if it's deleting/terminating before proceeding
ns, err := ctx.namespaceClient.Get(namespace, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
pvLogger.Debugf("namespace %s for PV not found, waiting", namespace)
// namespace not found but the PV still exists, so continue to wait
return false, nil
}
if err != nil {
return false, errors.Wrapf(err, "error getting namespace %s associated with PV %s", namespace, name)
}

if ns != nil && (ns.GetDeletionTimestamp() != nil || ns.Status.Phase == v1.NamespaceTerminating) {
pvLogger.Debugf("namespace %s associated with PV is deleting, waiting", namespace)
// namespace is in the process of deleting, keep looping
return false, nil
}

// None of the PV, PVC, or NS are marked for deletion, break the loop.
pvLogger.Debug("PV, associated PVC and namespace are not marked for deletion")
return true, nil
})

if err == wait.ErrWaitTimeout {
pvLogger.Debug("timeout reached waiting for persistent volume to delete")
}

return shouldRestore, err
}
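The shape of `shouldRestore` follows `wait.PollImmediate`'s contract: the condition runs immediately and then once per interval; returning `(true, nil)` ends the poll successfully, `(false, nil)` keeps polling, and a non-nil error aborts at once; if the timeout expires first, the call returns `wait.ErrWaitTimeout`. A tiny standalone illustration of that contract (not Velero code):

```go
package example

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// demoPollContract shows the three ConditionFunc outcomes that shouldRestore relies on.
func demoPollContract() {
	attempts := 0
	err := wait.PollImmediate(10*time.Millisecond, 50*time.Millisecond, func() (bool, error) {
		attempts++
		// (false, nil) -> keep polling; (true, nil) -> done; (false, err) -> abort.
		return false, nil
	})
	// The condition never succeeded, so the poll ends with wait.ErrWaitTimeout.
	fmt.Println(attempts > 0, err == wait.ErrWaitTimeout) // true true
}
```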

// restoreResource restores the specified cluster or namespace scoped resource. If namespace is
// empty we are restoring a cluster level resource, otherwise into the specified namespace.
func (ctx *context) restoreResource(resource, namespace, resourcePath string) (api.RestoreResult, api.RestoreResult) {
@@ -696,10 +798,15 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a

// Check if the PV exists in the cluster before attempting to create
// a volume from the snapshot, in order to avoid orphaned volumes (GH #609)
_, err := resourceClient.Get(name, metav1.GetOptions{})
shouldRestoreSnapshot, err := ctx.shouldRestore(name, resourceClient)

if err != nil {
addToResult(&errs, namespace, errors.Wrapf(err, "error waiting on in-cluster persistentvolume %s", name))
continue
}

// PV's existence will be recorded later. Just skip the volume restore logic.
if apierrors.IsNotFound(err) {
if shouldRestoreSnapshot {
// restore the PV from snapshot (if applicable)
updatedObj, err := ctx.pvRestorer.executePVAction(obj)
if err != nil {
(remaining changed files not loaded)