diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index dbb3d49bea..91a9ed325a 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -83,8 +83,8 @@ func isAddon(obj namespacedLabeled) bool { type Applier interface { doCommand(log.Logger, string, io.Reader) error - stage(string, *apiObject) - execute(log.Logger, cluster.SyncError) error + stage(string, string, *apiObject) + execute(log.Logger, cluster.SyncError) } // Cluster is a handle to a Kubernetes API server. @@ -195,26 +195,32 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error { errs := cluster.SyncError{} for _, action := range spec.Actions { - if len(action.Delete) > 0 { - obj, err := definitionObj(action.Delete) - if err == nil { - c.applier.stage("delete", obj) - } else { - errs[action.ResourceID] = err + stages := []struct { + b []byte + cmd string + }{ + {action.Delete, "delete"}, + {action.Apply, "apply"}, + } + for _, stage := range stages { + if len(stage.b) == 0 { continue } - } - if len(action.Apply) > 0 { - obj, err := definitionObj(action.Apply) + obj, err := definitionObj(stage.b) + id := action.ResourceID if err == nil { - c.applier.stage("apply", obj) + c.applier.stage(stage.cmd, id, obj) } else { - errs[action.ResourceID] = err - continue + errs[id] = err + break } } } - return c.applier.execute(logger, errs) + c.applier.execute(logger, errs) + if len(errs) != 0 { + return errs + } + return nil } func (c *Cluster) Ping() error { diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index 970e697edf..7e1a874e3f 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -196,18 +196,14 @@ func (m *mockApplier) doCommand(_ log.Logger, command string, _ io.Reader) error } } -func (m *mockApplier) execute(_ log.Logger, errs cluster.SyncError) error { +func (m *mockApplier) execute(_ log.Logger, errs cluster.SyncError) { for _, cmd := range cmds { if len(m.objs[cmd]) > 0 { if err := m.doCommand(nil, cmd, nil); err != nil { - return err + errs[cmd] = err } } } - if len(errs) != 0 { - return errs - } - return nil } func deploymentDef(name string) []byte { diff --git a/cluster/kubernetes/release.go b/cluster/kubernetes/release.go index 6350237cc9..aeea88db70 100644 --- a/cluster/kubernetes/release.go +++ b/cluster/kubernetes/release.go @@ -58,7 +58,7 @@ func (c *Kubectl) connectArgs() []string { return args } -func (c *Kubectl) execute(logger log.Logger, errs cluster.SyncError) error { +func (c *Kubectl) execute(logger log.Logger, errs cluster.SyncError) { defer c.changeSet.clear() for _, cmd := range cmds { @@ -70,14 +70,14 @@ func (c *Kubectl) execute(logger log.Logger, errs cluster.SyncError) error { if err := c.doCommand(logger, cmd, buf); err != nil { // TODO: fallback to one-by-one applying - errs[cmd] = err + for _, obj := range c.objs[cmd] { + r := bytes.NewReader(obj.bytes) + if err := c.doCommand(logger, cmd, r); err != nil { + errs[obj.Metadata.Name] = err + } + } } } - - if len(errs) != 0 { - return errs - } - return nil } func (c *Kubectl) doCommand(logger log.Logger, command string, r io.Reader) error { @@ -104,22 +104,27 @@ func (c *Kubectl) kubectlCommand(args ...string) *exec.Cmd { } type changeSet struct { - objs map[string][]*apiObject + objs map[string][]obj } -func (c *changeSet) stage(cmd string, obj *apiObject) { +func (c *changeSet) stage(cmd, id string, o *apiObject) { if c.objs == nil { - c.objs = make(map[string][]*apiObject) + c.objs = make(map[string][]obj) } - c.objs[cmd] = append(c.objs[cmd], obj) + c.objs[cmd] = append(c.objs[cmd], obj{id, o}) } func (c *changeSet) clear() { if c.objs == nil { - c.objs = make(map[string][]*apiObject) + c.objs = make(map[string][]obj) return } - for cmd, _ := range c.objs { + for cmd := range c.objs { c.objs[cmd] = nil } } + +type obj struct { + id string + *apiObject +}