diff --git a/changelogs/unreleased/7451-qiuming-best b/changelogs/unreleased/7451-qiuming-best new file mode 100644 index 0000000000..c57823a1c6 --- /dev/null +++ b/changelogs/unreleased/7451-qiuming-best @@ -0,0 +1 @@ +Add repository maintenance job diff --git a/pkg/cmd/cli/install/install.go b/pkg/cmd/cli/install/install.go index 1c6772887c..48629252d9 100644 --- a/pkg/cmd/cli/install/install.go +++ b/pkg/cmd/cli/install/install.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/pkg/errors" @@ -84,6 +85,7 @@ type Options struct { DefaultSnapshotMoveData bool DisableInformerCache bool ScheduleSkipImmediately bool + MaintenanceCfg repository.MaintenanceConfig } // BindFlags adds command line values to the options struct. @@ -128,6 +130,11 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) { flags.BoolVar(&o.DefaultSnapshotMoveData, "default-snapshot-move-data", o.DefaultSnapshotMoveData, "Bool flag to configure Velero server to move data by default for all snapshots supporting data movement. Optional.") flags.BoolVar(&o.DisableInformerCache, "disable-informer-cache", o.DisableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable). Optional.") flags.BoolVar(&o.ScheduleSkipImmediately, "schedule-skip-immediately", o.ScheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).") + flags.IntVar(&o.MaintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", o.MaintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep each repository. Optional.") + flags.StringVar(&o.MaintenanceCfg.CPURequest, "maintenance-job-cpu-request", o.MaintenanceCfg.CPURequest, "CPU request for maintenance jobs. Default is no limit.") + flags.StringVar(&o.MaintenanceCfg.MemRequest, "maintenance-job-mem-request", o.MaintenanceCfg.MemRequest, "Memory request for maintenance jobs. Default is no limit.") + flags.StringVar(&o.MaintenanceCfg.CPULimit, "maintenance-job-cpu-limit", o.MaintenanceCfg.CPULimit, "CPU limit for maintenance jobs. Default is no limit.") + flags.StringVar(&o.MaintenanceCfg.MemLimit, "maintenance-job-mem-limit", o.MaintenanceCfg.MemLimit, "Memory limit for maintenance jobs. Default is no limit.") } // NewInstallOptions instantiates a new, default InstallOptions struct. 
@@ -157,6 +164,9 @@ func NewInstallOptions() *Options { DefaultSnapshotMoveData: false, DisableInformerCache: false, ScheduleSkipImmediately: false, + MaintenanceCfg: repository.MaintenanceConfig{ + KeepLatestMaitenanceJobs: repository.DefaultKeepLatestMaitenanceJobs, + }, } } @@ -224,6 +234,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) { DefaultSnapshotMoveData: o.DefaultSnapshotMoveData, DisableInformerCache: o.DisableInformerCache, ScheduleSkipImmediately: o.ScheduleSkipImmediately, + MaintenanceCfg: o.MaintenanceCfg, }, nil } diff --git a/pkg/cmd/cli/repomantenance/maintenance.go b/pkg/cmd/cli/repomantenance/maintenance.go new file mode 100644 index 0000000000..4e69c65f4f --- /dev/null +++ b/pkg/cmd/cli/repomantenance/maintenance.go @@ -0,0 +1,179 @@ +package repomantenance + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerocli "github.com/vmware-tanzu/velero/pkg/client" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/repository/provider" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/logging" +) + +type Options struct { + RepoName string + BackupStorageLocation string + RepoType string + LogLevelFlag *logging.LevelFlag + FormatFlag *logging.FormatFlag +} + +func (o *Options) BindFlags(flags *pflag.FlagSet) { + flags.StringVar(&o.RepoName, "repo-name", "", "namespace of the pod/volume that the snapshot is for") + flags.StringVar(&o.BackupStorageLocation, "backup-storage-location", "", "backup's storage location name") + flags.StringVar(&o.RepoType, "repo-type", velerov1api.BackupRepositoryTypeKopia, "type of the repository where the snapshot is stored") + flags.Var(o.LogLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(o.LogLevelFlag.AllowedValues(), ", "))) + flags.Var(o.FormatFlag, "log-format", fmt.Sprintf("The format for log output. 
Valid values are %s.", strings.Join(o.FormatFlag.AllowedValues(), ", "))) +} + +func NewCommand(f velerocli.Factory) *cobra.Command { + o := &Options{ + LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel), + FormatFlag: logging.NewFormatFlag(), + } + cmd := &cobra.Command{ + Use: "repo-maintenance", + Hidden: true, + Short: "VELERO INTERNAL COMMAND ONLY - not intended to be run directly by users", + Run: func(c *cobra.Command, args []string) { + o.Run(f) + }, + } + + o.BindFlags(cmd.Flags()) + return cmd +} + +func (o *Options) Run(f velerocli.Factory) { + logger := logging.DefaultLogger(o.LogLevelFlag.Parse(), o.FormatFlag.Parse()) + logger.SetOutput(os.Stdout) + + pruneError := o.runRepoPrune(f, f.Namespace(), logger) + defer func() { + if pruneError != nil { + os.Exit(1) + } + }() + + if pruneError != nil { + logger.WithError(pruneError).Error("An error occurred when running repo prune") + terminationLogFile, err := os.Create("/dev/termination-log") + if err != nil { + logger.WithError(err).Error("Failed to create termination log file") + return + } + defer terminationLogFile.Close() + + if _, errWrite := terminationLogFile.WriteString(fmt.Sprintf("An error occurred: %v", pruneError)); errWrite != nil { + logger.WithError(errWrite).Error("Failed to write error to termination log file") + } + } +} + +func (o *Options) initClient(f velerocli.Factory) (client.Client, error) { + scheme := runtime.NewScheme() + err := velerov1api.AddToScheme(scheme) + if err != nil { + return nil, errors.Wrap(err, "failed to add velero scheme") + } + + err = v1.AddToScheme(scheme) + if err != nil { + return nil, errors.Wrap(err, "failed to add api core scheme") + } + + config, err := f.ClientConfig() + if err != nil { + return nil, errors.Wrap(err, "failed to get client config") + } + + cli, err := client.New(config, client.Options{ + Scheme: scheme, + }) + if err != nil { + return nil, errors.Wrap(err, "failed to create client") + } + + return cli, nil +} + +func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error { + cli, err := o.initClient(f) + if err != nil { + return err + } + + credentialFileStore, err := credentials.NewNamespacedFileStore( + cli, + namespace, + "/tmp/credentials", + filesystem.NewFileSystem(), + ) + if err != nil { + return errors.Wrap(err, "failed to create namespaced file store") + } + + credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, namespace) + if err != nil { + return errors.Wrap(err, "failed to create namespaced secret store") + } + + var repoProvider provider.Provider + if o.RepoType == velerov1api.BackupRepositoryTypeRestic { + repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger) + } else { + repoProvider = provider.NewUnifiedRepoProvider( + credentials.CredentialGetter{ + FromFile: credentialFileStore, + FromSecret: credentialSecretStore, + }, o.RepoType, cli, logger) + } + + // backupRepository + repo, err := repository.GetBackupRepository(context.Background(), cli, namespace, + repository.BackupRepositoryKey{ + VolumeNamespace: o.RepoName, + BackupLocation: o.BackupStorageLocation, + RepositoryType: o.RepoType, + }, true) + + if err != nil { + return errors.Wrap(err, "failed to get backup repository") + } + + // bsl + bsl := &velerov1api.BackupStorageLocation{} + err = cli.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: repo.Spec.BackupStorageLocation}, bsl) + if err != nil { + return errors.Wrap(err, "failed to get backup storage 
location") + } + + para := provider.RepoParam{ + BackupRepo: repo, + BackupLocation: bsl, + } + + err = repoProvider.BoostRepoConnect(context.Background(), para) + if err != nil { + return errors.Wrap(err, "failed to boost repo connect") + } + + err = repoProvider.PruneRepo(context.Background(), para) + if err != nil { + return errors.Wrap(err, "failed to prune repo") + } + return nil +} diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 6ca3d59498..420faa5448 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -32,6 +32,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + appsv1 "k8s.io/api/apps/v1" + batchv1api "k8s.io/api/batch/v1" corev1api "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -136,6 +138,7 @@ type serverConfig struct { defaultSnapshotMoveData bool disableInformerCache bool scheduleSkipImmediately bool + maintenanceCfg repository.MaintenanceConfig } func NewCommand(f client.Factory) *cobra.Command { @@ -167,6 +170,9 @@ func NewCommand(f client.Factory) *cobra.Command { defaultSnapshotMoveData: false, disableInformerCache: defaultDisableInformerCache, scheduleSkipImmediately: false, + maintenanceCfg: repository.MaintenanceConfig{ + KeepLatestMaitenanceJobs: repository.DefaultKeepLatestMaitenanceJobs, + }, } ) @@ -240,7 +246,15 @@ func NewCommand(f client.Factory) *cobra.Command { command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.") command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable).") command.Flags().BoolVar(&config.scheduleSkipImmediately, "schedule-skip-immediately", config.scheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).") - + command.Flags().IntVar(&config.maintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", config.maintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep each repository. Optional.") + command.Flags().StringVar(&config.maintenanceCfg.CPURequest, "maintenance-job-cpu-request", config.maintenanceCfg.CPURequest, "CPU request for maintenance job. Default is no limit.") + command.Flags().StringVar(&config.maintenanceCfg.MemRequest, "maintenance-job-mem-request", config.maintenanceCfg.MemRequest, "Memory request for maintenance job. Default is no limit.") + command.Flags().StringVar(&config.maintenanceCfg.CPULimit, "maintenance-job-cpu-limit", config.maintenanceCfg.CPULimit, "CPU limit for maintenance job. Default is no limit.") + command.Flags().StringVar(&config.maintenanceCfg.MemLimit, "maintenance-job-mem-limit", config.maintenanceCfg.MemLimit, "Memory limit for maintenance job. 
Default is no limit.") + + // maintenance job log setting inherited from velero server + config.maintenanceCfg.FormatFlag = config.formatFlag + config.maintenanceCfg.LogLevelFlag = logLevelFlag return command } @@ -347,6 +361,14 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s cancelFunc() return nil, err } + if err := batchv1api.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, err + } + if err := appsv1.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, err + } ctrl.SetLogger(logrusr.New(logger)) @@ -647,7 +669,7 @@ func (s *server) initRepoManager() error { s.repoLocker = repository.NewRepoLocker() s.repoEnsurer = repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger) + s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.config.maintenanceCfg, s.logger) return nil } diff --git a/pkg/cmd/velero/velero.go b/pkg/cmd/velero/velero.go index b249416e17..972f5bb73c 100644 --- a/pkg/cmd/velero/velero.go +++ b/pkg/cmd/velero/velero.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog/v2" "github.com/vmware-tanzu/velero/pkg/cmd/cli/debug" + "github.com/vmware-tanzu/velero/pkg/cmd/cli/repomantenance" "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/cmd/cli/backup" @@ -122,6 +123,7 @@ operations can also be performed as 'velero backup get' and 'velero schedule cre backuplocation.NewCommand(f), snapshotlocation.NewCommand(f), debug.NewCommand(f), + repomantenance.NewCommand(f), ) // init and add the klog flags diff --git a/pkg/controller/backup_repository_controller.go b/pkg/controller/backup_repository_controller.go index 108c6c470c..e4d68d9998 100644 --- a/pkg/controller/backup_repository_controller.go +++ b/pkg/controller/backup_repository_controller.go @@ -189,10 +189,16 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request) } switch backupRepo.Status.Phase { + case velerov1api.BackupRepositoryPhaseNotReady: + ready, err := r.checkNotReadyRepo(ctx, backupRepo, log) + if err != nil { + return ctrl.Result{}, err + } else if !ready { + return ctrl.Result{}, nil + } + fallthrough case velerov1api.BackupRepositoryPhaseReady: return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log) - case velerov1api.BackupRepositoryPhaseNotReady: - return ctrl.Result{}, r.checkNotReadyRepo(ctx, backupRepo, log) } return ctrl.Result{}, nil @@ -277,8 +283,6 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repository.Manag } func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { - log.Debug("backupRepositoryController.runMaintenanceIfDue") - now := r.clock.Now() if !dueForMaintenance(req, now) { @@ -291,6 +295,7 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel // prune failures should be displayed in the `.status.message` field but // should not cause the repo to move to `NotReady`. 
log.Debug("Pruning repo") + if err := r.repositoryManager.PruneRepo(req); err != nil { log.WithError(err).Warn("error pruning repository") return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { @@ -299,6 +304,7 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel } return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { + rr.Status.Message = "" rr.Status.LastMaintenanceTime = &metav1.Time{Time: now} }) } @@ -307,28 +313,32 @@ func dueForMaintenance(req *velerov1api.BackupRepository, now time.Time) bool { return req.Status.LastMaintenanceTime == nil || req.Status.LastMaintenanceTime.Add(req.Spec.MaintenanceFrequency.Duration).Before(now) } -func (r *BackupRepoReconciler) checkNotReadyRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { +func (r *BackupRepoReconciler) checkNotReadyRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) (bool, error) { log.Info("Checking backup repository for readiness") repoIdentifier, err := r.getIdentiferByBSL(ctx, req) if err != nil { - return r.patchBackupRepository(ctx, req, repoNotReady(err.Error())) + return false, r.patchBackupRepository(ctx, req, repoNotReady(err.Error())) } if repoIdentifier != req.Spec.ResticIdentifier { if err := r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { rr.Spec.ResticIdentifier = repoIdentifier }); err != nil { - return err + return false, err } } // we need to ensure it (first check, if check fails, attempt to init) // because we don't know if it's been successfully initialized yet. if err := ensureRepo(req, r.repositoryManager); err != nil { - return r.patchBackupRepository(ctx, req, repoNotReady(err.Error())) + return false, r.patchBackupRepository(ctx, req, repoNotReady(err.Error())) + } + err = r.patchBackupRepository(ctx, req, repoReady()) + if err != nil { + return false, err } - return r.patchBackupRepository(ctx, req, repoReady()) + return true, nil } func repoNotReady(msg string) func(*velerov1api.BackupRepository) { diff --git a/pkg/controller/backup_repository_controller_test.go b/pkg/controller/backup_repository_controller_test.go index 22c64b45e8..a80a2da2b3 100644 --- a/pkg/controller/backup_repository_controller_test.go +++ b/pkg/controller/backup_repository_controller_test.go @@ -93,7 +93,7 @@ func TestCheckNotReadyRepo(t *testing.T) { err = reconciler.Client.Create(context.TODO(), locations) assert.NoError(t, err) - err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger) + _, err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger) assert.NoError(t, err) assert.Equal(t, rr.Status.Phase, velerov1api.BackupRepositoryPhaseReady) assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier) diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 60a0c8347b..b7372d0a82 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -184,7 +184,7 @@ func (fs *fileSystemBR) Cancel() { func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error { if repositoryType == velerov1api.BackupRepositoryTypeKopia { - if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { + if err := 
repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.client, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { return err } } else { diff --git a/pkg/install/deployment.go b/pkg/install/deployment.go index 383b40856a..a8c2a131ab 100644 --- a/pkg/install/deployment.go +++ b/pkg/install/deployment.go @@ -27,6 +27,7 @@ import ( "github.com/vmware-tanzu/velero/internal/velero" "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/repository" ) type podTemplateOption func(*podTemplateConfig) @@ -51,6 +52,7 @@ type podTemplateConfig struct { privilegedNodeAgent bool disableInformerCache bool scheduleSkipImmediately bool + maintenanceConfig repository.MaintenanceConfig } func WithImage(image string) podTemplateOption { @@ -177,6 +179,12 @@ func WithScheduleSkipImmediately(b bool) podTemplateOption { } } +func WithMaintenanceConfig(config repository.MaintenanceConfig) podTemplateOption { + return func(c *podTemplateConfig) { + c.maintenanceConfig = config + } +} + func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment { // TODO: Add support for server args c := &podTemplateConfig{ @@ -234,6 +242,26 @@ func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment args = append(args, fmt.Sprintf("--fs-backup-timeout=%v", c.podVolumeOperationTimeout)) } + if c.maintenanceConfig.KeepLatestMaitenanceJobs > 0 { + args = append(args, fmt.Sprintf("--keep-latest-maintenance-jobs=%d", c.maintenanceConfig.KeepLatestMaitenanceJobs)) + } + + if c.maintenanceConfig.CPULimit != "" { + args = append(args, fmt.Sprintf("--maintenance-job-cpu-limit=%s", c.maintenanceConfig.CPULimit)) + } + + if c.maintenanceConfig.CPURequest != "" { + args = append(args, fmt.Sprintf("--maintenance-job-cpu-request=%s", c.maintenanceConfig.CPURequest)) + } + + if c.maintenanceConfig.MemLimit != "" { + args = append(args, fmt.Sprintf("--maintenance-job-mem-limit=%s", c.maintenanceConfig.MemLimit)) + } + + if c.maintenanceConfig.MemRequest != "" { + args = append(args, fmt.Sprintf("--maintenance-job-mem-request=%s", c.maintenanceConfig.MemRequest)) + } + deployment := &appsv1.Deployment{ ObjectMeta: objectMeta(namespace, "velero"), TypeMeta: metav1.TypeMeta{ diff --git a/pkg/install/deployment_test.go b/pkg/install/deployment_test.go index 8e9649737d..fcbd4fc7fd 100644 --- a/pkg/install/deployment_test.go +++ b/pkg/install/deployment_test.go @@ -22,6 +22,8 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + + "github.com/vmware-tanzu/velero/pkg/repository" ) func TestDeployment(t *testing.T) { @@ -68,4 +70,18 @@ func TestDeployment(t *testing.T) { deploy = Deployment("velero", WithDisableInformerCache()) assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 2) assert.Equal(t, "--disable-informer-cache=true", deploy.Spec.Template.Spec.Containers[0].Args[1]) + + deploy = Deployment("velero", WithMaintenanceConfig(repository.MaintenanceConfig{ + KeepLatestMaitenanceJobs: 3, + CPURequest: "100m", + MemRequest: "256Mi", + CPULimit: "200m", + MemLimit: "512Mi", + })) + assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 6) + assert.Equal(t, "--keep-latest-maintenance-jobs=3", deploy.Spec.Template.Spec.Containers[0].Args[1]) + assert.Equal(t, "--maintenance-job-cpu-limit=200m", deploy.Spec.Template.Spec.Containers[0].Args[2]) + assert.Equal(t, "--maintenance-job-cpu-request=100m", deploy.Spec.Template.Spec.Containers[0].Args[3]) + 
assert.Equal(t, "--maintenance-job-mem-limit=512Mi", deploy.Spec.Template.Spec.Containers[0].Args[4]) + assert.Equal(t, "--maintenance-job-mem-request=256Mi", deploy.Spec.Template.Spec.Containers[0].Args[5]) } diff --git a/pkg/install/resources.go b/pkg/install/resources.go index 4c721200ef..171fc2ece4 100644 --- a/pkg/install/resources.go +++ b/pkg/install/resources.go @@ -30,6 +30,8 @@ import ( v1crds "github.com/vmware-tanzu/velero/config/crd/v1/crds" v2alpha1crds "github.com/vmware-tanzu/velero/config/crd/v2alpha1/crds" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/util/logging" ) const ( @@ -261,6 +263,9 @@ type VeleroOptions struct { DefaultSnapshotMoveData bool DisableInformerCache bool ScheduleSkipImmediately bool + FormatFlag *logging.FormatFlag + LogLevelFlag *logging.LevelFlag + MaintenanceCfg repository.MaintenanceConfig } func AllCRDs() *unstructured.UnstructuredList { @@ -345,6 +350,7 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { WithPodVolumeOperationTimeout(o.PodVolumeOperationTimeout), WithUploaderType(o.UploaderType), WithScheduleSkipImmediately(o.ScheduleSkipImmediately), + WithMaintenanceConfig(o.MaintenanceCfg), } if len(o.Features) > 0 { diff --git a/pkg/repository/maintenance.go b/pkg/repository/maintenance.go new file mode 100644 index 0000000000..7a298f4690 --- /dev/null +++ b/pkg/repository/maintenance.go @@ -0,0 +1,258 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package repository + +import ( + "context" + "fmt" + "sort" + "time" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/pkg/errors" + + "github.com/vmware-tanzu/velero/pkg/repository/provider" + "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" + veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero" +) + +const RepositoryNameLabel = "velero.io/repo-name" +const DefaultKeepLatestMaitenanceJobs = 3 +const DefaultMaintenanceJobCPURequest = "0" +const DefaultMaintenanceJobCPULimit = "0" +const DefaultMaintenanceJobMemRequest = "0" +const DefaultMaintenanceJobMemLimit = "0" + +// MaintenanceConfig is the configuration for the repo maintenance job +type MaintenanceConfig struct { + KeepLatestMaitenanceJobs int + CPURequest string + MemRequest string + CPULimit string + MemLimit string + LogLevelFlag *logging.LevelFlag + FormatFlag *logging.FormatFlag +} + +func generateJobName(repo string) string { + millisecond := time.Now().UTC().UnixMilli() // millisecond + + jobName := fmt.Sprintf("%s-maintain-job-%d", repo, millisecond) + if len(jobName) > 63 { // k8s job name length limit + jobName = fmt.Sprintf("repo-maintain-job-%d", millisecond) + } + + return jobName +} + +func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli client.Client, namespace string) (*batchv1.Job, error) { + // Get the Velero server deployment + deployment := &appsv1.Deployment{} + err := cli.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: namespace}, deployment) + if err != nil { + return nil, err + } + + // Get the environment variables from the Velero server deployment + envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment) + + // Get the volume mounts from the Velero server deployment + volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment) + + // Get the volumes from the Velero server deployment + volumes := veleroutil.GetVolumesFromVeleroServer(deployment) + + // Get the service account from the Velero server deployment + serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment) + + // Get image + image := veleroutil.GetVeleroServerImage(deployment) + + // Set resource limits and requests + if m.CPURequest == "" { + m.CPURequest = DefaultMaintenanceJobCPURequest + } + if m.MemRequest == "" { + m.MemRequest = DefaultMaintenanceJobMemRequest + } + if m.CPULimit == "" { + m.CPULimit = DefaultMaintenanceJobCPULimit + } + if m.MemLimit == "" { + m.MemLimit = DefaultMaintenanceJobMemLimit + } + + resources, err := kube.ParseResourceRequirements(m.CPURequest, m.MemRequest, m.CPULimit, m.MemLimit) + if err != nil { + return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job") + } + + // Set arguments + args := []string{"repo-maintenance"} + args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace)) + args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType)) + args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name)) + args = append(args, fmt.Sprintf("--log-level=%s", m.LogLevelFlag.String())) + args = append(args, fmt.Sprintf("--log-format=%s", m.FormatFlag.String())) + + // build the maintenance 
job + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: generateJobName(param.BackupRepo.Name), + Namespace: param.BackupRepo.Namespace, + Labels: map[string]string{ + RepositoryNameLabel: param.BackupRepo.Name, + }, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: new(int32), // Never retry + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "velero-repo-maintenance-pod", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "velero-repo-maintenance-container", + Image: image, + Command: []string{ + "/velero", + }, + Args: args, + ImagePullPolicy: v1.PullIfNotPresent, + Env: envVars, + VolumeMounts: volumeMounts, + Resources: resources, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + Volumes: volumes, + ServiceAccountName: serviceAccount, + }, + }, + }, + } + + if affinity := veleroutil.GetAffinityFromVeleroServer(deployment); affinity != nil { + job.Spec.Template.Spec.Affinity = affinity + } + + if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil { + job.Spec.Template.Spec.Tolerations = tolerations + } + + if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil { + job.Spec.Template.Spec.NodeSelector = nodeSelector + } + + if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 { + job.Spec.Template.Labels = labels + } + + if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 { + job.Spec.Template.Annotations = annotations + } + + return job, nil +} + +// deleteOldMaintenanceJobs deletes old maintenance jobs and keeps the latest N jobs +func deleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error { + // Get the maintenance job list by label + jobList := &batchv1.JobList{} + err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) + if err != nil { + return err + } + + // Delete old maintenance jobs + if len(jobList.Items) > keep { + sort.Slice(jobList.Items, func(i, j int) bool { + return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp) + }) + for i := 0; i < len(jobList.Items)-keep; i++ { + err = cli.Delete(context.TODO(), &jobList.Items[i], client.PropagationPolicy(metav1.DeletePropagationBackground)) + if err != nil { + return err + } + } + } + + return nil +} + +func waitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error { + return wait.PollUntilContextCancel(ctx, 1, true, func(ctx context.Context) (bool, error) { + err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + + if job.Status.Succeeded > 0 { + return true, nil + } + + if job.Status.Failed > 0 { + return true, fmt.Errorf("maintenance job %s/%s failed", job.Namespace, job.Name) + } + return false, nil + }) +} + +func getMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) { + // Get the maintenance job related pod by label selector + podList := &v1.PodList{} + err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name})) + if err != nil { + return "", err + } + + if len(podList.Items) == 0 { + return "", fmt.Errorf("no pod found for job %s", job.Name) + } + + // we only have one maintenance pod for the job + return podList.Items[0].Status.ContainerStatuses[0].State.Terminated.Message, nil +} + +func 
GetLatestMaintenanceJob(cli client.Client, repo string) (*batchv1.Job, error) { + // Get the maintenance job list by label + jobList := &batchv1.JobList{} + err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) + if err != nil { + return nil, err + } + + if len(jobList.Items) == 0 { + return nil, nil + } + + // Get the latest maintenance job + sort.Slice(jobList.Items, func(i, j int) bool { + return jobList.Items[i].CreationTimestamp.Time.After(jobList.Items[j].CreationTimestamp.Time) + }) + return &jobList.Items[0], nil +} diff --git a/pkg/repository/maintenance_test.go b/pkg/repository/maintenance_test.go new file mode 100644 index 0000000000..c27fb92875 --- /dev/null +++ b/pkg/repository/maintenance_test.go @@ -0,0 +1,408 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package repository + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/repository/provider" + "github.com/vmware-tanzu/velero/pkg/util/logging" +) + +func TestGenerateJobName1(t *testing.T) { + testCases := []struct { + repo string + expectedStart string + }{ + { + repo: "example", + expectedStart: "example-maintain-job-", + }, + { + repo: strings.Repeat("a", 60), + expectedStart: "repo-maintain-job-", + }, + } + + for _, tc := range testCases { + t.Run(tc.repo, func(t *testing.T) { + // Call the function to test + jobName := generateJobName(tc.repo) + + // Check if the generated job name starts with the expected prefix + if !strings.HasPrefix(jobName, tc.expectedStart) { + t.Errorf("generated job name does not start with expected prefix") + } + + // Check if the length of the generated job name exceeds the Kubernetes limit + if len(jobName) > 63 { + t.Errorf("generated job name exceeds Kubernetes limit") + } + }) + } +} +func TestDeleteOldMaintenanceJobs(t *testing.T) { + // Set up test repo and keep value + repo := "test-repo" + keep := 2 + + // Create some maintenance jobs for testing + var objs []client.Object + // Create a newer job + newerJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: "default", + Labels: map[string]string{RepositoryNameLabel: repo}, + }, + Spec: batchv1.JobSpec{}, + } + objs = append(objs, newerJob) + // Create older jobs + for i := 2; i <= 3; i++ { + olderJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("job%d", i), + Namespace: "default", + Labels: map[string]string{RepositoryNameLabel: repo}, + CreationTimestamp: metav1.Time{ + Time: 
metav1.Now().Add(time.Duration(-24*i) * time.Hour), + }, + }, + Spec: batchv1.JobSpec{}, + } + objs = append(objs, olderJob) + } + // Create a fake Kubernetes client + scheme := runtime.NewScheme() + _ = batchv1.AddToScheme(scheme) + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + + // Call the function + err := deleteOldMaintenanceJobs(cli, repo, keep) + assert.NoError(t, err) + + // Get the remaining jobs + jobList := &batchv1.JobList{} + err = cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) + assert.NoError(t, err) + + // We expect the number of jobs to be equal to 'keep' + assert.Equal(t, keep, len(jobList.Items)) + + // We expect that the oldest jobs were deleted + // Job3 should not be present in the remaining list + assert.NotContains(t, jobList.Items, objs[2]) + + // Job2 should also not be present in the remaining list + assert.NotContains(t, jobList.Items, objs[1]) +} + +func TestWaitForJobComplete(t *testing.T) { + // Set up test job + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{}, + } + + // Define test cases + tests := []struct { + description string // Test case description + jobStatus batchv1.JobStatus // Job status to set for the test + expectError bool // Whether an error is expected + }{ + { + description: "Job Succeeded", + jobStatus: batchv1.JobStatus{ + Succeeded: 1, + }, + expectError: false, + }, + { + description: "Job Failed", + jobStatus: batchv1.JobStatus{ + Failed: 1, + }, + expectError: true, + }, + } + + // Run tests + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + // Set the job status + job.Status = tc.jobStatus + // Create a fake Kubernetes client + cli := fake.NewClientBuilder().WithObjects(job).Build() + // Call the function + err := waitForJobComplete(context.Background(), cli, job) + + // Check if the error matches the expectation + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetMaintenanceResultFromJob(t *testing.T) { + // Set up test job + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "default", + }, + } + + // Set up test pod + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Labels: map[string]string{"job-name": job.Name}, + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: "test message", + }, + }, + }, + }, + }, + } + + // Create a fake Kubernetes client + cli := fake.NewClientBuilder().WithObjects(job, pod).Build() + + // Call the function + result, err := getMaintenanceResultFromJob(cli, job) + + // Check if the result and error match the expectation + assert.NoError(t, err) + assert.Equal(t, "test message", result) +} + +func TestGetLatestMaintenanceJob(t *testing.T) { + // Set up test repo + repo := "test-repo" + + // Create some maintenance jobs for testing + var objs []client.Object + // Create a newer job + newerJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: "default", + Labels: map[string]string{RepositoryNameLabel: repo}, + CreationTimestamp: metav1.Time{ + Time: metav1.Now().Add(time.Duration(-24) * time.Hour), + }, + }, + Spec: batchv1.JobSpec{}, + } + objs = append(objs, newerJob) + + // Create an older job + olderJob := &batchv1.Job{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "job2", + Namespace: "default", + Labels: map[string]string{RepositoryNameLabel: repo}, + }, + Spec: batchv1.JobSpec{}, + } + objs = append(objs, olderJob) + + // Create a fake Kubernetes client + scheme := runtime.NewScheme() + _ = batchv1.AddToScheme(scheme) + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + + // Call the function + job, err := GetLatestMaintenanceJob(cli, repo) + assert.NoError(t, err) + + // We expect the returned job to be the newer job + assert.Equal(t, newerJob.Name, job.Name) +} +func TestBuildMaintenanceJob(t *testing.T) { + testCases := []struct { + name string + m MaintenanceConfig + deploy *appsv1.Deployment + expectedJobName string + expectedError bool + }{ + { + name: "Valid maintenance job", + m: MaintenanceConfig{ + CPURequest: "100m", + MemRequest: "128Mi", + CPULimit: "200m", + MemLimit: "256Mi", + LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel), + FormatFlag: logging.NewFormatFlag(), + }, + deploy: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "velero", + Namespace: "velero", + }, + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "velero-repo-maintenance-container", + Image: "velero-image", + }, + }, + }, + }, + }, + }, + expectedJobName: "test-123-maintain-job", + expectedError: false, + }, + { + name: "Error getting Velero server deployment", + m: MaintenanceConfig{ + CPURequest: "100m", + MemRequest: "128Mi", + CPULimit: "200m", + MemLimit: "256Mi", + LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel), + FormatFlag: logging.NewFormatFlag(), + }, + expectedJobName: "", + expectedError: true, + }, + } + + param := provider.RepoParam{ + BackupRepo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test-123", + }, + Spec: velerov1api.BackupRepositorySpec{ + VolumeNamespace: "test-123", + RepositoryType: "kopia", + }, + }, + BackupLocation: &velerov1api.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test-location", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a fake clientset with resources + objs := []runtime.Object{param.BackupLocation, param.BackupRepo} + + if tc.deploy != nil { + objs = append(objs, tc.deploy) + } + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = velerov1api.AddToScheme(scheme) + cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build() + + // Call the function to test + job, err := buildMaintenanceJob(tc.m, param, cli, "velero") + + // Check the error + if tc.expectedError { + assert.Error(t, err) + assert.Nil(t, job) + } else { + assert.NoError(t, err) + assert.NotNil(t, job) + assert.Contains(t, job.Name, tc.expectedJobName) + assert.Equal(t, param.BackupRepo.Namespace, job.Namespace) + assert.Equal(t, param.BackupRepo.Name, job.Labels[RepositoryNameLabel]) + + // Check container + assert.Len(t, job.Spec.Template.Spec.Containers, 1) + container := job.Spec.Template.Spec.Containers[0] + assert.Equal(t, "velero-repo-maintenance-container", container.Name) + assert.Equal(t, "velero-image", container.Image) + assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy) + + // Check resources + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(tc.m.CPURequest), + v1.ResourceMemory: resource.MustParse(tc.m.MemRequest), + }, + Limits: 
v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(tc.m.CPULimit), + v1.ResourceMemory: resource.MustParse(tc.m.MemLimit), + }, + } + assert.Equal(t, expectedResources, container.Resources) + + // Check args + expectedArgs := []string{ + "repo-maintenance", + fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace), + fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType), + fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name), + fmt.Sprintf("--log-level=%s", tc.m.LogLevelFlag.String()), + fmt.Sprintf("--log-format=%s", tc.m.FormatFlag.String()), + } + assert.Equal(t, expectedArgs, container.Args) + + // Check affinity + assert.Nil(t, job.Spec.Template.Spec.Affinity) + + // Check tolerations + assert.Nil(t, job.Spec.Template.Spec.Tolerations) + + // Check node selector + assert.Nil(t, job.Spec.Template.Spec.NodeSelector) + } + }) + } +} diff --git a/pkg/repository/manager.go b/pkg/repository/manager.go index 3e412a73a4..8ed5afc4a3 100644 --- a/pkg/repository/manager.go +++ b/pkg/repository/manager.go @@ -77,13 +77,14 @@ type Manager interface { } type manager struct { - namespace string - providers map[string]provider.Provider - client client.Client - repoLocker *RepoLocker - repoEnsurer *Ensurer - fileSystem filesystem.Interface - log logrus.FieldLogger + namespace string + providers map[string]provider.Provider + client client.Client + repoLocker *RepoLocker + repoEnsurer *Ensurer + fileSystem filesystem.Interface + maintenanceCfg MaintenanceConfig + log logrus.FieldLogger } // NewManager create a new repository manager. @@ -94,23 +95,25 @@ func NewManager( repoEnsurer *Ensurer, credentialFileStore credentials.FileStore, credentialSecretStore credentials.SecretStore, + maintenanceCfg MaintenanceConfig, log logrus.FieldLogger, ) Manager { mgr := &manager{ - namespace: namespace, - client: client, - providers: map[string]provider.Provider{}, - repoLocker: repoLocker, - repoEnsurer: repoEnsurer, - fileSystem: filesystem.NewFileSystem(), - log: log, + namespace: namespace, + client: client, + providers: map[string]provider.Provider{}, + repoLocker: repoLocker, + repoEnsurer: repoEnsurer, + fileSystem: filesystem.NewFileSystem(), + maintenanceCfg: maintenanceCfg, + log: log, } mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log) mgr.providers[velerov1api.BackupRepositoryTypeKopia] = provider.NewUnifiedRepoProvider(credentials.CredentialGetter{ FromFile: credentialFileStore, FromSecret: credentialSecretStore, - }, velerov1api.BackupRepositoryTypeKopia, mgr.log) + }, velerov1api.BackupRepositoryTypeKopia, client, mgr.log) return mgr } @@ -177,7 +180,55 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error { return errors.WithStack(err) } - return prd.PruneRepo(context.Background(), param) + log := m.log.WithFields(logrus.Fields{ + "BSL name": param.BackupLocation.Name, + "repo type": param.BackupRepo.Spec.RepositoryType, + "repo name": param.BackupRepo.Name, + "repo UID": param.BackupRepo.UID, + }) + + log.Info("Starting repo maintenance") + + maintenanceJob, err := buildMaintenanceJob(m.maintenanceCfg, param, m.client, m.namespace) + if err != nil { + return errors.Wrap(err, "error building maintenance job") + } + + log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name)) + + if err := m.client.Create(context.TODO(), maintenanceJob); err != nil { + return errors.Wrap(err, "error creating maintenance job") + 
} + log.Debug("Creating maintenance job") + + defer func() { + if err := deleteOldMaintenanceJobs(m.client, param.BackupRepo.Name, + m.maintenanceCfg.KeepLatestMaitenanceJobs); err != nil { + log.WithError(err).Error("Failed to delete old maintenance jobs") + } + }() + + var jobErr error + if err := waitForJobComplete(context.TODO(), m.client, maintenanceJob); err != nil { + log.WithError(err).Error("Error waiting for maintenance job to complete") + jobErr = err // don't return yet: the job may have failed because maintenance itself failed, and we want to surface that actual error below + } + + result, err := getMaintenanceResultFromJob(m.client, maintenanceJob) + if err != nil { + return errors.Wrap(err, "error getting maintenance job result") + } + + if result != "" { + return errors.Errorf("Maintenance job %s failed: %s", maintenanceJob.Name, result) + } + + if jobErr != nil { + return errors.Wrap(jobErr, "error waiting for maintenance job to complete") + } + + log.Info("Repo maintenance complete") + return nil } func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error { diff --git a/pkg/repository/manager_test.go b/pkg/repository/manager_test.go index 4d84919d2a..679cc8066f 100644 --- a/pkg/repository/manager_test.go +++ b/pkg/repository/manager_test.go @@ -21,12 +21,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" ) func TestGetRepositoryProvider(t *testing.T) { - mgr := NewManager("", nil, nil, nil, nil, nil, nil).(*manager) + var fakeClient kbclient.Client + mgr := NewManager("", fakeClient, nil, nil, nil, nil, MaintenanceConfig{}, nil).(*manager) repo := &velerov1.BackupRepository{} // empty repository type diff --git a/pkg/repository/provider/unified_repo.go b/pkg/repository/provider/unified_repo.go index 11c0845497..aaaf6f034a 100644 --- a/pkg/repository/provider/unified_repo.go +++ b/pkg/repository/provider/unified_repo.go @@ -29,6 +29,7 @@ import ( "github.com/kopia/kopia/repo" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -43,6 +44,7 @@ type unifiedRepoProvider struct { workPath string repoService udmrepo.BackupRepoService repoBackend string + cli client.Client log logrus.FieldLogger } @@ -73,11 +75,13 @@ const ( func NewUnifiedRepoProvider( credentialGetter credentials.CredentialGetter, repoBackend string, + cli client.Client, log logrus.FieldLogger, ) Provider { repo := unifiedRepoProvider{ credentialGetter: credentialGetter, repoBackend: repoBackend, + cli: cli, log: log, } diff --git a/pkg/util/velero/velero.go b/pkg/util/velero/velero.go new file mode 100644 index 0000000000..bc52253365 --- /dev/null +++ b/pkg/util/velero/velero.go @@ -0,0 +1,80 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package velero + +import ( + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" +) + +// GetNodeSelectorFromVeleroServer get the node selector from the Velero server deployment +func GetNodeSelectorFromVeleroServer(deployment *appsv1.Deployment) map[string]string { + return deployment.Spec.Template.Spec.NodeSelector +} + +// GetTolerationsFromVeleroServer get the tolerations from the Velero server deployment +func GetTolerationsFromVeleroServer(deployment *appsv1.Deployment) []v1.Toleration { + return deployment.Spec.Template.Spec.Tolerations +} + +// GetAffinityFromVeleroServer get the affinity from the Velero server deployment +func GetAffinityFromVeleroServer(deployment *appsv1.Deployment) *v1.Affinity { + return deployment.Spec.Template.Spec.Affinity +} + +// GetEnvVarsFromVeleroServer get the environment variables from the Velero server deployment +func GetEnvVarsFromVeleroServer(deployment *appsv1.Deployment) []v1.EnvVar { + for _, container := range deployment.Spec.Template.Spec.Containers { + // We only have one container in the Velero server deployment + return container.Env + } + return nil +} + +// GetVolumeMountsFromVeleroServer get the volume mounts from the Velero server deployment +func GetVolumeMountsFromVeleroServer(deployment *appsv1.Deployment) []v1.VolumeMount { + for _, container := range deployment.Spec.Template.Spec.Containers { + // We only have one container in the Velero server deployment + return container.VolumeMounts + } + return nil +} + +// GetVolumesFromVeleroServer get the volumes from the Velero server deployment +func GetVolumesFromVeleroServer(deployment *appsv1.Deployment) []v1.Volume { + return deployment.Spec.Template.Spec.Volumes +} + +// GetServiceAccountFromVeleroServer get the service account from the Velero server deployment +func GetServiceAccountFromVeleroServer(deployment *appsv1.Deployment) string { + return deployment.Spec.Template.Spec.ServiceAccountName +} + +// getVeleroServerImage get the image of the Velero server deployment +func GetVeleroServerImage(deployment *appsv1.Deployment) string { + return deployment.Spec.Template.Spec.Containers[0].Image +} + +// GetVeleroServerLables get the labels of the Velero server deployment +func GetVeleroServerLables(deployment *appsv1.Deployment) map[string]string { + return deployment.Spec.Template.Labels +} + +// GetVeleroServerAnnotations get the annotations of the Velero server deployment +func GetVeleroServerAnnotations(deployment *appsv1.Deployment) map[string]string { + return deployment.Spec.Template.Annotations +} diff --git a/pkg/util/velero/velero_test.go b/pkg/util/velero/velero_test.go new file mode 100644 index 0000000000..b03a393f56 --- /dev/null +++ b/pkg/util/velero/velero_test.go @@ -0,0 +1,614 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package velero + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGetNodeSelectorFromVeleroServer(t *testing.T) { + tests := []struct { + name string + deploy *appsv1.Deployment + want map[string]string + }{ + { + name: "no node selector", + deploy: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + NodeSelector: map[string]string{}, + }, + }, + }, + }, + want: map[string]string{}, + }, + { + name: "node selector", + deploy: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + NodeSelector: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + }, + want: map[string]string{ + "foo": "bar", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := GetNodeSelectorFromVeleroServer(test.deploy) + if len(got) != len(test.want) { + t.Errorf("expected node selector to have %d elements, got %d", len(test.want), len(got)) + } + for k, v := range test.want { + if got[k] != v { + t.Errorf("expected node selector to have key %s with value %s, got %s", k, v, got[k]) + } + } + }) + } +} + +func TestGetTolerationsFromVeleroServer(t *testing.T) { + tests := []struct { + name string + deploy *appsv1.Deployment + want []v1.Toleration + }{ + { + name: "no tolerations", + deploy: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Tolerations: []v1.Toleration{}, + }, + }, + }, + }, + want: []v1.Toleration{}, + }, + { + name: "tolerations", + deploy: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Tolerations: []v1.Toleration{ + { + Key: "foo", + Operator: "Exists", + }, + }, + }, + }, + }, + }, + want: []v1.Toleration{ + { + Key: "foo", + Operator: "Exists", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := GetTolerationsFromVeleroServer(test.deploy) + if len(got) != len(test.want) { + t.Errorf("expected tolerations to have %d elements, got %d", len(test.want), len(got)) + } + for i, want := range test.want { + if got[i] != want { + t.Errorf("expected toleration at index %d to be %v, got %v", i, want, got[i]) + } + } + }) + } +} + +func TestGetAffinityFromVeleroServer(t *testing.T) { + tests := []struct { + name string + deploy *appsv1.Deployment + want *v1.Affinity + }{ + { + name: "no affinity", + deploy: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Affinity: nil, + }, + }, + }, + }, + want: nil, + }, + { + name: "affinity", + deploy: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "foo", + Operator: "In", + Values: []string{"bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + want: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "foo", + Operator: "In", + Values: []string{"bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + 
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := GetAffinityFromVeleroServer(test.deploy)
+
+			if got == nil {
+				if test.want != nil {
+					t.Errorf("expected affinity to be %v, got nil", test.want)
+				}
+			} else {
+				if test.want == nil {
+					t.Errorf("expected affinity to be nil, got %v", got)
+				} else {
+					if got.NodeAffinity == nil {
+						if test.want.NodeAffinity != nil {
+							t.Errorf("expected node affinity to be %v, got nil", test.want.NodeAffinity)
+						}
+					} else {
+						if test.want.NodeAffinity == nil {
+							t.Errorf("expected node affinity to be nil, got %v", got.NodeAffinity)
+						} else {
+							if got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
+								if test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
+									t.Errorf("expected required during scheduling ignored during execution to be %v, got nil", test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
+								}
+							} else {
+								if test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
+									t.Errorf("expected required during scheduling ignored during execution to be nil, got %v", got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
+								} else {
+									if !reflect.DeepEqual(got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution, test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution) {
+										t.Errorf("expected required during scheduling ignored during execution to be %v, got %v", test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution, got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
+									}
+								}
+							}
+						}
+					}
+				}
+			}
+		})
+	}
+}
+
+func TestGetEnvVarsFromVeleroServer(t *testing.T) {
+	tests := []struct {
+		name   string
+		deploy *appsv1.Deployment
+		want   []v1.EnvVar
+	}{
+		{
+			name: "no env vars",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								{
+									Env: []v1.EnvVar{},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: []v1.EnvVar{},
+		},
+		{
+			name: "env vars",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								{
+									Env: []v1.EnvVar{
+										{
+											Name:  "foo",
+											Value: "bar",
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: []v1.EnvVar{
+				{
+					Name:  "foo",
+					Value: "bar",
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := GetEnvVarsFromVeleroServer(test.deploy)
+			if len(got) != len(test.want) {
+				t.Errorf("expected env vars to have %d elements, got %d", len(test.want), len(got))
+			}
+			for i, want := range test.want {
+				if got[i] != want {
+					t.Errorf("expected env var at index %d to be %v, got %v", i, want, got[i])
+				}
+			}
+		})
+	}
+}
+
+func TestGetVolumeMountsFromVeleroServer(t *testing.T) {
+	tests := []struct {
+		name   string
+		deploy *appsv1.Deployment
+		want   []v1.VolumeMount
+	}{
+		{
+			name: "no volume mounts",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								{
+									VolumeMounts: []v1.VolumeMount{},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: []v1.VolumeMount{},
+		},
+		{
+			name: "volume mounts",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								{
+									VolumeMounts: []v1.VolumeMount{
+										{
+											Name:      "foo",
+											MountPath: "/bar",
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: []v1.VolumeMount{
+				{
+					Name:      "foo",
+					MountPath: "/bar",
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := GetVolumeMountsFromVeleroServer(test.deploy)
+			if len(got) != len(test.want) {
+				t.Errorf("expected volume mounts to have %d elements, got %d", len(test.want), len(got))
+			}
+			for i, want := range test.want {
+				if got[i] != want {
+					t.Errorf("expected volume mount at index %d to be %v, got %v", i, want, got[i])
+				}
+			}
+		})
+	}
+}
+
+func TestGetVolumesFromVeleroServer(t *testing.T) {
+	tests := []struct {
+		name   string
+		deploy *appsv1.Deployment
+		want   []v1.Volume
+	}{
+		{
+			name: "no volumes",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Volumes: []v1.Volume{},
+						},
+					},
+				},
+			},
+			want: []v1.Volume{},
+		},
+		{
+			name: "volumes",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Volumes: []v1.Volume{
+								{
+									Name: "foo",
+								},
+							},
+						},
+					},
+				},
+			},
+			want: []v1.Volume{
+				{
+					Name: "foo",
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := GetVolumesFromVeleroServer(test.deploy)
+			if len(got) != len(test.want) {
+				t.Errorf("expected volumes to have %d elements, got %d", len(test.want), len(got))
+			}
+			for i, want := range test.want {
+				if got[i] != want {
+					t.Errorf("expected volume at index %d to be %v, got %v", i, want, got[i])
+				}
+			}
+		})
+	}
+}
+
+func TestGetServiceAccountFromVeleroServer(t *testing.T) {
+	tests := []struct {
+		name   string
+		deploy *appsv1.Deployment
+		want   string
+	}{
+		{
+			name: "no service account",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							ServiceAccountName: "",
+						},
+					},
+				},
+			},
+			want: "",
+		},
+		{
+			name: "service account",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							ServiceAccountName: "foo",
+						},
+					},
+				},
+			},
+			want: "foo",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := GetServiceAccountFromVeleroServer(test.deploy)
+			if got != test.want {
+				t.Errorf("expected service account to be %s, got %s", test.want, got)
+			}
+		})
+	}
+}
+
+func TestGetVeleroServerImage(t *testing.T) {
+	tests := []struct {
+		name   string
+		deploy *appsv1.Deployment
+		want   string
+	}{
+		{
+			name: "velero server image",
+			deploy: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								{
+									Image: "velero/velero:latest",
+								},
+							},
+						},
+					},
+				},
+			},
+			want: "velero/velero:latest",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := GetVeleroServerImage(test.deploy)
+			if got != test.want {
+				t.Errorf("expected velero server image to be %s, got %s", test.want, got)
+			}
+		})
+	}
+}
+
+func TestGetVeleroServerLables(t *testing.T) {
+	tests := []struct {
+		name       string
+		deployment *appsv1.Deployment
+		expected   map[string]string
+	}{
+		{
+			name: "Empty Labels",
+			deployment: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{},
+						},
+					},
+				},
+			},
+			expected: map[string]string{},
+		},
+		{
+			name: "Non-empty Labels",
+			deployment: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{
+								"app":       "velero",
+								"component": "server",
+							},
+						},
+					},
+				},
+			},
+			expected: map[string]string{
+				"app":       "velero",
+				"component": "server",
+			},
+		},
+	}
+
+	// Run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetVeleroServerLables(tt.deployment)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+func TestGetVeleroServerAnnotations(t *testing.T) {
+	tests := []struct {
+		name       string
+		deployment *appsv1.Deployment
+		expected   map[string]string
+	}{
+		{
+			name: "Empty Annotations",
+			deployment: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Annotations: map[string]string{},
+						},
+					},
+				},
+			},
+			expected: map[string]string{},
+		},
+		{
+			name: "Non-empty Annotations",
+			deployment: &appsv1.Deployment{
+				Spec: appsv1.DeploymentSpec{
+					Template: v1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Annotations: map[string]string{
+								"app":       "velero",
+								"component": "server",
+							},
+						},
+					},
+				},
+			},
+			expected: map[string]string{
+				"app":       "velero",
+				"component": "server",
+			},
+		},
+	}
+
+	// Run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetVeleroServerAnnotations(tt.deployment)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}