
backup-operator: Support periodic backup (#1841) #2028

Merged · 19 commits · Feb 18, 2019
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,13 @@

### Changed

- EtcdBackup: Support periodic backup. This change adds three new fields to the EtcdBackup schema: two in the spec and one in the status (see the `backup_types.go` changes below).
  - in `spec.backupPolicy`:
    - `maxBackups`: the maximum number of backups to keep
    - `backupIntervalInSecond`: how often to perform a backup
  - in `status`:
    - `lastSuccessDate`: the time of the last successful backup

### Removed

### Fixed
10 changes: 10 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

16 changes: 16 additions & 0 deletions example/etcd-backup-operator/periodic_backup_cr.yaml
@@ -0,0 +1,16 @@
apiVersion: "etcd.database.coreos.com/v1beta2"
kind: "EtcdBackup"
metadata:
name: example-etcd-cluster-periodic-backup
spec:
etcdEndpoints: [<etcd-cluster-endpoints>]
storageType: S3
backupPolicy:
# 0 > enable periodic backup
backupIntervalInSecond: 125
maxBackups: 4
s3:
# The format of "path" must be: "<s3-bucket-name>/<path-to-backup-file>"
# e.g: "mybucket/etcd.backup"
path: <full-s3-path>
awsSecret: <aws-secret>
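
To make the policy semantics concrete, here is a minimal sketch of how a controller might interpret these fields, assuming the `v1beta2` types added below. It is not part of this PR, and the `isPeriodicBackup` helper is hypothetical:

```go
package main

import (
	"fmt"
	"time"

	api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
)

// isPeriodicBackup treats a zero interval as a one-shot backup, matching the
// "0 is a magic number" convention documented on BackupPolicy.
func isPeriodicBackup(p *api.BackupPolicy) bool {
	return p != nil && p.BackupIntervalInSecond > 0
}

func main() {
	policy := &api.BackupPolicy{BackupIntervalInSecond: 125, MaxBackups: 4}
	if isPeriodicBackup(policy) {
		interval := time.Duration(policy.BackupIntervalInSecond) * time.Second
		fmt.Printf("periodic backup every %v, keeping at most %d backups\n", interval, policy.MaxBackups)
	}
}
```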
12 changes: 11 additions & 1 deletion pkg/apis/etcd/v1beta2/backup_types.go
@@ -14,7 +14,9 @@

package v1beta2

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// AWS S3 related consts
@@ -94,6 +96,12 @@ type BackupSource struct {
type BackupPolicy struct {
// TimeoutInSecond is the maximal allowed time in second of the entire backup process.
TimeoutInSecond int64 `json:"timeoutInSecond,omitempty"`
// BackupIntervalInSecond specifies how often the operator takes a snapshot.
// 0 is a magic number meaning a one-shot backup.
BackupIntervalInSecond int64 `json:"backupIntervalInSecond,omitempty"`
// MaxBackups specifies how many backups to keep.
// 0 is a magic number meaning unlimited backups.
MaxBackups int `json:"maxBackups,omitempty"`
}

// BackupStatus represents the status of the EtcdBackup Custom Resource.
@@ -106,6 +114,8 @@ type BackupStatus struct {
EtcdVersion string `json:"etcdVersion,omitempty"`
// EtcdRevision is the revision of etcd's KV store where the backup is performed on.
EtcdRevision int64 `json:"etcdRevision,omitempty"`
// LastSuccessDate is the time of the last successful snapshot.
LastSuccessDate metav1.Time `json:"lastSuccessDate,omitempty"`
}

// S3BackupSource provides the spec how to store backups on S3.
5 changes: 3 additions & 2 deletions pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.

40 changes: 33 additions & 7 deletions pkg/backup/backup_manager.go
@@ -18,12 +18,15 @@ import (
"context"
"crypto/tls"
"fmt"
"sort"
"time"

"github.com/coreos/etcd-operator/pkg/backup/writer"
"github.com/coreos/etcd-operator/pkg/util/constants"

"github.com/coreos/etcd/clientv3"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

@@ -51,29 +54,52 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer,

// SaveSnap uses backup writer to save etcd snapshot to a specified S3 path
// and returns backup etcd server's kv store revision and its version.
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) {
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string, isPeriodic bool) (int64, string, *metav1.Time, error) {
now := time.Now().UTC()
etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx)
if err != nil {
return 0, "", fmt.Errorf("create etcd client failed: %v", err)
return 0, "", nil, fmt.Errorf("create etcd client failed: %v", err)
}
defer etcdcli.Close()

resp, err := etcdcli.Status(ctx, etcdcli.Endpoints()[0])
if err != nil {
return 0, "", fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
return 0, "", nil, fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
}

rc, err := etcdcli.Snapshot(ctx)
if err != nil {
return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err)
return 0, "", nil, fmt.Errorf("failed to receive snapshot (%v)", err)
}
defer rc.Close()

if isPeriodic {
s3Path = fmt.Sprintf(s3Path+"_v%d_%s", rev, now.Format("2006-01-02-15:04:05"))
}
_, err = bm.bw.Write(ctx, s3Path, rc)
if err != nil {
return 0, "", fmt.Errorf("failed to write snapshot (%v)", err)
return 0, "", nil, fmt.Errorf("failed to write snapshot (%v)", err)
}
return rev, resp.Version, &metav1.Time{Time: now}, nil
}

// EnsureMaxBackup ensures that the number of snapshots stays under maxCount;
// if the number of snapshots exceeds maxCount, the oldest snapshots are deleted.
func (bm *BackupManager) EnsureMaxBackup(ctx context.Context, basePath string, maxCount int) error {
savedSnapShots, err := bm.bw.List(ctx, basePath)
if err != nil {
return fmt.Errorf("failed to get existing snapshots: %v", err)
}
sort.Sort(sort.Reverse(sort.StringSlice(savedSnapShots)))
for i, snapshotPath := range savedSnapShots {
if i < maxCount {
continue
}
err := bm.bw.Delete(ctx, snapshotPath)
if err != nil {
return fmt.Errorf("failed to delete snapshot: %v", err)
}
}
return rev, resp.Version, nil
return nil
}

// etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision
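
To illustrate what the two changes above produce, the sketch below (illustrative only, with made-up values) shows the object name SaveSnap generates in periodic mode and how EnsureMaxBackup's reverse string sort decides which snapshots to delete:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

func main() {
	basePath := "mybucket/etcd.backup"
	rev := int64(1234)
	now := time.Date(2019, 2, 18, 9, 30, 0, 0, time.UTC)

	// Periodic snapshots get a "_v<rev>_<timestamp>" suffix appended to the base path.
	periodicPath := fmt.Sprintf(basePath+"_v%d_%s", rev, now.Format("2006-01-02-15:04:05"))
	fmt.Println(periodicPath) // mybucket/etcd.backup_v1234_2019-02-18-09:30:00

	// EnsureMaxBackup sorts the listed keys in reverse lexicographic order, so for
	// names of this shape the newest snapshots come first; everything past
	// maxCount is deleted.
	saved := []string{
		"mybucket/etcd.backup_v1000_2019-02-18-08:00:00",
		"mybucket/etcd.backup_v1100_2019-02-18-08:30:00",
		"mybucket/etcd.backup_v1234_2019-02-18-09:30:00",
	}
	sort.Sort(sort.Reverse(sort.StringSlice(saved)))
	maxCount := 2
	for i, p := range saved {
		if i >= maxCount {
			fmt.Println("would delete:", p)
		}
	}
}
```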
47 changes: 47 additions & 0 deletions pkg/backup/writer/abs_writer.go
@@ -100,3 +100,50 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int

return blob.Properties.ContentLength, nil
}

func (absw *absWriter) List(ctx context.Context, basePath string) ([]string, error) {
// TODO: support context.
container, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}

containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()
if err != nil {
return nil, err
}
if !containerExists {
return nil, fmt.Errorf("container %v does not exist", container)
}

blobs, err := containerRef.ListBlobs(
storage.ListBlobsParameters{Prefix: key})
if err != nil {
return nil, err
}
blobKeys := []string{}
for _, blob := range blobs.Blobs {
blobKeys = append(blobKeys, container+"/"+blob.Name)
}
return blobKeys, nil
}

func (absw *absWriter) Delete(ctx context.Context, path string) error {
// TODO: support context.
container, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}
containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()
if err != nil {
return err
}
if !containerExists {
return fmt.Errorf("container %v does not exist", container)
}

blob := containerRef.GetBlobReference(key)
return blob.Delete(&storage.DeleteBlobOptions{})
}
35 changes: 35 additions & 0 deletions pkg/backup/writer/gcs_writer.go
@@ -23,6 +23,7 @@ import (

"cloud.google.com/go/storage"
"github.com/sirupsen/logrus"
"google.golang.org/api/iterator"
)

var _ Writer = &gcsWriter{}
@@ -58,3 +59,37 @@ func (gcsw *gcsWriter) Write(ctx context.Context, path string, r io.Reader) (int
}
return n, err
}

func (gcsw *gcsWriter) List(ctx context.Context, basePath string) ([]string, error) {
bucket, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}
objects := gcsw.gcs.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: key})
if objects == nil {
return nil, fmt.Errorf("failed to get objects having %s prefix", key)
}

objectKeys := []string{}

for {
objAttrs, err := objects.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
objectKeys = append(objectKeys, bucket+"/"+objAttrs.Name)
}
return objectKeys, nil
}

func (gcsw *gcsWriter) Delete(ctx context.Context, path string) error {
bucket, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}

return gcsw.gcs.Bucket(bucket).Object(key).Delete(ctx)
}
36 changes: 36 additions & 0 deletions pkg/backup/writer/s3_writer.go
@@ -67,3 +67,39 @@ func (s3w *s3Writer) Write(ctx context.Context, path string, r io.Reader) (int64
}
return *resp.ContentLength, nil
}

// List returns the object paths under the given S3 base path.
func (s3w *s3Writer) List(ctx context.Context, basePath string) ([]string, error) {
bk, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}

objects, err := s3w.s3.ListObjectsWithContext(ctx,
&s3.ListObjectsInput{
Bucket: aws.String(bk),
Prefix: aws.String(key),
})
if err != nil {
return nil, err
}
objectKeys := []string{}
for _, object := range objects.Contents {
objectKeys = append(objectKeys, bk+"/"+*object.Key)
}
return objectKeys, nil
}

func (s3w *s3Writer) Delete(ctx context.Context, path string) error {
bk, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}

_, err = s3w.s3.DeleteObjectWithContext(ctx,
&s3.DeleteObjectInput{
Bucket: aws.String(bk),
Key: aws.String(key),
})
return err
}
6 changes: 6 additions & 0 deletions pkg/backup/writer/writer.go
@@ -23,4 +23,10 @@ import (
type Writer interface {
// Write writes a backup file to the given path and returns size of written file.
Write(ctx context.Context, path string, r io.Reader) (int64, error)

// List lists the backup files under the given basePath.
List(ctx context.Context, basePath string) ([]string, error)

// Delete deletes the backup file at the given path.
Delete(ctx context.Context, path string) error
}
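
As a side note, the extended interface is straightforward to fake for tests. Below is a hedged sketch, not included in this PR, of an in-memory Writer that could be used to unit-test retention logic such as EnsureMaxBackup; the fakeWriter name and helpers are hypothetical:

```go
package writer_test

import (
	"context"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"

	"github.com/coreos/etcd-operator/pkg/backup/writer"
)

// fakeWriter stores blobs in memory keyed by their full path.
type fakeWriter struct {
	mu    sync.Mutex
	blobs map[string][]byte
}

// Compile-time check that fakeWriter satisfies the extended Writer interface.
var _ writer.Writer = &fakeWriter{}

func newFakeWriter() *fakeWriter {
	return &fakeWriter{blobs: map[string][]byte{}}
}

func (f *fakeWriter) Write(ctx context.Context, path string, r io.Reader) (int64, error) {
	data, err := ioutil.ReadAll(r)
	if err != nil {
		return 0, err
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	f.blobs[path] = data
	return int64(len(data)), nil
}

func (f *fakeWriter) List(ctx context.Context, basePath string) ([]string, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	keys := []string{}
	for k := range f.blobs {
		if strings.HasPrefix(k, basePath) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys, nil
}

func (f *fakeWriter) Delete(ctx context.Context, path string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.blobs, path)
	return nil
}
```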
14 changes: 10 additions & 4 deletions pkg/controller/backup-operator/abs_backup.go
@@ -28,7 +28,8 @@
)

// handleABS saves etcd cluster's backup to the specified ABS path.
func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret,
namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := absfactory.NewClientFromSecret(kubecli, namespace, s.ABSSecret)
if err != nil {
@@ -39,12 +40,17 @@ func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBack
if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil {
return nil, err
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewABSWriter(cli.ABS), tlsConfig, endpoints, namespace)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
rev, etcdVersion, now, err := bm.SaveSnap(ctx, s.Path, isPeriodic)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil
if maxBackup > 0 {
err := bm.EnsureMaxBackup(ctx, s.Path, maxBackup)
if err != nil {
return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err)
}
}
return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: *now}, nil
}
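
The reconcile loop that schedules the next run is not part of this excerpt. Purely as a hedged sketch, a controller could combine the new `lastSuccessDate` status field with `backupIntervalInSecond` to decide when to back up again; `nextBackupIn` is a hypothetical helper:

```go
package main

import (
	"fmt"
	"time"

	api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// nextBackupIn returns how long to wait before taking the next periodic
// snapshot, given the configured interval and the last successful backup time.
func nextBackupIn(policy *api.BackupPolicy, status *api.BackupStatus) time.Duration {
	interval := time.Duration(policy.BackupIntervalInSecond) * time.Second
	elapsed := time.Since(status.LastSuccessDate.Time)
	if elapsed >= interval {
		return 0 // overdue: back up immediately
	}
	return interval - elapsed
}

func main() {
	policy := &api.BackupPolicy{BackupIntervalInSecond: 125}
	status := &api.BackupStatus{LastSuccessDate: metav1.Time{Time: time.Now().Add(-60 * time.Second)}}
	fmt.Println("next backup in:", nextBackupIn(policy, status))
}
```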