Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1410 from weaveworks/issue/1010-sync-errors-in-li…
Browse files Browse the repository at this point in the history
…stservices

Return sync errors in ListServices
  • Loading branch information
rndstr committed Oct 26, 2018
2 parents 749646b + 39e2da1 commit 706adb1
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 19 deletions.
3 changes: 3 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Controller struct {
Antecedent flux.ResourceID
Labels map[string]string
Rollout RolloutStatus
// Errors during the recurring sync from the Git repository to the
// cluster will surface here.
SyncError error

Containers ContainersOrExcuse
}
Expand Down
34 changes: 30 additions & 4 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ type Cluster struct {
logger log.Logger
sshKeyRing ssh.KeyRing

// syncErrors keeps a record of all per-resource errors during
// the sync from Git repo to the cluster.
syncErrors map[flux.ResourceID]error
muSyncErrors sync.RWMutex

nsWhitelist []string
nsWhitelistLogged map[string]bool // to keep track of whether we've logged a problem with seeing a whitelisted ns

Expand Down Expand Up @@ -146,6 +151,9 @@ func (c *Cluster) SomeControllers(ids []flux.ResourceID) (res []cluster.Controll
}

if !isAddon(podController) {
c.muSyncErrors.RLock()
podController.syncError = c.syncErrors[id]
c.muSyncErrors.RUnlock()
controllers = append(controllers, podController.toClusterController(id))
}
}
Expand Down Expand Up @@ -180,6 +188,9 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er
for _, podController := range podControllers {
if !isAddon(podController) {
id := flux.MakeResourceID(ns.Name, kind, podController.name)
c.muSyncErrors.RLock()
podController.syncError = c.syncErrors[id]
c.muSyncErrors.RUnlock()
allControllers = append(allControllers, podController.toClusterController(id))
}
}
Expand Down Expand Up @@ -221,15 +232,30 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {

c.mu.Lock()
defer c.mu.Unlock()
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
c.muSyncErrors.RLock()
if applyErrs := c.applier.apply(logger, cs, c.syncErrors); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}
c.muSyncErrors.RUnlock()

// If `nil`, errs is a cluster.SyncError(nil) rather than error(nil)
if errs != nil {
return errs
if errs == nil {
return nil
}

// It is expected that Cluster.Sync is invoked with *all* resources.
// Otherwise it will override previously recorded sync errors.
c.setSyncErrors(errs)
return errs
}

func (c *Cluster) setSyncErrors(errs cluster.SyncError) {
c.muSyncErrors.Lock()
defer c.muSyncErrors.Unlock()
c.syncErrors = make(map[flux.ResourceID]error)
for _, e := range errs {
c.syncErrors[e.ResourceID()] = e.Error
}
return nil
}

func (c *Cluster) Ping() error {
Expand Down
2 changes: 2 additions & 0 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type podController struct {
name string
status string
rollout cluster.RolloutStatus
syncError error
podTemplate apiv1.PodTemplateSpec
}

Expand Down Expand Up @@ -86,6 +87,7 @@ func (pc podController) toClusterController(resourceID flux.ResourceID) cluster.
ID: resourceID,
Status: pc.status,
Rollout: pc.rollout,
SyncError: pc.syncError,
Antecedent: antecedent,
Labels: pc.GetLabels(),
Containers: cluster.ContainersOrExcuse{Containers: clusterContainers, Excuse: excuse},
Expand Down
32 changes: 26 additions & 6 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
)

Expand All @@ -30,7 +31,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {

// Applier is something that will apply a changeset to the cluster.
type Applier interface {
apply(log.Logger, changeSet) cluster.SyncError
apply(log.Logger, changeSet, map[flux.ResourceID]error) cluster.SyncError
}

type Kubectl struct {
Expand Down Expand Up @@ -113,21 +114,40 @@ func (objs applyOrder) Less(i, j int) bool {
return ranki < rankj
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.ResourceID]error) (errs cluster.SyncError) {
f := func(objs []*apiObject, cmd string, args ...string) {
if len(objs) == 0 {
return
}
logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs))
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {

var multi, single []*apiObject
if len(errored) == 0 {
multi = objs
} else {
for _, obj := range objs {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
if _, ok := errored[obj.ResourceID()]; ok {
// Resources that errored before shall be applied separately
single = append(single, obj)
} else {
// everything else will be tried in a multidoc apply.
multi = append(multi, obj)
}
}
}

if len(multi) > 0 {
if err := c.doCommand(logger, makeMultidoc(multi), args...); err != nil {
single = append(single, multi...)
}
}
for _, obj := range single {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}

// When deleting objects, the only real concern is that we don't
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
func (m *mockApplier) apply(_ log.Logger, c changeSet, errored map[flux.ResourceID]error) cluster.SyncError {
if len(c.objs) != 0 {
m.commandRun = true
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestSyncMalformed(t *testing.T) {
err := kube.Sync(cluster.SyncDef{
Actions: []cluster.SyncAction{
cluster.SyncAction{
Apply: rsc{"id", []byte("garbage")},
Apply: rsc{"default:deployment/trash", []byte("garbage")},
},
},
})
Expand Down
7 changes: 7 additions & 0 deletions daemon/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ package daemon

import (
"fmt"
"sync"

"github.com/weaveworks/flux"
fluxerr "github.com/weaveworks/flux/errors"
"github.com/weaveworks/flux/job"
)

type SyncErrors struct {
errs map[flux.ResourceID]error
mu sync.Mutex
}

func manifestLoadError(reason error) error {
return &fluxerr.Error{
Type: fluxerr.User,
Expand Down
8 changes: 4 additions & 4 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
return errors.Wrap(err, "loading resources from repo")
}

var syncErrors []event.ResourceError
var resourceErrors []event.ResourceError
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil {
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false); err != nil {
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
for _, e := range syncerr {
syncErrors = append(syncErrors, event.ResourceError{
resourceErrors = append(resourceErrors, event.ResourceError{
ID: e.ResourceID(),
Path: e.Source(),
Error: e.Error.Error(),
Expand Down Expand Up @@ -368,7 +368,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
Commits: cs,
InitialSync: initialSync,
Includes: includes,
Errors: syncErrors,
Errors: resourceErrors,
},
}); err != nil {
logger.Log("err", err)
Expand Down
3 changes: 2 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

// Sync synchronises the cluster to the files in a directory
func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool, logger log.Logger) error {
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster,
deletes bool) error {
// Get a map of resources defined in the cluster
clusterBytes, err := clus.Export()

Expand Down
4 changes: 2 additions & 2 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}

if err := Sync(manifests, resources, clus, true, log.NewNopLogger()); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand All @@ -60,7 +60,7 @@ func TestSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := Sync(manifests, resources, clus, true, log.NewNopLogger()); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand Down

0 comments on commit 706adb1

Please sign in to comment.