Wait for namespace to terminate before restoring
Fixes #691

Signed-off-by: Nolan Brubaker <nolan@heptio.com>
Nolan Brubaker committed Sep 25, 2018
1 parent 368787c commit c648c00
Showing 4 changed files with 168 additions and 40 deletions.
1 change: 1 addition & 0 deletions docs/cli-reference/ark_server.md
@@ -18,6 +18,7 @@ ark server [flags]
-h, --help help for server
--log-level the level at which to log. Valid values are debug, info, warning, error, fatal, panic. (default info)
--metrics-address string the address to expose prometheus metrics (default ":8085")
--namespace-timeout duration duration to wait on namespace termination before failing a restore (default 10m0s)
--plugin-dir string directory containing Ark plugins (default "/plugins")
--restic-timeout duration how long backups/restores of pod volumes should be allowed to run before timing out (default 1h0m0s)
--restore-only run in a mode where only restores are allowed; backups, schedules, and garbage-collection are all disabled
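For illustration, the new option is passed to the server command like any other flag; the 20m value below is an arbitrary example overriding the 10m0s default, and --log-level is an existing flag shown above:

ark server --namespace-timeout 20m --log-level debug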
80 changes: 48 additions & 32 deletions pkg/cmd/server/server.go
@@ -44,6 +44,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -77,10 +78,10 @@ const (
)

type serverConfig struct {
pluginDir, metricsAddress, defaultBackupLocation string
backupSyncPeriod, podVolumeOperationTimeout time.Duration
restoreResourcePriorities []string
restoreOnly bool
pluginDir, metricsAddress, defaultBackupLocation string
backupSyncPeriod, podVolumeOperationTimeout, namespaceTimeout time.Duration
restoreResourcePriorities []string
restoreOnly bool
}

func NewCommand() *cobra.Command {
@@ -93,6 +94,7 @@ func NewCommand() *cobra.Command {
backupSyncPeriod: defaultBackupSyncPeriod,
podVolumeOperationTimeout: defaultPodVolumeOperationTimeout,
restoreResourcePriorities: defaultRestorePriorities,
namespaceTimeout: defaultNamespaceTimeout,
}
)

@@ -143,6 +145,7 @@ func NewCommand() *cobra.Command {
command.Flags().BoolVar(&config.restoreOnly, "restore-only", config.restoreOnly, "run in a mode where only restores are allowed; backups, schedules, and garbage-collection are all disabled")
command.Flags().StringSliceVar(&config.restoreResourcePriorities, "restore-resource-priorities", config.restoreResourcePriorities, "desired order of resource restores; any resource not in the list will be restored alphabetically after the prioritized resources")
command.Flags().StringVar(&config.defaultBackupLocation, "default-backup-storage-location", config.defaultBackupLocation, "name of the default backup storage location")
command.Flags().DurationVar(&config.namespaceTimeout, "namespace-timeout", config.namespaceTimeout, "duration to wait on namespace termination before failing a restore")

return command
}
@@ -162,25 +165,26 @@ func getServerNamespace(namespaceFlag *pflag.Flag) string {
}

type server struct {
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
arkClient clientset.Interface
blockStore cloudprovider.BlockStore
discoveryClient discovery.DiscoveryInterface
discoveryHelper arkdiscovery.Helper
dynamicClient dynamic.Interface
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry plugin.Registry
pluginManager plugin.Manager
resticManager restic.RepositoryManager
metrics *metrics.ServerMetrics
config serverConfig
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
arkClient clientset.Interface
blockStore cloudprovider.BlockStore
discoveryClient discovery.DiscoveryInterface
discoveryHelper arkdiscovery.Helper
dynamicClient dynamic.Interface
sharedInformerFactory informers.SharedInformerFactory
kubeSharedInformerFactory kubeinformers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry plugin.Registry
pluginManager plugin.Manager
resticManager restic.RepositoryManager
metrics *metrics.ServerMetrics
config serverConfig
}

func newServer(namespace, baseName string, config serverConfig, logger *logrus.Logger) (*server, error) {
@@ -216,14 +220,15 @@ func newServer(namespace, baseName string, config serverConfig, logger *logrus.L
ctx, cancelFunc := context.WithCancel(context.Background())

s := &server{
namespace: namespace,
metricsAddress: config.metricsAddress,
kubeClientConfig: clientConfig,
kubeClient: kubeClient,
arkClient: arkClient,
discoveryClient: arkClient.Discovery(),
dynamicClient: dynamicClient,
sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(arkClient, 0, informers.WithNamespace(namespace)),
namespace: namespace,
metricsAddress: config.metricsAddress,
kubeClientConfig: clientConfig,
kubeClient: kubeClient,
arkClient: arkClient,
discoveryClient: arkClient.Discovery(),
dynamicClient: dynamicClient,
sharedInformerFactory: informers.NewSharedInformerFactoryWithOptions(arkClient, 0, informers.WithNamespace(namespace)),
kubeSharedInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0),
ctx: ctx,
cancelFunc: cancelFunc,
logger: logger,
@@ -417,6 +422,7 @@ func (s *server) loadConfig() (*api.Config, error) {
const (
defaultBackupSyncPeriod = 60 * time.Minute
defaultPodVolumeOperationTimeout = 60 * time.Minute
defaultNamespaceTimeout = 10 * time.Minute
)

// - Namespaces go first because all namespaced resources depend on them.
@@ -659,7 +665,6 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
backupDeletionController.Run(ctx, 1)
wg.Done()
}()

}

restorer, err := restore.NewKubernetesRestorer(
@@ -669,12 +674,22 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
s.config.restoreResourcePriorities,
s.arkClient.ArkV1(),
s.kubeClient.CoreV1().Namespaces(),
s.kubeSharedInformerFactory.Core().V1().Namespaces(),
s.resticManager,
s.config.podVolumeOperationTimeout,
s.config.namespaceTimeout,
s.logger,
)
cmd.CheckError(err)

// Wait for cache sync here so that the event handlers added in NewKubernetesRestorer are ready by the time
// the restore controller is started
// TODO(nrb): We never hit this debug statement if cache.WaitForCacheSync is uncommented.
// Removing that call allows the server to start.
s.logger.Debugln("Waiting for kubesharedinformer to sync")
cache.WaitForCacheSync(ctx.Done(), s.kubeSharedInformerFactory.Core().V1().Namespaces().Informer().HasSynced)
s.logger.Debugln("kubesharedinformer synced")

restoreController := controller.NewRestoreController(
s.namespace,
s.sharedInformerFactory.Ark().V1().Restores(),
@@ -731,6 +746,7 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B

// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())
go s.kubeSharedInformerFactory.Start(ctx.Done())

// TODO(1.0): remove
cache.WaitForCacheSync(ctx.Done(), s.sharedInformerFactory.Ark().V1().Backups().Informer().HasSynced)
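The TODO about cache.WaitForCacheSync above concerns informer start/sync ordering. As a rough standalone sketch of that ordering (not the Ark server's actual wiring; the kubeconfig loading and client construction here are assumptions made for the example), a kube shared informer factory has to be started before WaitForCacheSync is called on its informers, otherwise the wait blocks indefinitely:

package main

import (
	"context"
	"log"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption for the sketch: build the client from the default kubeconfig.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	factory := kubeinformers.NewSharedInformerFactory(client, 0)
	nsInformer := factory.Core().V1().Namespaces().Informer()

	// Start must come before WaitForCacheSync: the informer only lists and
	// watches once the factory is started, so waiting first never returns.
	factory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), nsInformer.HasSynced) {
		log.Fatal("namespace informer never synced")
	}
	log.Println("namespace informer synced")
}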
109 changes: 106 additions & 3 deletions pkg/restore/restore.go
@@ -44,7 +44,9 @@ import (
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"

api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
@@ -78,6 +80,8 @@ type kubernetesRestorer struct {
namespaceClient corev1.NamespaceInterface
resticRestorerFactory restic.RestorerFactory
resticTimeout time.Duration
namespaceTimeout time.Duration
nsWaiter *namespaceWaiter
resourcePriorities []string
fileSystem filesystem.Interface
logger logrus.FieldLogger
@@ -149,23 +153,52 @@ func NewKubernetesRestorer(
resourcePriorities []string,
backupClient arkv1client.BackupsGetter,
namespaceClient corev1.NamespaceInterface,
namespaceInformer corev1informers.NamespaceInformer,
resticRestorerFactory restic.RestorerFactory,
resticTimeout time.Duration,
namespaceTimeout time.Duration,
logger logrus.FieldLogger,
) (Restorer, error) {
return &kubernetesRestorer{
kr := &kubernetesRestorer{
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
blockStore: blockStore,
backupClient: backupClient,
namespaceClient: namespaceClient,
resticRestorerFactory: resticRestorerFactory,
resticTimeout: resticTimeout,
namespaceTimeout: namespaceTimeout,
nsWaiter: &namespaceWaiter{chans: make(map[string]chan *v1.Namespace)},
resourcePriorities: resourcePriorities,
logger: logger,

fileSystem: filesystem.NewFileSystem(),
}, nil
}

namespaceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
// DeletedFinalStateUnknown wraps an object when the informer missed the actual delete event and only caught up to the deletion later.
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}

ns, ok := obj.(*v1.Namespace)
if !ok {
return
}
// See if the NS is being watched for delete events. If not, no-op.
deleteChan, ok := kr.nsWaiter.getDeleteChan(ns.Name)
if !ok {
return
}

// If the name of the namespace is found in the waiter's channel map,
// add the NS object to the channel and wait for it to be received.
deleteChan <- ns
kr.nsWaiter.removeDeleteChan(ns.Name)
},
},
)

return kr, nil
}

// Restore executes a restore into the target Kubernetes cluster according to the restore spec
@@ -237,11 +270,13 @@ func (kr *kubernetesRestorer) Restore(log logrus.FieldLogger, restore *api.Resto
dynamicFactory: kr.dynamicFactory,
fileSystem: kr.fileSystem,
namespaceClient: kr.namespaceClient,
nsWaiter: kr.nsWaiter,
actions: resolvedActions,
blockStore: kr.blockStore,
resticRestorer: resticRestorer,
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
namespaceTimeout: kr.namespaceTimeout,
}

return restoreCtx.execute()
@@ -308,6 +343,52 @@ func resolveActions(actions []ItemAction, helper discovery.Helper) ([]resolvedAc
return resolved, nil
}

// namespaceWaiter encapsulates channels for namespaces waiting to terminate, along with a mutex for the map.
type namespaceWaiter struct {
chansLock sync.Mutex
chans map[string]chan *v1.Namespace
}

func (nsw *namespaceWaiter) getDeleteChan(ns string) (chan *v1.Namespace, bool) {
nsw.chansLock.Lock()
defer nsw.chansLock.Unlock()

ch, ok := nsw.chans[ns]
if !ok {
return nil, ok
}
return ch, ok
}

func (nsw *namespaceWaiter) addDeleteChan(ns string) {
nsw.chansLock.Lock()
defer nsw.chansLock.Unlock()

nsw.chans[ns] = make(chan *v1.Namespace)
}

func (nsw *namespaceWaiter) removeDeleteChan(ns string) {
nsw.chansLock.Lock()
defer nsw.chansLock.Unlock()

delete(nsw.chans, ns)
}

func (nsw *namespaceWaiter) waitForDelete(ns string, timeout time.Duration) (*v1.Namespace, error) {
deleteChan, ok := nsw.getDeleteChan(ns)
if !ok {
return nil, errors.Errorf("Namespace %s not found in list of channels waiting to delete", ns)
}

select {
case <-time.After(timeout):
return nil, errors.Errorf("Namespace %s did not terminate before the alloted timeout.", ns)
case res := <-deleteChan:
return res, nil
}

}

type context struct {
backup *api.Backup
backupReader io.Reader
@@ -318,6 +399,7 @@ type context struct {
dynamicFactory client.DynamicFactory
fileSystem filesystem.Interface
namespaceClient corev1.NamespaceInterface
nsWaiter *namespaceWaiter
actions []resolvedAction
blockStore cloudprovider.BlockStore
resticRestorer restic.Restorer
Expand All @@ -326,6 +408,7 @@ type context struct {
resourceWatches []watch.Interface
pvsToProvision sets.String
pvRestorer PVRestorer
namespaceTimeout time.Duration
}

func (ctx *context) infof(msg string, args ...interface{}) {
@@ -394,6 +477,8 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
for _, resource := range ctx.prioritizedResources {
// we don't want to explicitly restore namespace API objs because we'll handle
// them as a special case prior to restoring anything into them
// They're a special case since a user may choose to restore only pods, but those pods need to
// be placed in a namespace.
if resource == kuberesource.Namespaces {
continue
}
@@ -459,11 +544,27 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
if !existingNamespaces.Has(mappedNsName) {
logger := ctx.logger.WithField("namespace", nsName)
ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil {
nsIsReady, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient)
if err != nil {
addArkError(&errs, err)
continue
}

if !nsIsReady {
logger.Debugln("----> still terminating, trying to add a wait.")
// TODO: double check to see if still Terminating, may hit a race condition
ctx.nsWaiter.addDeleteChan(mappedNsName)
_, err = ctx.nsWaiter.waitForDelete(mappedNsName, ctx.namespaceTimeout)
if err != nil {
logger.Debugln("----> terminate waiter timed out")
addArkError(&errs, err)
return warnings, errs
}
// If we got here, the termination was successful, so go ahead and try to create the NS again.
logger.Debugln("----> terminate waiter found termination, proceeding")
nsIsReady, err = kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient)
if err != nil {
addArkError(&errs, err)
continue
}
}

// keep track of namespaces that we know exist so we don't
// have to try to create them multiple times
existingNamespaces.Insert(mappedNsName)
@@ -696,6 +797,8 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
go func() {
defer ctx.resourceWaitGroup.Done()

// TODO(nrb): This is currently conflicting with the namespace termination waits. If the namespace becomes
// ready to restore, everything in the NS seems to be restored but the PV never gets created and the PVC is "lost"
if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.logger); err != nil {
ctx.logger.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
addArkError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
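The namespaceWaiter added above is a map of per-namespace channels guarded by a mutex: the restore path registers a channel for a terminating namespace and blocks on it with a timeout, while the informer's DeleteFunc sends on that channel once the namespace is actually gone. The following self-contained sketch shows the same hand-off pattern; the type, the names, and the simulated delete are invented for the example and are not Ark's actual code:

package main

import (
	"fmt"
	"sync"
	"time"
)

// waiter maps namespace names to channels that are signalled on deletion.
type waiter struct {
	mu    sync.Mutex
	chans map[string]chan string
}

func newWaiter() *waiter {
	return &waiter{chans: make(map[string]chan string)}
}

// register creates a channel for a namespace we intend to wait on.
func (w *waiter) register(name string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.chans[name] = make(chan string)
}

// notify is what a delete event handler would call; it no-ops for names
// nobody registered. The send blocks until the waiter receives, mirroring
// the DeleteFunc in the diff, which sends before removing the channel.
func (w *waiter) notify(name string) {
	w.mu.Lock()
	ch, ok := w.chans[name]
	w.mu.Unlock()
	if !ok {
		return
	}
	ch <- name
	w.mu.Lock()
	delete(w.chans, name)
	w.mu.Unlock()
}

// wait blocks until notify fires for the namespace or the timeout elapses.
func (w *waiter) wait(name string, timeout time.Duration) error {
	w.mu.Lock()
	ch, ok := w.chans[name]
	w.mu.Unlock()
	if !ok {
		return fmt.Errorf("no waiter registered for namespace %s", name)
	}
	select {
	case <-time.After(timeout):
		return fmt.Errorf("namespace %s did not terminate within %s", name, timeout)
	case <-ch:
		return nil
	}
}

func main() {
	w := newWaiter()
	w.register("demo-ns")

	// Simulate the informer's DeleteFunc firing two seconds later.
	go func() {
		time.Sleep(2 * time.Second)
		w.notify("demo-ns")
	}()

	if err := w.wait("demo-ns", 10*time.Second); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("namespace deleted; safe to recreate")
}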
18 changes: 13 additions & 5 deletions pkg/util/kube/utils.go
@@ -36,15 +36,23 @@ func NamespaceAndName(objMeta metav1.Object) string {
return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName())
}

// EnsureNamespaceExists attempts to create the provided Kubernetes namespace. It returns two values:
// a bool indicating whether or not the namespace was created, and an error if the create failed
// EnsureNamespaceExistsAndIsReady attempts to create the provided Kubernetes namespace. It returns two values:
// a bool indicating whether or not the namespace is ready, and an error if the create failed
// for a reason other than that the namespace already exists. Note that in the case where the
// namespace already exists, this function will return (false, nil).
func EnsureNamespaceExists(namespace *corev1api.Namespace, client corev1client.NamespaceInterface) (bool, error) {
// namespace already exists and is not ready, this function will return (false, nil).
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface) (bool, error) {
if _, err := client.Create(namespace); err == nil {
return true, nil
} else if apierrors.IsAlreadyExists(err) {
return false, nil
// Do a follow-up Get because Create returns an uninitialized namespace object, not the one that already exists.
ns, err := client.Get(namespace.Name, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
}
if ns.Status.Phase == corev1api.NamespaceTerminating {
return false, nil
}
return true, nil
} else {
return false, errors.Wrapf(err, "error creating namespace %s", namespace.Name)
}
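A hedged test sketch for the renamed helper, using client-go's fake clientset. The import path for the kube package is assumed from the repository layout shown in this diff, and the test itself is not part of the commit:

package kube_test

import (
	"testing"

	corev1api "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/heptio/ark/pkg/util/kube"
)

func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
	terminating := &corev1api.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: "ns-terminating"},
		Status:     corev1api.NamespaceStatus{Phase: corev1api.NamespaceTerminating},
	}
	client := fake.NewSimpleClientset(terminating).CoreV1().Namespaces()

	// A namespace that doesn't exist yet should be created and reported ready.
	ready, err := kube.EnsureNamespaceExistsAndIsReady(
		&corev1api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns-new"}}, client)
	if err != nil || !ready {
		t.Errorf("expected new namespace to be ready, got ready=%v err=%v", ready, err)
	}

	// An existing namespace in the Terminating phase should be reported not ready.
	ready, err = kube.EnsureNamespaceExistsAndIsReady(terminating, client)
	if err != nil || ready {
		t.Errorf("expected terminating namespace to be not ready, got ready=%v err=%v", ready, err)
	}
}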
