Skip to content

Commit

Permalink
data mover micro service backup
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Jul 25, 2024
1 parent 442cc76 commit 0786dbc
Show file tree
Hide file tree
Showing 10 changed files with 1,579 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8046-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Data mover micro service backup according to design #7576
208 changes: 200 additions & 8 deletions pkg/cmd/cli/datamover/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,39 @@ limitations under the License.
package datamover

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

ctrl "sigs.k8s.io/controller-runtime"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"

ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type dataMoverBackupConfig struct {
Expand Down Expand Up @@ -52,7 +75,10 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
logger.Infof("Starting Velero data-mover backup %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s := newdataMoverBackup(logger, config)
s, err := newdataMoverBackup(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create data mover backup, %v", err)
}

s.run()
},
Expand All @@ -73,20 +99,186 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
return command
}

const (
// defaultCredentialsDirectory is the path on disk where credential
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
)

type dataMoverBackup struct {
logger logrus.FieldLogger
config dataMoverBackupConfig
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
client ctlclient.Client
cache ctlcache.Cache
namespace string
nodeName string
config dataMoverBackupConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}

func newdataMoverBackup(logger logrus.FieldLogger, config dataMoverBackupConfig) *dataMoverBackup {
func newdataMoverBackup(logger logrus.FieldLogger, factory client.Factory, config dataMoverBackupConfig) (*dataMoverBackup, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")
}

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

scheme := runtime.NewScheme()
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")
}

if err := velerov2alpha1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme")
}

if err := v1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")
}

nodeName := os.Getenv("NODE_NAME")

// use a field selector to filter to only pods scheduled on this node.
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov2alpha1api.DataUpload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},
}

cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")
}

cache, err := ctlcache.New(clientConfig, cacheOption)
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
}

s := &dataMoverBackup{
logger: logger,
config: config,
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
client: cli,
cache: cache,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,
}

s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")
}

return s
s.dataPathMgr = datapath.NewManager(1)

return s, nil
}

var funcExitWithMessage = exitWithMessage
var funcCreateDataPathService = (*dataMoverBackup).createDataPathService

func (s *dataMoverBackup) run() {
time.Sleep(time.Duration(1<<63 - 1))
signals.CancelOnShutdown(s.cancelFunc, s.logger)
go func() {
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")
}
}()

s.runDataPath()
}

func (s *dataMoverBackup) runDataPath() {
s.logger.Infof("Starting micro service in node %s for du %s", s.nodeName, s.config.duName)

dpService, err := funcCreateDataPathService(s)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to create data path service for DataUpload %s: %v", s.config.duName, err)
return
}

s.logger.Infof("Starting data path service %s", s.config.duName)

err = dpService.Init()
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err)
return
}

s.logger.Infof("Running data path service %s", s.config.duName)

result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
return
}

s.logger.WithField("du", s.config.duName).Info("Data path service completed")

dpService.Shutdown()

s.logger.WithField("du", s.config.duName).Info("Data path service is shut down")

s.cancelFunc()

funcExitWithMessage(s.logger, true, result)
}

var funcNewCredentialFileStore = credentials.NewNamespacedFileStore
var funcNewCredentialSecretStore = credentials.NewNamespacedSecretStore

func (s *dataMoverBackup) createDataPathService() (dataPathService, error) {
credentialFileStore, err := funcNewCredentialFileStore(
s.client,
s.namespace,
defaultCredentialsDirectory,
filesystem.NewFileSystem(),
)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential file store")
}

credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential secret store")
}

credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}

duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
if err != nil {
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
}

repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)

return datamover.NewBackupMicroService(s.ctx, s.client, s.kubeClient, s.config.duName, s.namespace, s.nodeName, datapath.AccessPoint{
ByPath: s.config.volumePath,
VolMode: uploader.PersistentVolumeMode(s.config.volumeMode),
}, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil
}
Loading

0 comments on commit 0786dbc

Please sign in to comment.