From ded1e745811ab01bdeece799bd9e0ea1b9d64ef8 Mon Sep 17 00:00:00 2001 From: Ming Qiu Date: Tue, 30 Jan 2024 02:18:58 +0000 Subject: [PATCH] Add maintenance job Signed-off-by: Ming Qiu --- changelogs/unreleased/7451-qiuming-best | 2 + .../repomaintenance/repo_maintenance.go | 145 ++++++++++ pkg/cmd/server/server.go | 62 ++-- .../backup_repository_controller.go | 36 ++- pkg/datapath/file_system.go | 2 +- pkg/repository/maintenance.go | 270 ++++++++++++++++++ pkg/repository/manager.go | 81 ++++-- pkg/repository/manager_test.go | 5 +- pkg/repository/provider/unified_repo.go | 4 + pkg/repository/udmrepo/kopialib/lib_repo.go | 1 - pkg/util/logging/file_hook.go | 21 ++ 11 files changed, 586 insertions(+), 43 deletions(-) create mode 100644 changelogs/unreleased/7451-qiuming-best create mode 100644 pkg/cmd/server/repomaintenance/repo_maintenance.go create mode 100644 pkg/repository/maintenance.go create mode 100644 pkg/util/logging/file_hook.go diff --git a/changelogs/unreleased/7451-qiuming-best b/changelogs/unreleased/7451-qiuming-best new file mode 100644 index 0000000000..b67e2b07cf --- /dev/null +++ b/changelogs/unreleased/7451-qiuming-best @@ -0,0 +1,2 @@ +Add repository maintenance job + diff --git a/pkg/cmd/server/repomaintenance/repo_maintenance.go b/pkg/cmd/server/repomaintenance/repo_maintenance.go new file mode 100644 index 0000000000..c1f2c8a273 --- /dev/null +++ b/pkg/cmd/server/repomaintenance/repo_maintenance.go @@ -0,0 +1,145 @@ +package repomaintenance + +import ( + "context" + "fmt" + "log" + "os" + "strings" + + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerocli "github.com/vmware-tanzu/velero/pkg/client" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/repository/provider" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + 
"github.com/vmware-tanzu/velero/pkg/util/logging" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type Options struct { + VolumeNamespace string + BackupStorageLocation string + RepoType string + KeepLatestMaitenanceJobs int + LogLevelFlag *logging.LevelFlag + FormatFlag *logging.FormatFlag +} + +func (o *Options) BindFlags(flags *pflag.FlagSet) { + flags.StringVar(&o.VolumeNamespace, "repo-name", "", "namespace of the pod/volume that the snapshot is for") + flags.StringVar(&o.BackupStorageLocation, "backup-storage-location", "", "backup's storage location name") + flags.StringVar(&o.RepoType, "repo-type", velerov1api.BackupRepositoryTypeKopia, "type of the repository where the snapshot is stored") + flags.Var(o.LogLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(o.LogLevelFlag.AllowedValues(), ", "))) + flags.Var(o.FormatFlag, "log-format", fmt.Sprintf("The format for log output. 
Valid values are %s.", strings.Join(o.FormatFlag.AllowedValues(), ", "))) +} + +func NewCommand(f velerocli.Factory) *cobra.Command { + o := &Options{ + LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel), + FormatFlag: logging.NewFormatFlag(), + } + cmd := &cobra.Command{ + Use: "repo-maintenance", + Hidden: true, + Short: "VELERO SERVER COMMAND ONLY - not intended to be run directly by users", + Run: func(c *cobra.Command, args []string) { + o.Run(f) + }, + } + + o.BindFlags(cmd.Flags()) + return cmd +} + +func checkError(err error, log logrus.FieldLogger) { + if err != nil { + if err != context.Canceled { + log.Errorf("An error occurred: %v", err) + } + os.Exit(1) + } +} + +func (o *Options) Run(f velerocli.Factory) { + log.SetOutput(os.Stdout) + logrus.SetOutput(os.Stdout) + logger := logging.DefaultLogger(o.LogLevelFlag.Parse(), o.FormatFlag.Parse()) + + errorFile, err := os.Create("/dev/termination-log") + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create termination log file: %v\n", err) + return + } + defer errorFile.Close() + + logger.AddHook(&logging.FileHook{ + File: errorFile}) + + scheme := runtime.NewScheme() + err = velerov1api.AddToScheme(scheme) + checkError(err, logger) + + err = v1.AddToScheme(scheme) + checkError(err, logger) + + config, err := f.ClientConfig() + checkError(err, logger) + + cli, err := client.New(config, client.Options{ + Scheme: scheme, + }) + checkError(err, logger) + + credentialFileStore, err := credentials.NewNamespacedFileStore( + cli, + f.Namespace(), + "/tmp/credentials", + filesystem.NewFileSystem(), + ) + checkError(err, logger) + + credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, f.Namespace()) + checkError(err, logger) + + var repoProvider provider.Provider + if o.RepoType == velerov1api.BackupRepositoryTypeRestic { + repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger) + } else { + repoProvider = provider.NewUnifiedRepoProvider( 
+ credentials.CredentialGetter{ + FromFile: credentialFileStore, + FromSecret: credentialSecretStore, + }, o.RepoType, cli, logger) + } + + // backupRepository + repo, err := repository.GetBackupRepository(context.Background(), cli, f.Namespace(), + repository.BackupRepositoryKey{ + VolumeNamespace: o.VolumeNamespace, + BackupLocation: o.BackupStorageLocation, + RepositoryType: o.RepoType, + }, true) + checkError(err, logger) + + // bsl + bsl := &velerov1api.BackupStorageLocation{} + err = cli.Get(context.Background(), client.ObjectKey{Namespace: f.Namespace(), Name: repo.Spec.BackupStorageLocation}, bsl) + checkError(err, logger) + + para := provider.RepoParam{ + BackupRepo: repo, + BackupLocation: bsl, + } + + err = repoProvider.BoostRepoConnect(context.Background(), para) + checkError(err, logger) + + err = repoProvider.PruneRepo(context.Background(), para) + checkError(err, logger) +} diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index da135e3148..8d68792011 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -32,24 +32,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "github.com/spf13/cobra" - corev1api "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - kubeerrs "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/discovery" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/utils/clock" - ctrl "sigs.k8s.io/controller-runtime" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/internal/storage" velerov1api 
"github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -58,6 +40,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/buildinfo" "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/cmd" + "github.com/vmware-tanzu/velero/pkg/cmd/server/repomaintenance" "github.com/vmware-tanzu/velero/pkg/cmd/util/flag" "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" "github.com/vmware-tanzu/velero/pkg/controller" @@ -77,6 +60,25 @@ import ( "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + appsv1 "k8s.io/api/apps/v1" + batchv1api "k8s.io/api/batch/v1" + corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + kubeerrs "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" ) const ( @@ -135,6 +137,7 @@ type serverConfig struct { defaultSnapshotMoveData bool disableInformerCache bool scheduleSkipImmediately bool + maintenanceCfg repository.MaintenanceConfig } func NewCommand(f client.Factory) *cobra.Command { @@ -166,6 +169,9 @@ func NewCommand(f client.Factory) *cobra.Command { defaultSnapshotMoveData: false, disableInformerCache: defaultDisableInformerCache, scheduleSkipImmediately: false, + maintenanceCfg: repository.MaintenanceConfig{ + KeepLatestMaitenanceJobs: 3, + }, } ) @@ -239,7 +245,17 @@ func NewCommand(f client.Factory) *cobra.Command { command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", 
config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.") command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable).") command.Flags().BoolVar(&config.scheduleSkipImmediately, "schedule-skip-immediately", config.scheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).") + command.Flags().IntVar(&config.maintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", config.maintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep each repository. Default is 3.") + command.Flags().StringVar(&config.maintenanceCfg.CPURequest, "maintenance-job-cpu-request", config.maintenanceCfg.CPURequest, "CPU request for maintenance job. Default is empty.") + command.Flags().StringVar(&config.maintenanceCfg.MemoryRequest, "maintenance-job-memory-request", config.maintenanceCfg.MemoryRequest, "Memory request for maintenance job. Default is empty.") + command.Flags().StringVar(&config.maintenanceCfg.CPULimit, "maintenance-job-cpu-limit", config.maintenanceCfg.CPULimit, "CPU limit for maintenance job. Default is empty.") + command.Flags().StringVar(&config.maintenanceCfg.MemoryLimit, "maintenance-job-memory-limit", config.maintenanceCfg.MemoryLimit, "Memory limit for maintenance job. 
Default is empty.") + // inherited from server command + config.maintenanceCfg.FormatFlag = config.formatFlag + config.maintenanceCfg.LogLevelFlag = logLevelFlag + + command.AddCommand(repomaintenance.NewCommand(f)) return command } @@ -346,6 +362,14 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s cancelFunc() return nil, err } + if err := batchv1api.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, err + } + if err := appsv1.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, err + } ctrl.SetLogger(logrusr.New(logger)) @@ -642,7 +666,7 @@ func (s *server) initRepoManager() error { s.repoLocker = repository.NewRepoLocker() s.repoEnsurer = repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger) + s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.config.maintenanceCfg, s.logger) return nil } diff --git a/pkg/controller/backup_repository_controller.go b/pkg/controller/backup_repository_controller.go index a84a583a21..a068e4a00b 100644 --- a/pkg/controller/backup_repository_controller.go +++ b/pkg/controller/backup_repository_controller.go @@ -42,8 +42,9 @@ import ( ) const ( - repoSyncPeriod = 5 * time.Minute - defaultMaintainFrequency = 7 * 24 * time.Hour + repoSyncPeriod = 5 * time.Minute + defaultMaintainFrequency = 7 * 24 * time.Hour + defaultMaintainRetryFrequency = 1 * time.Hour ) type BackupRepoReconciler struct { @@ -277,9 +278,30 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repository.Manag return repoManager.PrepareRepo(repo) } -func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { - 
log.Debug("backupRepositoryController.runMaintenanceIfDue")
+func (r *BackupRepoReconciler) shouldRetryMaintenance(req *velerov1api.BackupRepository, log logrus.FieldLogger) bool {
+	log.Debug("Checking if maintenance should be retried")
+
+	// An empty message means the last maintenance succeeded, so the next
+	// due maintenance should always run.
+	if req.Status.Message == "" {
+		return true
+	}
+
+	job, err := repository.GetLatestMaintenanceJob(r.Client, req.Name)
+	if err != nil {
+		log.WithError(err).Error("error getting latest maintenance job")
+		return false
+	}
+
+	// GetLatestMaintenanceJob returns (nil, nil) when no job exists (e.g. it
+	// was already cleaned up); there is nothing to back off from, so retry.
+	if job == nil {
+		log.Debug("No latest maintenance job found, retrying maintenance")
+		return true
+	}
+
+	if job.Status.Failed > 0 && job.CreationTimestamp.Add(defaultMaintainRetryFrequency).Before(r.clock.Now()) {
+		log.Debug("Latest maintenance job failed and is older than retry frequency, retrying maintenance")
+		return true
+	}
+	log.Debugf("Latest maintenance job %s is not older than retry frequency, not retrying maintenance", job.Name)
+
+	return false
+}
+
+func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
 	now := r.clock.Now()
 
 	if !dueForMaintenance(req, now) {
@@ -287,6 +309,11 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
 		return nil
 	}
 
+	if !r.shouldRetryMaintenance(req, log) {
+		log.Debug("not retrying maintenance")
+		return nil
+	}
+
 	log.Info("Running maintenance on backup repository")
 
 	// prune failures should be displayed in the `.status.message` field but
@@ -300,6 +327,7 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
 	}
 
 	return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
+		rr.Status.Message = ""
 		rr.Status.LastMaintenanceTime = &metav1.Time{Time: now}
 	})
 }
diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go
index 60a0c8347b..b7372d0a82 100644
--- a/pkg/datapath/file_system.go
+++ b/pkg/datapath/file_system.go
@@ -184,7 +184,7 @@ func (fs *fileSystemBR) Cancel() {
 
 func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter
*credentials.CredentialGetter) error { if repositoryType == velerov1api.BackupRepositoryTypeKopia { - if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { + if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.client, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { return err } } else { diff --git a/pkg/repository/maintenance.go b/pkg/repository/maintenance.go new file mode 100644 index 0000000000..56ebd67488 --- /dev/null +++ b/pkg/repository/maintenance.go @@ -0,0 +1,270 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package repository + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/vmware-tanzu/velero/pkg/repository/provider" + "github.com/vmware-tanzu/velero/pkg/util/logging" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const RepositoryNameLabel = "velero.io/repo-name" + +// MaintenanceConfig is the configuration for the repo maintenance job +type MaintenanceConfig struct { + KeepLatestMaitenanceJobs int + CPURequest string + MemoryRequest string + CPULimit string + MemoryLimit string + LogLevelFlag *logging.LevelFlag + FormatFlag *logging.FormatFlag +} + +func generateJobName(repo string) string { + millisecond := time.Now().UTC().UnixMilli() // millisecond + + jobName := fmt.Sprintf("%s-maintain-job-%d", repo, millisecond) + if len(jobName) > 63 { // k8s job name length limit + jobName = fmt.Sprintf("repo-maintain-job-%d", millisecond) + } + + return jobName +} + +// getEnvVarsFromVeleroServer get the environment variables from the Velero server deployment +func getEnvVarsFromVeleroServer(deployment *appsv1.Deployment) []v1.EnvVar { + for _, container := range deployment.Spec.Template.Spec.Containers { + // We only have one container in the Velero server deployment + return container.Env + } + return nil +} + +// getVolumeMountsFromVeleroServer get the volume mounts from the Velero server deployment +func getVolumeMountsFromVeleroServer(deployment *appsv1.Deployment) []v1.VolumeMount { + for _, container := range deployment.Spec.Template.Spec.Containers { + // We only have one container in the Velero server deployment + return container.VolumeMounts + } + return nil +} + +// getVolumesFromVeleroServer get the volumes from the Velero server deployment +func getVolumesFromVeleroServer(deployment 
*appsv1.Deployment) []v1.Volume {
+	return deployment.Spec.Template.Spec.Volumes
+}
+
+// getServiceAccountFromVeleroServer get the service account from the Velero server deployment
+func getServiceAccountFromVeleroServer(deployment *appsv1.Deployment) string {
+	return deployment.Spec.Template.Spec.ServiceAccountName
+}
+
+// getVeleroServerDeployment get the Velero server deployment
+func getVeleroServerDeployment(cli client.Client, namespace string) (*appsv1.Deployment, error) {
+	// Get the Velero server deployment
+	deployment := appsv1.Deployment{}
+	err := cli.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: namespace}, &deployment)
+	if err != nil {
+		return nil, err
+	}
+	return &deployment, nil
+}
+
+func getVeleroServerImage(deployment *appsv1.Deployment) string {
+	// Get the image of the Velero server deployment
+	return deployment.Spec.Template.Spec.Containers[0].Image
+}
+
+func getResourceRequirements(cpuRequest, memoryRequest, cpuLimit, memoryLimit string) v1.ResourceRequirements {
+	// Requests/Limits must be initialized: writing into the maps of a
+	// zero-value ResourceRequirements panics (nil map assignment).
+	res := v1.ResourceRequirements{
+		Requests: v1.ResourceList{},
+		Limits:   v1.ResourceList{},
+	}
+	if cpuRequest != "" {
+		res.Requests[v1.ResourceCPU] = resource.MustParse(cpuRequest)
+	}
+	if memoryRequest != "" {
+		res.Requests[v1.ResourceMemory] = resource.MustParse(memoryRequest)
+	}
+	if cpuLimit != "" {
+		res.Limits[v1.ResourceCPU] = resource.MustParse(cpuLimit)
+	}
+	if memoryLimit != "" {
+		res.Limits[v1.ResourceMemory] = resource.MustParse(memoryLimit)
+	}
+	return res
+}
+
+func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli client.Client, namespace string) (*batchv1.Job, error) {
+	// Get the Velero server deployment
+	deployment, err := getVeleroServerDeployment(cli, namespace)
+	if err != nil {
+		return nil, err
+	}
+
+	// Get the environment variables from the Velero server deployment
+	envVars := getEnvVarsFromVeleroServer(deployment)
+
+	// Get the volume mounts from the Velero server deployment
+	volumeMounts := getVolumeMountsFromVeleroServer(deployment)
+
+	// Get the
volumes from the Velero server deployment + volumes := getVolumesFromVeleroServer(deployment) + + // Get the service account from the Velero server deployment + serviceAccount := getServiceAccountFromVeleroServer(deployment) + + // Get image + image := getVeleroServerImage(deployment) + + // Set resource limits and requests + resources := getResourceRequirements(m.CPURequest, m.MemoryRequest, m.CPULimit, m.MemoryLimit) + + // Set arguments + args := []string{"server", "repo-maintenance"} + args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace)) + args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType)) + args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name)) + args = append(args, fmt.Sprintf("--log-level=%s", m.LogLevelFlag.String())) + args = append(args, fmt.Sprintf("--log-format=%s", m.FormatFlag.String())) + + // build the maintenance job + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: generateJobName(param.BackupRepo.Name), + Namespace: param.BackupRepo.Namespace, + Labels: map[string]string{ + RepositoryNameLabel: param.BackupRepo.Name, + }, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: new(int32), // Never retry + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "velero-repo-maintenance-pod", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "velero-repo-maintenance-container", + Image: image, + Command: []string{ + "/velero", + }, + Args: args, + ImagePullPolicy: v1.PullIfNotPresent, + Env: envVars, + VolumeMounts: volumeMounts, + Resources: resources, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + Volumes: volumes, + ServiceAccountName: serviceAccount, + }, + }, + }, + }, nil +} + +// deleteOldMaintenanceJobs deletes old maintenance jobs and keeps the latest N jobs +func deleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error { + // Get the maintenance job list by label + jobList 
:= &batchv1.JobList{}
+	err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
+	if err != nil {
+		return err
+	}
+
+	// Delete old maintenance jobs
+	if len(jobList.Items) > keep {
+		sort.Slice(jobList.Items, func(i, j int) bool {
+			return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp)
+		})
+		for i := 0; i < len(jobList.Items)-keep; i++ {
+			err = cli.Delete(context.TODO(), &jobList.Items[i])
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func waitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error {
+	// Poll once per second; a bare `1` here would be a 1-nanosecond interval
+	// and hot-loop against the API server.
+	return wait.PollImmediateUntil(time.Second, func() (bool, error) {
+		err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job)
+		if err != nil {
+			return false, err
+		}
+		if job.Status.Succeeded > 0 {
+			return true, nil
+		}
+
+		if job.Status.Failed > 0 {
+			return true, fmt.Errorf("maintenance job %s failed", job.Name)
+		}
+		return false, nil
+	}, ctx.Done())
+}
+
+func getMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
+	// Get the maintenance job related pod by label selector
+	podList := &v1.PodList{}
+	err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name}))
+	if err != nil {
+		return "", err
+	}
+
+	if len(podList.Items) == 0 {
+		return "", fmt.Errorf("no pod found for job %s", job.Name)
+	}
+
+	// we only have one maintenance pod for the job; guard against a pod whose
+	// container has not terminated yet to avoid a nil dereference
+	statuses := podList.Items[0].Status.ContainerStatuses
+	if len(statuses) == 0 || statuses[0].State.Terminated == nil {
+		return "", fmt.Errorf("no terminated container status found for job %s", job.Name)
+	}
+	return statuses[0].State.Terminated.Message, nil
+}
+
+func GetLatestMaintenanceJob(cli client.Client, repo string) (*batchv1.Job, error) {
+	// Get the maintenance job list by label
+	jobList := &batchv1.JobList{}
+	err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(jobList.Items) == 0 {
+		return nil, nil
+	}
+
+ // Get the latest maintenance job + sort.Slice(jobList.Items, func(i, j int) bool { + return jobList.Items[i].CreationTimestamp.Time.After(jobList.Items[j].CreationTimestamp.Time) + }) + return &jobList.Items[0], nil +} diff --git a/pkg/repository/manager.go b/pkg/repository/manager.go index 3e412a73a4..829e64a3a9 100644 --- a/pkg/repository/manager.go +++ b/pkg/repository/manager.go @@ -77,13 +77,14 @@ type Manager interface { } type manager struct { - namespace string - providers map[string]provider.Provider - client client.Client - repoLocker *RepoLocker - repoEnsurer *Ensurer - fileSystem filesystem.Interface - log logrus.FieldLogger + namespace string + providers map[string]provider.Provider + client client.Client + repoLocker *RepoLocker + repoEnsurer *Ensurer + fileSystem filesystem.Interface + maintenanceCfg MaintenanceConfig + log logrus.FieldLogger } // NewManager create a new repository manager. @@ -94,23 +95,25 @@ func NewManager( repoEnsurer *Ensurer, credentialFileStore credentials.FileStore, credentialSecretStore credentials.SecretStore, + maintenanceCfg MaintenanceConfig, log logrus.FieldLogger, ) Manager { mgr := &manager{ - namespace: namespace, - client: client, - providers: map[string]provider.Provider{}, - repoLocker: repoLocker, - repoEnsurer: repoEnsurer, - fileSystem: filesystem.NewFileSystem(), - log: log, + namespace: namespace, + client: client, + providers: map[string]provider.Provider{}, + repoLocker: repoLocker, + repoEnsurer: repoEnsurer, + fileSystem: filesystem.NewFileSystem(), + maintenanceCfg: maintenanceCfg, + log: log, } mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log) mgr.providers[velerov1api.BackupRepositoryTypeKopia] = provider.NewUnifiedRepoProvider(credentials.CredentialGetter{ FromFile: credentialFileStore, FromSecret: credentialSecretStore, - }, velerov1api.BackupRepositoryTypeKopia, mgr.log) + }, 
velerov1api.BackupRepositoryTypeKopia, client, mgr.log)
 
 	return mgr
 }
@@ -177,7 +180,53 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
 		return errors.WithStack(err)
 	}
 
-	return prd.PruneRepo(context.Background(), param)
+	log := m.log.WithFields(logrus.Fields{
+		"BSL name":  param.BackupLocation.Name,
+		"repo type": param.BackupRepo.Spec.RepositoryType,
+		"repo name": param.BackupRepo.Name,
+		"repo UID":  param.BackupRepo.UID,
+	})
+
+	log.Info("Start to maintain repo")
+
+	maintenanceJob, err := buildMaintenanceJob(m.maintenanceCfg, param, m.client, m.namespace)
+	if err != nil {
+		return errors.Wrap(err, "error to build maintenance job")
+	}
+
+	if err := m.client.Create(context.TODO(), maintenanceJob); err != nil {
+		return errors.Wrap(err, "error to create maintenance job")
+	}
+	log.Debugf("Creating maintenance job: %v", maintenanceJob.Name)
+
+	defer func() {
+		if err := deleteOldMaintenanceJobs(m.client, param.BackupRepo.Name,
+			m.maintenanceCfg.KeepLatestMaitenanceJobs); err != nil {
+			log.WithError(err).Error("Failed to delete maintenance job")
+		}
+	}()
+
+	var jobErr error
+	if err := waitForJobComplete(context.TODO(), m.client, maintenanceJob); err != nil {
+		log.WithError(err).Error("Error to wait for maintenance job complete")
+		jobErr = err // we won't return here for job may failed by maintenance failure, we want return the actual error
+	}
+
+	result, err := getMaintenanceResultFromJob(m.client, maintenanceJob)
+	if err != nil {
+		return errors.Wrap(err, "error to get maintenance job result")
+	}
+
+	if result != "" {
+		return errors.Errorf("Maintenance job %s failed: %s", maintenanceJob.Name, result)
+	}
+
+	if jobErr != nil {
+		// must wrap jobErr here, not err: err is nil at this point and wrapping
+		// it would make the function return nil for a failed maintenance job
+		return errors.Wrap(jobErr, "error to wait for maintenance job complete")
+	}
+
+	log.Info("Maintenance repo complete")
+	return nil
 }
 
 func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error {
diff --git a/pkg/repository/manager_test.go b/pkg/repository/manager_test.go
index
4d84919d2a..b0084b824f 100644 --- a/pkg/repository/manager_test.go +++ b/pkg/repository/manager_test.go @@ -21,12 +21,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" ) func TestGetRepositoryProvider(t *testing.T) { - mgr := NewManager("", nil, nil, nil, nil, nil, nil).(*manager) + var fakeClient kbclient.Client + mgr := NewManager("", fakeClient, nil, nil, nil, nil, MaintenanceConfig{}, nil).(*manager) repo := &velerov1.BackupRepository{} // empty repository type diff --git a/pkg/repository/provider/unified_repo.go b/pkg/repository/provider/unified_repo.go index 11c0845497..aaaf6f034a 100644 --- a/pkg/repository/provider/unified_repo.go +++ b/pkg/repository/provider/unified_repo.go @@ -29,6 +29,7 @@ import ( "github.com/kopia/kopia/repo" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -43,6 +44,7 @@ type unifiedRepoProvider struct { workPath string repoService udmrepo.BackupRepoService repoBackend string + cli client.Client log logrus.FieldLogger } @@ -73,11 +75,13 @@ const ( func NewUnifiedRepoProvider( credentialGetter credentials.CredentialGetter, repoBackend string, + cli client.Client, log logrus.FieldLogger, ) Provider { repo := unifiedRepoProvider{ credentialGetter: credentialGetter, repoBackend: repoBackend, + cli: cli, log: log, } diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index a0a99d2b45..a031885c30 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -210,7 +210,6 @@ func (km *kopiaMaintenance) runMaintenance(ctx context.Context, rep repo.DirectR if err != nil { return errors.Wrapf(err, "error to run maintenance 
under mode %s", km.mode) } - return nil } diff --git a/pkg/util/logging/file_hook.go b/pkg/util/logging/file_hook.go new file mode 100644 index 0000000000..7e09a46160 --- /dev/null +++ b/pkg/util/logging/file_hook.go @@ -0,0 +1,21 @@ +package logging + +import ( + "os" + + "github.com/sirupsen/logrus" +) + +// FileHook logs errors into a specified file. +type FileHook struct { + File *os.File +} + +func (hook *FileHook) Levels() []logrus.Level { + return []logrus.Level{logrus.ErrorLevel} +} + +func (hook *FileHook) Fire(entry *logrus.Entry) error { + _, err := hook.File.WriteString(entry.Message + "\n") + return err +}