Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Skip multi sync if previously failed
Browse files Browse the repository at this point in the history
`fluxsync.Sync()` applies everything as a multidoc. In case this fails
we retry one by one. In that case, the next attempt will likely fail too
and therefore we won't attempt the multidoc in the first place.
  • Loading branch information
rndstr committed Oct 2, 2018
1 parent 35f3379 commit 231a0d3
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Cluster interface {
SomeControllers([]flux.ResourceID) ([]Controller, error)
Ping() error
Export() ([]byte, error)
Sync(SyncDef) error
Sync(SyncDef, bool) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}

Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er

// Sync performs the given actions on resources. Operations are
// asynchronous, but serialised.
func (c *Cluster) Sync(spec cluster.SyncDef) error {
func (c *Cluster) Sync(spec cluster.SyncDef, likelyErrors bool) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
Expand Down Expand Up @@ -229,7 +229,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {

c.mu.Lock()
defer c.mu.Unlock()
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
if applyErrs := c.applier.apply(logger, cs, likelyErrors); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}

Expand Down
22 changes: 14 additions & 8 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {

// Applier is something that will apply a changeset to the cluster.
type Applier interface {
apply(log.Logger, changeSet) cluster.SyncError
apply(log.Logger, changeSet, bool) cluster.SyncError
}

type Kubectl struct {
Expand Down Expand Up @@ -113,19 +113,25 @@ func (objs applyOrder) Less(i, j int) bool {
return ranki < rankj
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet, likelyErrors bool) (errs cluster.SyncError) {
f := func(objs []*apiObject, cmd string, args ...string) {
if len(objs) == 0 {
return
}
logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs))
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {
for _, obj := range objs {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}

// Only do the whole bunch if we don't expect some of the
// resources to fail.
if !likelyErrors {
if err := c.doCommand(logger, makeMultidoc(objs), args...); err == nil {
return // success
}
}
for _, obj := range objs {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *Mock) Export() ([]byte, error) {
return m.ExportFunc()
}

func (m *Mock) Sync(c SyncDef) error {
func (m *Mock) Sync(c SyncDef, likelyErrors bool) error {
return m.SyncFunc(c)
}

Expand Down
5 changes: 3 additions & 2 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,11 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
return errors.Wrap(err, "loading resources from repo")
}

likelyErrors := len(d.syncErrors.errs) > 0
syncErrors := make(map[flux.ResourceID]error)
var resourceErrors []event.ResourceError
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false); err != nil {
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false, likelyErrors); err != nil {
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
Expand All @@ -212,7 +213,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
}
// Since fluxsync.Sync() applies *all* resources we replace
// all errors here. If there is a recurring issue it will
// show up every time when they sync is run.
// show up every time when the sync is run.
d.syncErrors.mu.Lock()
d.syncErrors.errs = syncErrors
d.syncErrors.mu.Unlock()
Expand Down
5 changes: 3 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

// Sync synchronises the cluster to the files in a directory
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool) error {
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster,
deletes, likelyErrors bool) error {
// Get a map of resources defined in the cluster
clusterBytes, err := clus.Export()

Expand Down Expand Up @@ -42,7 +43,7 @@ func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resou
prepareSyncApply(logger, clusterResources, id, res, &sync)
}

return clus.Sync(sync)
return clus.Sync(sync, likelyErrors)
}

func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
Expand Down
6 changes: 3 additions & 3 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}

if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, false); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand All @@ -60,7 +60,7 @@ func TestSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, false); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand Down Expand Up @@ -200,7 +200,7 @@ type syncCluster struct {
resources map[string][]byte
}

func (p *syncCluster) Sync(def cluster.SyncDef) error {
func (p *syncCluster) Sync(def cluster.SyncDef, likelyErrors bool) error {
println("=== Syncing ===")
for _, action := range def.Actions {
if action.Delete != nil {
Expand Down

0 comments on commit 231a0d3

Please sign in to comment.