Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

backup-operator: Support periodically backup (#1841) #2028

Merged
merged 19 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion pkg/apis/etcd/v1beta2/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package v1beta2

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// AWS S3 related consts
Expand Down Expand Up @@ -94,6 +98,12 @@ type BackupSource struct {
type BackupPolicy struct {
// TimeoutInSecond is the maximal allowed time in second of the entire backup process.
TimeoutInSecond int64 `json:"timeoutInSecond,omitempty"`
// BackupIntervalInSecond is to specify how often operator take snapshot
// 0 is magic number to indicate one-shot backup
BackupIntervalInSecond int64 `json:"backupIntervalInSecond,omitempty"`
// MaxBackups is to specify how many backups we want to keep
// 0 is magic number to indicate un-limited backups
MaxBackups int `json:"maxBackups,omitempty"`
}

// BackupStatus represents the status of the EtcdBackup Custom Resource.
Expand All @@ -106,6 +116,8 @@ type BackupStatus struct {
EtcdVersion string `json:"etcdVersion,omitempty"`
// EtcdRevision is the revision of etcd's KV store where the backup is performed on.
EtcdRevision int64 `json:"etcdRevision,omitempty"`
// LastSuccessDate indicate the time to get snapshot last time
LastSuccessDate time.Time `json:"lastSuccessDate,omitempty"`
}

// S3BackupSource provides the spec how to store backups on S3.
Expand Down
29 changes: 27 additions & 2 deletions pkg/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"crypto/tls"
"fmt"
"sort"
"time"

"github.com/coreos/etcd-operator/pkg/backup/writer"
"github.com/coreos/etcd-operator/pkg/util/constants"
Expand Down Expand Up @@ -51,7 +53,7 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer,

// SaveSnap uses backup writer to save etcd snapshot to a specified S3 path
// and returns backup etcd server's kv store revision and its version.
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) {
func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string, now time.Time, isPeriodic bool) (int64, string, error) {
etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx)
if err != nil {
hexfusion marked this conversation as resolved.
Show resolved Hide resolved
return 0, "", fmt.Errorf("create etcd client failed: %v", err)
Expand All @@ -68,14 +70,37 @@ func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, st
return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err)
}
defer rc.Close()

if isPeriodic {
s3Path = fmt.Sprintf(s3Path+"_v%d_%s", rev, now.Format("2006-01-02-15:04:05"))
}
_, err = bm.bw.Write(ctx, s3Path, rc)
if err != nil {
return 0, "", fmt.Errorf("failed to write snapshot (%v)", err)
}
return rev, resp.Version, nil
}

// EnsureMaxbackup to ensure the number of snapshot is under maxcount
// if the number of snapshot exceeded than maxcount, delete oldest snapshot
func (bm *BackupManager) EnsureMaxbackup(ctx context.Context, s3Path string, maxCount int) error {

hasbro17 marked this conversation as resolved.
Show resolved Hide resolved
savedSnapShots, err := bm.bw.List(ctx, s3Path)
hexfusion marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to get exisiting snapashots: %v", err)
}
hexfusion marked this conversation as resolved.
Show resolved Hide resolved
sort.Sort(sort.Reverse(sort.StringSlice(savedSnapShots)))
for i, snapshotPath := range savedSnapShots {
if i < maxCount {
continue
}
err := bm.bw.Delete(ctx, snapshotPath)
if err != nil {
return fmt.Errorf("failed to delete snapshot: %v", err)
}
}
return nil
}

// etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision
// and returns the etcd client of that member.
func (bm *BackupManager) etcdClientWithMaxRevision(ctx context.Context) (*clientv3.Client, int64, error) {
Expand Down
47 changes: 47 additions & 0 deletions pkg/backup/writer/abs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,50 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int

return blob.Properties.ContentLength, nil
}

func (absw *absWriter) List(ctx context.Context, basePath string) ([]string, error) {
// TODO: support context.
container, _, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}

containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()
if err != nil {
return nil, err
}
if !containerExists {
return nil, fmt.Errorf("container %v does not exist", container)
}

blobs, err := containerRef.ListBlobs(
storage.ListBlobsParameters{Prefix: basePath})
if err != nil {
return nil, err
}
blobKeys := []string{}
for _, blob := range blobs.Blobs {
blobKeys = append(blobKeys, container+"/"+blob.Name)
}
return blobKeys, nil
}

func (absw *absWriter) Delete(ctx context.Context, path string) error {
// TODO: support context.
container, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}
containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()
if err != nil {
return err
}
if !containerExists {
return fmt.Errorf("container %v does not exist", container)
}

blob := containerRef.GetBlobReference(key)
return blob.Delete(&storage.DeleteBlobOptions{})
}
35 changes: 35 additions & 0 deletions pkg/backup/writer/gcs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"cloud.google.com/go/storage"
"github.com/sirupsen/logrus"
"google.golang.org/api/iterator"
)

var _ Writer = &gcsWriter{}
Expand Down Expand Up @@ -58,3 +59,37 @@ func (gcsw *gcsWriter) Write(ctx context.Context, path string, r io.Reader) (int
}
return n, err
}

func (gcsw *gcsWriter) List(ctx context.Context, basePath string) ([]string, error) {
bucket, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}
objects := gcsw.gcs.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: key})
if objects == nil {
return nil, fmt.Errorf("failed to get objects having %s prefix", key)
}

objectKeys := []string{}

for {
objAttrs, err := objects.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
objectKeys = append(objectKeys, bucket+"/"+objAttrs.Name)
}
return objectKeys, nil
}

func (gcsw *gcsWriter) Delete(ctx context.Context, path string) error {
bucket, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}

return gcsw.gcs.Bucket(bucket).Object(key).Delete(ctx)
}
36 changes: 36 additions & 0 deletions pkg/backup/writer/s3_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,39 @@ func (s3w *s3Writer) Write(ctx context.Context, path string, r io.Reader) (int64
}
return *resp.ContentLength, nil
}

// List return the file paths which match the given s3 path
func (s3w *s3Writer) List(ctx context.Context, basePath string) ([]string, error) {
bk, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}

objects, err := s3w.s3.ListObjectsWithContext(ctx,
&s3.ListObjectsInput{
Bucket: aws.String(bk),
Prefix: aws.String(key),
})
if err != nil {
return nil, err
}
objectKeys := []string{}
for _, object := range objects.Contents {
objectKeys = append(objectKeys, bk+"/"+ *object.Key)
}
hexfusion marked this conversation as resolved.
Show resolved Hide resolved
return objectKeys, nil
}

func (s3w *s3Writer) Delete(ctx context.Context, path string) error {
bk, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}

_, err = s3w.s3.DeleteObjectWithContext(ctx,
&s3.DeleteObjectInput{
Bucket: aws.String(bk),
Key: aws.String(key),
})
return err
}
6 changes: 6 additions & 0 deletions pkg/backup/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ import (
type Writer interface {
// Write writes a backup file to the given path and returns size of written file.
Write(ctx context.Context, path string, r io.Reader) (int64, error)

// List a backup files
List(ctx context.Context, basePath string) ([]string, error)
hexfusion marked this conversation as resolved.
Show resolved Hide resolved

// Delete a backup file
Delete(ctx context.Context, path string) error
}
16 changes: 12 additions & 4 deletions pkg/controller/backup-operator/abs_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/tls"
"fmt"
"time"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/backup"
Expand All @@ -28,7 +29,8 @@ import (
)

// handleABS saves etcd cluster's backup to specificed ABS path.
func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret,
namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := absfactory.NewClientFromSecret(kubecli, namespace, s.ABSSecret)
if err != nil {
Expand All @@ -39,12 +41,18 @@ func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBack
if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil {
return nil, err
}

now := time.Now().UTC()
bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewABSWriter(cli.ABS), tlsConfig, endpoints, namespace)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path, now, isPeriodic)
if err != nil {
hexfusion marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil
if maxBackup > 0 {
err := bm.EnsureMaxbackup(ctx, s.Path, maxBackup)
if err != nil {
return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err)
}
}
return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: now}, nil
}
16 changes: 12 additions & 4 deletions pkg/controller/backup-operator/gcs_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/tls"
"fmt"
"time"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/backup"
Expand All @@ -28,7 +29,8 @@ import (
)

// handleGCS saves etcd cluster's backup to specificed GCS path.
func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBackupSource, endpoints []string, clientTLSSecret,
namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := gcsfactory.NewClientFromSecret(ctx, kubecli, namespace, s.GCPSecret)
if err != nil {
Expand All @@ -40,12 +42,18 @@ func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBack
if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil {
return nil, err
}

now := time.Now().UTC()
bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewGCSWriter(cli.GCS), tlsConfig, endpoints, namespace)

rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path, now, isPeriodic)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil
if maxBackup > 0 {
err := bm.EnsureMaxbackup(ctx, s.Path, maxBackup)
if err != nil {
return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err)
}
}
return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: now}, nil
}
8 changes: 8 additions & 0 deletions pkg/controller/backup-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"os"
"sync"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/client"
Expand Down Expand Up @@ -45,9 +46,16 @@ type Backup struct {
backupCRCli versioned.Interface
kubeExtCli apiextensionsclient.Interface

backupRunnerStore sync.Map

createCRD bool
}

type BackupRunner struct {
spec api.BackupSpec
cancelFunc context.CancelFunc
}

// New creates a backup operator.
func New(createCRD bool) *Backup {
return &Backup{
Expand Down
Loading