Skip to content

Commit

Permalink
batch delete snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Feb 19, 2024
1 parent 5787935 commit a007768
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 27 deletions.
42 changes: 31 additions & 11 deletions pkg/controller/backup_deletion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,11 @@ func (r *backupDeletionReconciler) deletePodVolumeSnapshots(ctx context.Context,
return []error{err}
}

return r.batchDeleteSnapshots(ctx, directSnapshots, backup)
return batchDeleteSnapshots(ctx, r.repoEnsurer, r.repoMgr, directSnapshots, backup, r.logger)

Check warning on line 513 in pkg/controller/backup_deletion_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_deletion_controller.go#L513

Added line #L513 was not covered by tests
}

var batchDeleteSnapshotFunc = batchDeleteSnapshots

func (r *backupDeletionReconciler) deleteMovedSnapshots(ctx context.Context, backup *velerov1api.Backup) []error {
if r.repoMgr == nil {
return nil
Expand All @@ -532,34 +534,50 @@ func (r *backupDeletionReconciler) deleteMovedSnapshots(ctx context.Context, bac
directSnapshots := map[string][]repository.SnapshotIdentifier{}
for i := range list.Items {
cm := list.Items[i]
snapshot := repository.SnapshotIdentifier{}
if cm.Data == nil || len(cm.Data) == 0 {
errs = append(errs, errors.New("no snapshot info in config"))
continue
}

b, err := json.Marshal(cm.Data)
if err != nil {
errs = append(errs, errors.Wrapf(err, "fail to marshal the snapshot info into JSON"))
continue
}

snapshot := repository.SnapshotIdentifier{}
if err := json.Unmarshal(b, &snapshot); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to unmarshal snapshot info"))
continue
}

if snapshot.SnapshotID == "" || snapshot.VolumeNamespace == "" || snapshot.RepositoryType == "" {
errs = append(errs, errors.Errorf("invalid snapshot, ID %s, namespace %s, repository %s", snapshot.SnapshotID, snapshot.VolumeNamespace, snapshot.RepositoryType))
continue
}

if directSnapshots[snapshot.VolumeNamespace] == nil {
directSnapshots[snapshot.VolumeNamespace] = []repository.SnapshotIdentifier{}
}

directSnapshots[snapshot.VolumeNamespace] = append(directSnapshots[snapshot.VolumeNamespace], snapshot)

r.logger.Infof("Deleted snapshot %s, namespace: %s, repo type: %s", snapshot.SnapshotID, snapshot.VolumeNamespace, snapshot.RepositoryType)
r.logger.Infof("Deleting snapshot %s, namespace: %s, repo type: %s", snapshot.SnapshotID, snapshot.VolumeNamespace, snapshot.RepositoryType)
}

for i := range list.Items {
cm := list.Items[i]
if err := r.Client.Delete(ctx, &cm); err != nil {
r.logger.Warnf("Failed to delete snapshot info configmap %s/%s: %v", cm.Namespace, cm.Name, err)
}
}

if len(errs) > 0 {
return errs
if len(directSnapshots) > 0 {
deleteErrs := batchDeleteSnapshotFunc(ctx, r.repoEnsurer, r.repoMgr, directSnapshots, backup, r.logger)
errs = append(errs, deleteErrs...)
}

return r.batchDeleteSnapshots(ctx, directSnapshots, backup)
return errs
}

func (r *backupDeletionReconciler) patchDeleteBackupRequest(ctx context.Context, req *velerov1api.DeleteBackupRequest, mutate func(*velerov1api.DeleteBackupRequest)) (*velerov1api.DeleteBackupRequest, error) {
Expand Down Expand Up @@ -615,7 +633,8 @@ func getSnapshotsInBackup(ctx context.Context, backup *velerov1api.Backup, kbCli
return podvolume.GetSnapshotIdentifier(podVolumeBackups), nil
}

func (r *backupDeletionReconciler) batchDeleteSnapshots(ctx context.Context, directSnapshots map[string][]repository.SnapshotIdentifier, backup *velerov1api.Backup) []error {
func batchDeleteSnapshots(ctx context.Context, repoEnsurer *repository.Ensurer, repoMgr repository.Manager,
directSnapshots map[string][]repository.SnapshotIdentifier, backup *velerov1api.Backup, logger logrus.FieldLogger) []error {
var errs []error
for volumeNamespace, snapshots := range directSnapshots {
batchForget := []string{}
Expand All @@ -624,18 +643,19 @@ func (r *backupDeletionReconciler) batchDeleteSnapshots(ctx context.Context, dir
}

Check warning on line 643 in pkg/controller/backup_deletion_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_deletion_controller.go#L637-L643

Added lines #L637 - L643 were not covered by tests

// For volumes in one backup, the BSL and repositoryType should always be the same
repo, err := r.repoEnsurer.EnsureRepo(ctx, backup.Namespace, volumeNamespace, backup.Spec.StorageLocation, snapshots[0].RepositoryType)
repoType := snapshots[0].RepositoryType
repo, err := repoEnsurer.EnsureRepo(ctx, backup.Namespace, volumeNamespace, backup.Spec.StorageLocation, repoType)
if err != nil {
errs = append(errs, errors.Wrapf(err, "error to ensure repo %s-%s-%s, skip deleting PVB snapshots %v", backup.Spec.StorageLocation, volumeNamespace, snapshots[0].RepositoryType, batchForget))
errs = append(errs, errors.Wrapf(err, "error to ensure repo %s-%s-%s, skip deleting PVB snapshots %v", backup.Spec.StorageLocation, volumeNamespace, repoType, batchForget))
continue

Check warning on line 650 in pkg/controller/backup_deletion_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_deletion_controller.go#L646-L650

Added lines #L646 - L650 were not covered by tests
}

if forgetErrs := r.repoMgr.BatchForget(ctx, repo, batchForget); len(forgetErrs) > 0 {
if forgetErrs := repoMgr.BatchForget(ctx, repo, batchForget); len(forgetErrs) > 0 {
errs = append(errs, forgetErrs...)
continue

Check warning on line 655 in pkg/controller/backup_deletion_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_deletion_controller.go#L653-L655

Added lines #L653 - L655 were not covered by tests
}

r.logger.Infof("Batch deleted snapshots %v", batchForget)
logger.Infof("Batch deleted snapshots %v", batchForget)

Check warning on line 658 in pkg/controller/backup_deletion_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_deletion_controller.go#L658

Added line #L658 was not covered by tests
}

return errs

Check warning on line 661 in pkg/controller/backup_deletion_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_deletion_controller.go#L661

Added line #L661 was not covered by tests
Expand Down
183 changes: 169 additions & 14 deletions pkg/controller/backup_deletion_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controller

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -54,6 +56,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
pluginmocks "github.com/vmware-tanzu/velero/pkg/plugin/mocks"
"github.com/vmware-tanzu/velero/pkg/repository"
repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

Expand Down Expand Up @@ -850,26 +853,178 @@ func TestGetSnapshotsInBackup(t *testing.T) {
assert.NoError(t, err)

assert.True(t, reflect.DeepEqual(res, test.expected))
})
}
}

// for k, v := range res {
func batchDeleteSucceed(ctx context.Context, repoEnsurer *repository.Ensurer, repoMgr repository.Manager, directSnapshots map[string][]repository.SnapshotIdentifier, backup *velerov1api.Backup, logger logrus.FieldLogger) []error {
return nil
}

// }
func batchDeleteFail(ctx context.Context, repoEnsurer *repository.Ensurer, repoMgr repository.Manager, directSnapshots map[string][]repository.SnapshotIdentifier, backup *velerov1api.Backup, logger logrus.FieldLogger) []error {
return []error{
errors.New("fake-delete-1"),
errors.New("fake-delete-2"),
}
}

// // sort to ensure good compare of slices
// less := func(snapshots []repository.SnapshotIdentifier) func(i, j int) bool {
// return func(i, j int) bool {
// if snapshots[i].VolumeNamespace == snapshots[j].VolumeNamespace {
// return snapshots[i].SnapshotID < snapshots[j].SnapshotID
// }
// return snapshots[i].VolumeNamespace < snapshots[j].VolumeNamespace
// }
func generateSnapshotData(snapshot *repository.SnapshotIdentifier) (map[string]string, error) {
if snapshot == nil {
return nil, nil
}

// }
b, err := json.Marshal(snapshot)
if err != nil {
return nil, err
}

// sort.Slice(test.expected, less(test.expected))
// sort.Slice(res, less(res))
data := make(map[string]string)
if err := json.Unmarshal(b, &data); err != nil {
return nil, err
}

return data, nil
}

// assert.Equal(t, test.expected, res)
func TestDeleteMovedSnapshots(t *testing.T) {
tests := []struct {
name string
repoMgr repository.Manager
batchDeleteSucceed bool
backupName string
snapshots []*repository.SnapshotIdentifier
expected []string
}{
{
name: "repoMgr is nil",
},
{
name: "no cm",
repoMgr: repomocks.NewManager(t),
},
{
name: "bad cm info",
repoMgr: repomocks.NewManager(t),
backupName: "backup-01",
snapshots: []*repository.SnapshotIdentifier{nil},
expected: []string{"no snapshot info in config"},
},
{
name: "invalid snapshots",
repoMgr: repomocks.NewManager(t),
backupName: "backup-01",
snapshots: []*repository.SnapshotIdentifier{
{
RepositoryType: "repo-1",
VolumeNamespace: "ns-1",
},
{
SnapshotID: "snapshot-1",
VolumeNamespace: "ns-1",
},
{
SnapshotID: "snapshot-1",
RepositoryType: "repo-1",
},
},
batchDeleteSucceed: true,
expected: []string{
"invalid snapshot, ID , namespace ns-1, repository repo-1",
"invalid snapshot, ID snapshot-1, namespace ns-1, repository ",
"invalid snapshot, ID snapshot-1, namespace , repository repo-1",
},
},
{
name: "batch delete succeed",
repoMgr: repomocks.NewManager(t),
backupName: "backup-01",
snapshots: []*repository.SnapshotIdentifier{

{
SnapshotID: "snapshot-1",
RepositoryType: "repo-1",
VolumeNamespace: "ns-1",
},
},
batchDeleteSucceed: true,
expected: []string{},
},
{
name: "batch delete fail",
repoMgr: repomocks.NewManager(t),
backupName: "backup-01",
snapshots: []*repository.SnapshotIdentifier{
{
RepositoryType: "repo-1",
VolumeNamespace: "ns-1",
},
{
SnapshotID: "snapshot-1",
RepositoryType: "repo-1",
VolumeNamespace: "ns-1",
},
},
expected: []string{"invalid snapshot, ID , namespace ns-1, repository repo-1", "fake-delete-1", "fake-delete-2"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
objs := []runtime.Object{}
for i, snapshot := range test.snapshots {
snapshotData, err := generateSnapshotData(snapshot)
require.NoError(t, err)

cm := corev1api.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1api.SchemeGroupVersion.String(),
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: fmt.Sprintf("du-info-%d", i),
Labels: map[string]string{
velerov1api.BackupNameLabel: test.backupName,
velerov1api.DataUploadSnapshotInfoLabel: "true",
},
},
Data: snapshotData,
}

objs = append(objs, &cm)
}

veleroBackup := &velerov1api.Backup{}
controller := NewBackupDeletionReconciler(
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t, objs...),
NewBackupTracker(),
test.repoMgr,
metrics.NewServerMetrics(),
nil, // discovery helper
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeSingleObjectBackupStoreGetter(backupStore),
velerotest.NewFakeCredentialsFileStore("", nil),
nil,
)

veleroBackup.Name = test.backupName

if test.batchDeleteSucceed {
batchDeleteSnapshotFunc = batchDeleteSucceed
} else {
batchDeleteSnapshotFunc = batchDeleteFail
}

errs := controller.deleteMovedSnapshots(context.Background(), veleroBackup)
if test.expected == nil {
assert.Nil(t, errs)
} else {
assert.Equal(t, len(test.expected), len(errs))
for i := range test.expected {
assert.EqualError(t, errs[i], test.expected[i])
}
}
})
}
}
4 changes: 2 additions & 2 deletions pkg/repository/provider/unified_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (urp *unifiedRepoProvider) BatchForget(ctx context.Context, snapshotIDs []s
for _, snapshotID := range snapshotIDs {
err = bkRepo.DeleteManifest(ctx, udmrepo.ID(snapshotID))
if err != nil {
errs = append(errs, errors.Wrap(err, "error to delete manifest"))
errs = append(errs, errors.Wrapf(err, "error to delete manifest %s", snapshotID))
}
}

Expand All @@ -361,7 +361,7 @@ func (urp *unifiedRepoProvider) BatchForget(ctx context.Context, snapshotIDs []s

log.Debug("Forget snapshot complete")

return nil
return errs
}

func (urp *unifiedRepoProvider) DefaultMaintenanceFrequency(ctx context.Context, param RepoParam) time.Duration {
Expand Down
Loading

0 comments on commit a007768

Please sign in to comment.