Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Only apply lone resource if errored before
Browse files Browse the repository at this point in the history
Still does multidoc kubectl action for all resources that did _not_ fail
before.
  • Loading branch information
rndstr committed Oct 10, 2018
1 parent 231a0d3 commit c818454
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Cluster interface {
SomeControllers([]flux.ResourceID) ([]Controller, error)
Ping() error
Export() ([]byte, error)
Sync(SyncDef, bool) error
Sync(SyncDef, map[flux.ResourceID]error) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}

Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er

// Sync performs the given actions on resources. Operations are
// asynchronous, but serialised.
func (c *Cluster) Sync(spec cluster.SyncDef, likelyErrors bool) error {
func (c *Cluster) Sync(spec cluster.SyncDef, errored map[flux.ResourceID]error) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
Expand Down Expand Up @@ -229,7 +229,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef, likelyErrors bool) error {

c.mu.Lock()
defer c.mu.Unlock()
if applyErrs := c.applier.apply(logger, cs, likelyErrors); len(applyErrs) > 0 {
if applyErrs := c.applier.apply(logger, cs, errored); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}

Expand Down
30 changes: 22 additions & 8 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
)

Expand All @@ -30,7 +31,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {

// Applier is something that will apply a changeset to the cluster.
type Applier interface {
apply(log.Logger, changeSet, bool) cluster.SyncError
apply(log.Logger, changeSet, map[flux.ResourceID]error) cluster.SyncError
}

type Kubectl struct {
Expand Down Expand Up @@ -113,22 +114,35 @@ func (objs applyOrder) Less(i, j int) bool {
return ranki < rankj
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet, likelyErrors bool) (errs cluster.SyncError) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.ResourceID]error) (errs cluster.SyncError) {
f := func(objs []*apiObject, cmd string, args ...string) {
if len(objs) == 0 {
return
}
logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs))
args = append(args, cmd)

// Only do the whole bunch if we don't expect some of the
// resources to fail.
if !likelyErrors {
if err := c.doCommand(logger, makeMultidoc(objs), args...); err == nil {
return // success
var multi, single []*apiObject
if len(errored) == 0 {
multi = objs
} else {
for _, obj := range objs {
if _, ok := errored[obj.ResourceID()]; ok {
// Resources that errored before shall be applied separately
single = append(single, obj)
} else {
// everything else will be tried in a multidoc apply.
multi = append(multi, obj)
}
}
}
for _, obj := range objs {

if len(multi) > 0 {
if err := c.doCommand(logger, makeMultidoc(multi), args...); err != nil {
single = append(single, multi...)
}
}
for _, obj := range single {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
Expand Down
6 changes: 3 additions & 3 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
func (m *mockApplier) apply(_ log.Logger, c changeSet, errored map[flux.ResourceID]error) cluster.SyncError {
if len(c.objs) != 0 {
m.commandRun = true
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) {

func TestSyncNop(t *testing.T) {
kube, mock := setup(t)
if err := kube.Sync(cluster.SyncDef{}); err != nil {
if err := kube.Sync(cluster.SyncDef{}, nil); err != nil {
t.Errorf("%#v", err)
}
if mock.commandRun {
Expand All @@ -72,7 +72,7 @@ func TestSyncMalformed(t *testing.T) {
Apply: rsc{"id", []byte("garbage")},
},
},
})
}, nil)
if err == nil {
t.Error("expected error because malformed resource def, but got nil")
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *Mock) Export() ([]byte, error) {
return m.ExportFunc()
}

func (m *Mock) Sync(c SyncDef, likelyErrors bool) error {
func (m *Mock) Sync(c SyncDef, errored map[flux.ResourceID]error) error {
return m.SyncFunc(c)
}

Expand Down
3 changes: 1 addition & 2 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,10 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
return errors.Wrap(err, "loading resources from repo")
}

likelyErrors := len(d.syncErrors.errs) > 0
syncErrors := make(map[flux.ResourceID]error)
var resourceErrors []event.ResourceError
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false, likelyErrors); err != nil {
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false, d.syncErrors.errs); err != nil {
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
Expand Down
5 changes: 3 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

// Sync synchronises the cluster to the files in a directory
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster,
deletes, likelyErrors bool) error {
deletes bool, errored map[flux.ResourceID]error) error {
// Get a map of resources defined in the cluster
clusterBytes, err := clus.Export()

Expand Down Expand Up @@ -43,7 +44,7 @@ func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resou
prepareSyncApply(logger, clusterResources, id, res, &sync)
}

return clus.Sync(sync, likelyErrors)
return clus.Sync(sync, errored)
}

func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
Expand Down
7 changes: 4 additions & 3 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// "github.com/weaveworks/flux"
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
Expand All @@ -40,7 +41,7 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}

if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, false); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, nil); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand All @@ -60,7 +61,7 @@ func TestSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, false); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true, nil); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand Down Expand Up @@ -200,7 +201,7 @@ type syncCluster struct {
resources map[string][]byte
}

func (p *syncCluster) Sync(def cluster.SyncDef, likelyErrors bool) error {
func (p *syncCluster) Sync(def cluster.SyncDef, errored map[flux.ResourceID]error) error {
println("=== Syncing ===")
for _, action := range def.Actions {
if action.Delete != nil {
Expand Down

0 comments on commit c818454

Please sign in to comment.