diff --git a/cluster/cluster.go b/cluster/cluster.go index 83c5ec0d6..7ebbe0110 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -70,6 +70,9 @@ type Controller struct { Antecedent flux.ResourceID Labels map[string]string Rollout RolloutStatus + // Errors during the recurring sync from the Git repository to the + // cluster will surface here. + SyncError error Containers ContainersOrExcuse } diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 1c395019d..eee280fd4 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -96,6 +96,11 @@ type Cluster struct { logger log.Logger sshKeyRing ssh.KeyRing + // syncErrors keeps a record of all per-resource errors during + // the sync from Git repo to the cluster. + syncErrors map[flux.ResourceID]error + muSyncErrors sync.RWMutex + nsWhitelist []string nsWhitelistLogged map[string]bool // to keep track of whether we've logged a problem with seeing a whitelisted ns @@ -146,6 +151,9 @@ func (c *Cluster) SomeControllers(ids []flux.ResourceID) (res []cluster.Controll } if !isAddon(podController) { + c.muSyncErrors.RLock() + podController.syncError = c.syncErrors[id] + c.muSyncErrors.RUnlock() controllers = append(controllers, podController.toClusterController(id)) } } @@ -180,6 +188,9 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er for _, podController := range podControllers { if !isAddon(podController) { id := flux.MakeResourceID(ns.Name, kind, podController.name) + c.muSyncErrors.RLock() + podController.syncError = c.syncErrors[id] + c.muSyncErrors.RUnlock() allControllers = append(allControllers, podController.toClusterController(id)) } } @@ -221,15 +232,30 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error { c.mu.Lock() defer c.mu.Unlock() - if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 { + c.muSyncErrors.RLock() + if applyErrs := c.applier.apply(logger, cs, c.syncErrors); len(applyErrs) > 0 { errs = append(errs, applyErrs...) } + c.muSyncErrors.RUnlock() // If `nil`, errs is a cluster.SyncError(nil) rather than error(nil) - if errs != nil { - return errs + if errs == nil { + return nil + } + + // It is expected that Cluster.Sync is invoked with *all* resources. + // Otherwise it will override previously recorded sync errors. + c.setSyncErrors(errs) + return errs +} + +func (c *Cluster) setSyncErrors(errs cluster.SyncError) { + c.muSyncErrors.Lock() + defer c.muSyncErrors.Unlock() + c.syncErrors = make(map[flux.ResourceID]error) + for _, e := range errs { + c.syncErrors[e.ResourceID()] = e.Error } - return nil } func (c *Cluster) Ping() error { diff --git a/cluster/kubernetes/resourcekinds.go b/cluster/kubernetes/resourcekinds.go index 48060fffc..409809106 100644 --- a/cluster/kubernetes/resourcekinds.go +++ b/cluster/kubernetes/resourcekinds.go @@ -49,6 +49,7 @@ type podController struct { name string status string rollout cluster.RolloutStatus + syncError error podTemplate apiv1.PodTemplateSpec } @@ -86,6 +87,7 @@ func (pc podController) toClusterController(resourceID flux.ResourceID) cluster. ID: resourceID, Status: pc.status, Rollout: pc.rollout, + SyncError: pc.syncError, Antecedent: antecedent, Labels: pc.GetLabels(), Containers: cluster.ContainersOrExcuse{Containers: clusterContainers, Excuse: excuse}, diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index c21887b3d..a2bd1b0f1 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" + "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" ) @@ -30,7 +31,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) { // Applier is something that will apply a changeset to the cluster. type Applier interface { - apply(log.Logger, changeSet) cluster.SyncError + apply(log.Logger, changeSet, map[flux.ResourceID]error) cluster.SyncError } type Kubectl struct { @@ -113,21 +114,40 @@ func (objs applyOrder) Less(i, j int) bool { return ranki < rankj } -func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) { +func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.ResourceID]error) (errs cluster.SyncError) { f := func(objs []*apiObject, cmd string, args ...string) { if len(objs) == 0 { return } logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs)) args = append(args, cmd) - if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil { + + var multi, single []*apiObject + if len(errored) == 0 { + multi = objs + } else { for _, obj := range objs { - r := bytes.NewReader(obj.Bytes()) - if err := c.doCommand(logger, r, args...); err != nil { - errs = append(errs, cluster.ResourceError{obj.Resource, err}) + if _, ok := errored[obj.ResourceID()]; ok { + // Resources that errored before shall be applied separately + single = append(single, obj) + } else { + // everything else will be tried in a multidoc apply. + multi = append(multi, obj) } } } + + if len(multi) > 0 { + if err := c.doCommand(logger, makeMultidoc(multi), args...); err != nil { + single = append(single, multi...) + } + } + for _, obj := range single { + r := bytes.NewReader(obj.Bytes()) + if err := c.doCommand(logger, r, args...); err != nil { + errs = append(errs, cluster.ResourceError{obj.Resource, err}) + } + } } // When deleting objects, the only real concern is that we don't diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index 8ae9066fc..b3c09cc44 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -15,7 +15,7 @@ type mockApplier struct { commandRun bool } -func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError { +func (m *mockApplier) apply(_ log.Logger, c changeSet, errored map[flux.ResourceID]error) cluster.SyncError { if len(c.objs) != 0 { m.commandRun = true } @@ -69,7 +69,7 @@ func TestSyncMalformed(t *testing.T) { err := kube.Sync(cluster.SyncDef{ Actions: []cluster.SyncAction{ cluster.SyncAction{ - Apply: rsc{"id", []byte("garbage")}, + Apply: rsc{"default:deployment/trash", []byte("garbage")}, }, }, }) diff --git a/daemon/errors.go b/daemon/errors.go index 33fa03f5d..7291cbb26 100644 --- a/daemon/errors.go +++ b/daemon/errors.go @@ -2,11 +2,18 @@ package daemon import ( "fmt" + "sync" + "github.com/weaveworks/flux" fluxerr "github.com/weaveworks/flux/errors" "github.com/weaveworks/flux/job" ) +type SyncErrors struct { + errs map[flux.ResourceID]error + mu sync.Mutex +} + func manifestLoadError(reason error) error { return &fluxerr.Error{ Type: fluxerr.User, diff --git a/daemon/loop.go b/daemon/loop.go index 8a6ac71a1..87d954959 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -191,14 +191,14 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) { return errors.Wrap(err, "loading resources from repo") } - var syncErrors []event.ResourceError + var resourceErrors []event.ResourceError // TODO supply deletes argument from somewhere (command-line?) - if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil { + if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false); err != nil { logger.Log("err", err) switch syncerr := err.(type) { case cluster.SyncError: for _, e := range syncerr { - syncErrors = append(syncErrors, event.ResourceError{ + resourceErrors = append(resourceErrors, event.ResourceError{ ID: e.ResourceID(), Path: e.Source(), Error: e.Error.Error(), @@ -368,7 +368,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) { Commits: cs, InitialSync: initialSync, Includes: includes, - Errors: syncErrors, + Errors: resourceErrors, }, }); err != nil { logger.Log("err", err) diff --git a/sync/sync.go b/sync/sync.go index 27cd56c84..e0df168a3 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -10,7 +10,8 @@ import ( ) // Sync synchronises the cluster to the files in a directory -func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool, logger log.Logger) error { +func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, + deletes bool) error { // Get a map of resources defined in the cluster clusterBytes, err := clus.Export() diff --git a/sync/sync_test.go b/sync/sync_test.go index ec3daff1c..e9d62dcf9 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -40,7 +40,7 @@ func TestSync(t *testing.T) { t.Fatal(err) } - if err := Sync(manifests, resources, clus, true, log.NewNopLogger()); err != nil { + if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil { t.Fatal(err) } checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs) @@ -60,7 +60,7 @@ func TestSync(t *testing.T) { if err != nil { t.Fatal(err) } - if err := Sync(manifests, resources, clus, true, log.NewNopLogger()); err != nil { + if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil { t.Fatal(err) } checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)