Data mover micro service backup #8046

Merged: 3 commits, Jul 30, 2024
1 change: 1 addition & 0 deletions changelogs/unreleased/8046-Lyndon-Li
@@ -0,0 +1 @@
Data mover micro service backup according to design #7576
207 changes: 200 additions & 7 deletions pkg/cmd/cli/datamover/backup.go
@@ -14,16 +14,39 @@
package datamover

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

ctrl "sigs.k8s.io/controller-runtime"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"

ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type dataMoverBackupConfig struct {
@@ -52,7 +75,10 @@
logger.Infof("Starting Velero data-mover backup %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
- s := newdataMoverBackup(logger, config)
+ s, err := newdataMoverBackup(logger, f, config)
+ if err != nil {
+ exitWithMessage(logger, false, "Failed to create data mover backup, %v", err)

}

s.run()
},
@@ -73,20 +99,187 @@
return command
}

const (
// defaultCredentialsDirectory is the path on disk where credential
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
)

type dataMoverBackup struct {
- logger logrus.FieldLogger
- config dataMoverBackupConfig
+ logger logrus.FieldLogger
+ ctx context.Context
+ cancelFunc context.CancelFunc
+ client ctlclient.Client
+ cache ctlcache.Cache
+ namespace string
+ nodeName string
+ config dataMoverBackupConfig
+ kubeClient kubernetes.Interface
+ dataPathMgr *datapath.Manager
}

- func newdataMoverBackup(logger logrus.FieldLogger, config dataMoverBackupConfig) *dataMoverBackup {
+ func newdataMoverBackup(logger logrus.FieldLogger, factory client.Factory, config dataMoverBackupConfig) (*dataMoverBackup, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")

}

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

Contributor

It seems this line is not relevant to the context.

Contributor Author

Let me check this separately, since this code is the same as in the node-agent server.
If there is a problem, I will fix it both here and in the node-agent server in a separate PR.

Contributor Author (@Lyndon-Li, Jul 30, 2024)

After a preliminary check, this is basically OK.
`zap.New(zap.UseDevMode(true))` also initializes a logger that writes to `stderr`, the same as `logger logrus.FieldLogger`, and its default log level is `Info`. The only difference is that `logger logrus.FieldLogger` carries the configured log level and format, which `zap.New(zap.UseDevMode(true))` may not follow. We will probably seldom need the controller-runtime logs at `debug` level.

The same code in the node-agent server was introduced in PR #2561. Since that PR is old, we don't know of any special reason for not using the provided logger.

Therefore, I suggest we keep it as is until we see problems, and then come back and change all the places that share this code.


scheme := runtime.NewScheme()
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")

}

if err := velerov2alpha1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme")

}

if err := v1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")

}

nodeName := os.Getenv("NODE_NAME")

// use a field selector to filter to only pods scheduled on this node.
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov2alpha1api.DataUpload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},

}

cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")

}

cache, err := ctlcache.New(clientConfig, cacheOption)
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")

}

s := &dataMoverBackup{
- logger: logger,
- config: config,
+ logger: logger,
+ ctx: ctx,
+ cancelFunc: cancelFunc,
+ client: cli,
+ cache: cache,
+ config: config,
+ namespace: factory.Namespace(),
+ nodeName: nodeName,

}

s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")

}

- return s
+ s.dataPathMgr = datapath.NewManager(1)

return s, nil

}

var funcExitWithMessage = exitWithMessage
var funcCreateDataPathService = (*dataMoverBackup).createDataPathService

func (s *dataMoverBackup) run() {
signals.CancelOnShutdown(s.cancelFunc, s.logger)
go func() {
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")

}
}()

// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
}

func (s *dataMoverBackup) runDataPath() {
s.logger.Infof("Starting micro service in node %s for du %s", s.nodeName, s.config.duName)

dpService, err := funcCreateDataPathService(s)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to create data path service for DataUpload %s: %v", s.config.duName, err)
return
}

s.logger.Infof("Starting data path service %s", s.config.duName)

err = dpService.Init()
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err)
return
}

s.logger.Infof("Running data path service %s", s.config.duName)

result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
return
}

s.logger.WithField("du", s.config.duName).Info("Data path service completed")

dpService.Shutdown()

s.logger.WithField("du", s.config.duName).Info("Data path service is shut down")

s.cancelFunc()

funcExitWithMessage(s.logger, true, result)
}

var funcNewCredentialFileStore = credentials.NewNamespacedFileStore
var funcNewCredentialSecretStore = credentials.NewNamespacedSecretStore

func (s *dataMoverBackup) createDataPathService() (dataPathService, error) {
credentialFileStore, err := funcNewCredentialFileStore(
s.client,
s.namespace,
defaultCredentialsDirectory,
filesystem.NewFileSystem(),
)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential file store")
}

credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential secret store")
}

credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}

duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
if err != nil {
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
}

repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)

return datamover.NewBackupMicroService(s.ctx, s.client, s.kubeClient, s.config.duName, s.namespace, s.nodeName, datapath.AccessPoint{
ByPath: s.config.volumePath,
VolMode: uploader.PersistentVolumeMode(s.config.volumeMode),
}, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil
}