Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restic repository management fixes #1367

Merged
merged 7 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller/restic_repository_controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 the Velero contributors.
Copyright 2018, 2019 the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewResticRepositoryController(
},
)

c.resyncPeriod = 30 * time.Minute
c.resyncPeriod = 5 * time.Minute
c.resyncFunc = c.enqueueAllRepositories

return c
Expand Down
67 changes: 43 additions & 24 deletions pkg/restic/repository_ensurer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 the Velero contributors.
Copyright 2018, 2019 the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -39,8 +40,8 @@ type repositoryEnsurer struct {
repoLister velerov1listers.ResticRepositoryLister
repoClient velerov1client.ResticRepositoriesGetter

readyChansLock sync.Mutex
readyChans map[string]chan *velerov1api.ResticRepository
repoChansLock sync.Mutex
repoChans map[string]chan *velerov1api.ResticRepository

// repoLocksMu synchronizes reads/writes to the repoLocks map itself
// since maps are not threadsafe.
Expand All @@ -58,7 +59,7 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
log: log,
repoLister: repoInformer.Lister(),
repoClient: repoClient,
readyChans: make(map[string]chan *velerov1api.ResticRepository),
repoChans: make(map[string]chan *velerov1api.ResticRepository),
repoLocks: make(map[repoKey]*sync.Mutex),
}

Expand All @@ -68,20 +69,27 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
oldObj := old.(*velerov1api.ResticRepository)
newObj := upd.(*velerov1api.ResticRepository)

if oldObj.Status.Phase != velerov1api.ResticRepositoryPhaseReady && newObj.Status.Phase == velerov1api.ResticRepositoryPhaseReady {
r.readyChansLock.Lock()
defer r.readyChansLock.Unlock()
// we're only interested in phase-changing updates
if oldObj.Status.Phase == newObj.Status.Phase {
return
}

// we're only interested in updates where the updated object is either Ready or NotReady
if newObj.Status.Phase != velerov1api.ResticRepositoryPhaseReady && newObj.Status.Phase != velerov1api.ResticRepositoryPhaseNotReady {
return
}

key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation).String()
readyChan, ok := r.readyChans[key]
if !ok {
log.Errorf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
return
}
r.repoChansLock.Lock()
defer r.repoChansLock.Unlock()

readyChan <- newObj
delete(r.readyChans, key)
key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation).String()
repoChan, ok := r.repoChans[key]
if !ok {
log.Debugf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
return
}

repoChan <- newObj
},
},
)
Expand Down Expand Up @@ -132,7 +140,7 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
}
if len(repos) == 1 {
if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository is not ready")
return nil, errors.Errorf("restic repository is not ready: %s", repos[0].Status.Message)
}

log.Debug("Ready repository found")
Expand All @@ -155,27 +163,38 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
},
}

readyChan := r.getReadyChan(selector.String())
defer close(readyChan)
repoChan := r.getRepoChan(selector.String())
defer func() {
delete(r.repoChans, selector.String())
close(repoChan)
}()

if _, err := r.repoClient.ResticRepositories(namespace).Create(repo); err != nil {
return nil, errors.Wrapf(err, "unable to create restic repository resource")
}

select {
// repositories should become either ready or not ready quickly if they're
// newly created.
case <-time.After(time.Minute):
return nil, errors.New("timed out waiting for restic repository to become ready")
case <-ctx.Done():
return nil, errors.New("timed out waiting for restic repository to become ready")
case res := <-readyChan:
case res := <-repoChan:
if res.Status.Phase == velerov1api.ResticRepositoryPhaseNotReady {
return nil, errors.Errorf("restic repository is not ready: %s", res.Status.Message)
}

return res, nil
}
}

func (r *repositoryEnsurer) getReadyChan(name string) chan *velerov1api.ResticRepository {
r.readyChansLock.Lock()
defer r.readyChansLock.Unlock()
func (r *repositoryEnsurer) getRepoChan(name string) chan *velerov1api.ResticRepository {
r.repoChansLock.Lock()
defer r.repoChansLock.Unlock()

r.readyChans[name] = make(chan *velerov1api.ResticRepository)
return r.readyChans[name]
r.repoChans[name] = make(chan *velerov1api.ResticRepository)
return r.repoChans[name]
}

func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
Expand Down