Skip to content

Commit

Permalink
switch server & controllers to logrus
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <steve@heptio.com>
  • Loading branch information
skriss committed Sep 15, 2017
1 parent 9177064 commit 616976e
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 188 deletions.
22 changes: 17 additions & 5 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func NewCommand() *cobra.Command {
Short: "Run the ark server",
Long: "Run the ark server",
Run: func(c *cobra.Command, args []string) {
s, err := newServer(kubeconfig)
var (
logger = logrus.New()
s, err = newServer(kubeconfig, logger)
)
cmd.CheckError(err)

cmd.CheckError(s.run())
Expand All @@ -89,7 +92,7 @@ type server struct {
logger *logrus.Logger
}

func newServer(kubeconfig string) (*server, error) {
func newServer(kubeconfig string, logger *logrus.Logger) (*server, error) {
clientConfig, err := client.Config(kubeconfig)
if err != nil {
return nil, err
Expand All @@ -115,7 +118,7 @@ func newServer(kubeconfig string) (*server, error) {
sharedInformerFactory: informers.NewSharedInformerFactory(arkClient, 0),
ctx: ctx,
cancelFunc: cancelFunc,
logger: logrus.New(),
logger: logger,
}

return s, nil
Expand Down Expand Up @@ -253,7 +256,7 @@ func (s *server) initBackupService(config *api.Config) error {
return err
}

s.backupService = cloudprovider.NewBackupService(objectStorage)
s.backupService = cloudprovider.NewBackupService(objectStorage, s.logger)
return nil
}

Expand Down Expand Up @@ -391,13 +394,15 @@ func (s *server) runControllers(config *api.Config) error {
ctx,
s.backupService,
cloudBackupCacheResyncPeriod,
s.logger,
)

backupSyncController := controller.NewBackupSyncController(
s.arkClient.ArkV1(),
s.backupService,
config.BackupStorageProvider.Bucket,
config.BackupSyncPeriod.Duration,
s.logger,
)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -431,6 +436,7 @@ func (s *server) runControllers(config *api.Config) error {
s.backupService,
config.BackupStorageProvider.Bucket,
s.snapshotService != nil,
s.logger,
)
wg.Add(1)
go func() {
Expand All @@ -443,6 +449,7 @@ func (s *server) runControllers(config *api.Config) error {
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().Schedules(),
config.ScheduleSyncPeriod.Duration,
s.logger,
)
wg.Add(1)
go func() {
Expand All @@ -459,6 +466,7 @@ func (s *server) runControllers(config *api.Config) error {
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().Restores(),
s.arkClient.ArkV1(),
s.logger,
)
wg.Add(1)
go func() {
Expand All @@ -475,6 +483,7 @@ func (s *server) runControllers(config *api.Config) error {
config.ResourcePriorities,
s.arkClient.ArkV1(),
s.kubeClient,
s.logger,
)
cmd.CheckError(err)

Expand All @@ -487,6 +496,7 @@ func (s *server) runControllers(config *api.Config) error {
config.BackupStorageProvider.Bucket,
s.sharedInformerFactory.Ark().V1().Backups(),
s.snapshotService != nil,
s.logger,
)
wg.Add(1)
go func() {
Expand All @@ -499,6 +509,7 @@ func (s *server) runControllers(config *api.Config) error {
s.sharedInformerFactory.Ark().V1().DownloadRequests(),
s.backupService,
config.BackupStorageProvider.Bucket,
s.logger,
)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -551,14 +562,15 @@ func newRestorer(
resourcePriorities []string,
backupClient arkv1client.BackupsGetter,
kubeClient kubernetes.Interface,
logger *logrus.Logger,
) (restore.Restorer, error) {
restorers := map[string]restorers.ResourceRestorer{
"persistentvolumes": restorers.NewPersistentVolumeRestorer(snapshotService),
"persistentvolumeclaims": restorers.NewPersistentVolumeClaimRestorer(),
"services": restorers.NewServiceRestorer(),
"namespaces": restorers.NewNamespaceRestorer(),
"pods": restorers.NewPodRestorer(),
"jobs": restorers.NewJobRestorer(),
"jobs": restorers.NewJobRestorer(logger),
}

return restore.NewKubernetesRestorer(
Expand Down
39 changes: 18 additions & 21 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ type backupController struct {
backupService cloudprovider.BackupService
bucket string
pvProviderExists bool

lister listers.BackupLister
listerSynced cache.InformerSynced
client arkv1client.BackupsGetter
syncHandler func(backupName string) error
queue workqueue.RateLimitingInterface

clock clock.Clock
logger *logrus.Logger
lister listers.BackupLister
listerSynced cache.InformerSynced
client arkv1client.BackupsGetter
syncHandler func(backupName string) error
queue workqueue.RateLimitingInterface
clock clock.Clock
logger *logrus.Logger
}

func NewBackupController(
Expand All @@ -72,20 +70,19 @@ func NewBackupController(
backupService cloudprovider.BackupService,
bucket string,
pvProviderExists bool,
logger *logrus.Logger,
) Interface {
c := &backupController{
backupper: backupper,
backupService: backupService,
bucket: bucket,
pvProviderExists: pvProviderExists,

lister: backupInformer.Lister(),
listerSynced: backupInformer.Informer().HasSynced,
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"),

clock: &clock.RealClock{},
logger: logrus.New(),
lister: backupInformer.Lister(),
listerSynced: backupInformer.Informer().HasSynced,
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"),
clock: &clock.RealClock{},
logger: logger,
}

c.syncHandler = c.processBackup
Expand Down Expand Up @@ -210,7 +207,7 @@ func (controller *backupController) processBackup(key string) error {
return err
}

logContext.Info("Getting backup")
logContext.Debug("Getting backup")
backup, err := controller.lister.Backups(ns).Get(name)
if err != nil {
logContext.WithField("error", err).Info("Error getting backup")
Expand Down Expand Up @@ -270,7 +267,7 @@ func (controller *backupController) processBackup(key string) error {
logContext.WithFields(logrus.Fields{
"phase": backup.Status.Phase,
"error": err,
}).Info("error updating backup status")
}).Info("error updating Backup status")
return err
}
backup = updatedBackup
Expand All @@ -279,14 +276,14 @@ func (controller *backupController) processBackup(key string) error {
return nil
}

logContext.Info("running backup")
logContext.Info("Running backup")
// execution & upload of backup
if err := controller.runBackup(backup, controller.bucket); err != nil {
logContext.WithField("error", err).Info("backup failed")
backup.Status.Phase = api.BackupPhaseFailed
}

logContext.Info("updating backup's final status")
logContext.Info("Updating backup's final status")
if _, err = controller.client.Backups(ns).Update(backup); err != nil {
logContext.WithField("error", err).Info("error updating backup's final status")
}
Expand Down
19 changes: 9 additions & 10 deletions pkg/controller/backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing"

testlogger "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -146,18 +147,15 @@ func TestProcessBackup(t *testing.T) {
},
}

// flag.Set("logtostderr", "true")
// flag.Set("v", "4")

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset()

backupper := &fakeBackupper{}

cloudBackups := &BackupService{}

sharedInformers := informers.NewSharedInformerFactory(client, 0)
var (
client = fake.NewSimpleClientset()
backupper = &fakeBackupper{}
cloudBackups = &BackupService{}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger, _ = testlogger.NewNullLogger()
)

c := NewBackupController(
sharedInformers.Ark().V1().Backups(),
Expand All @@ -166,6 +164,7 @@ func TestProcessBackup(t *testing.T) {
cloudBackups,
"bucket",
test.allowSnapshots,
logger,
).(*backupController)
c.clock = clock.NewFakeClock(time.Now())

Expand Down
29 changes: 20 additions & 9 deletions pkg/controller/backup_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,58 +20,69 @@ import (
"context"
"time"

"github.com/golang/glog"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
"github.com/heptio/ark/pkg/util/kube"
)

type backupSyncController struct {
client arkv1client.BackupsGetter
backupService cloudprovider.BackupService
bucket string
syncPeriod time.Duration
logger *logrus.Logger
}

func NewBackupSyncController(client arkv1client.BackupsGetter, backupService cloudprovider.BackupService, bucket string, syncPeriod time.Duration) Interface {
func NewBackupSyncController(
client arkv1client.BackupsGetter,
backupService cloudprovider.BackupService,
bucket string,
syncPeriod time.Duration,
logger *logrus.Logger,
) Interface {
if syncPeriod < time.Minute {
glog.Infof("Backup sync period %v is too short. Setting to 1 minute", syncPeriod)
logger.WithField("syncPeriod", syncPeriod).Info("Provided backup sync period is too short. Setting to 1 minute")
syncPeriod = time.Minute
}
return &backupSyncController{
client: client,
backupService: backupService,
bucket: bucket,
syncPeriod: syncPeriod,
logger: logger,
}
}

// Run is a blocking function that continually runs the object storage -> Ark API
// sync process according to the controller's syncPeriod. It will return when it
// receives on the ctx.Done() channel.
func (c *backupSyncController) Run(ctx context.Context, workers int) error {
glog.Info("Running backup sync controller")
c.logger.Info("Running backup sync controller")
wait.Until(c.run, c.syncPeriod, ctx.Done())
return nil
}

func (c *backupSyncController) run() {
glog.Info("Syncing backups from object storage")
c.logger.Info("Syncing backups from object storage")
backups, err := c.backupService.GetAllBackups(c.bucket)
if err != nil {
glog.Errorf("error listing backups: %v", err)
c.logger.WithField("error", err).Error("error listing backups")
return
}
glog.Infof("Found %d backups", len(backups))
c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage")

for _, cloudBackup := range backups {
glog.Infof("Syncing backup %s/%s", cloudBackup.Namespace, cloudBackup.Name)
logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup))
logContext.Info("Syncing backup")

cloudBackup.ResourceVersion = ""
if _, err := c.client.Backups(cloudBackup.Namespace).Create(cloudBackup); err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("error syncing backup %s/%s from object storage: %v", cloudBackup.Namespace, cloudBackup.Name, err)
logContext.WithField("error", err).Error("Error syncing backup from object storaga")
}
}
}
9 changes: 7 additions & 2 deletions pkg/controller/backup_sync_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

testlogger "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"

core "k8s.io/client-go/testing"
Expand Down Expand Up @@ -55,14 +56,18 @@ func TestBackupSyncControllerRun(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
bs := &BackupService{}
client := fake.NewSimpleClientset()
var (
bs = &BackupService{}
client = fake.NewSimpleClientset()
logger, _ = testlogger.NewNullLogger()
)

c := NewBackupSyncController(
client.ArkV1(),
bs,
"bucket",
time.Duration(0),
logger,
).(*backupSyncController)

bs.On("GetAllBackups", "bucket").Return(test.cloudBackups, test.getAllBackupsError)
Expand Down
Loading

0 comments on commit 616976e

Please sign in to comment.