Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Apply all resources in a single kubectl call #872

Merged
merged 19 commits into from
Jan 2, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 64 additions & 57 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"bytes"
"fmt"
"sync"

k8syaml "github.com/ghodss/yaml"
"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -42,19 +43,15 @@ type extendedClient struct {

type apiObject struct {
bytes []byte
Version string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
} `yaml:"metadata"`
}

func (obj *apiObject) namespaceOrDefault() string {
if obj.Metadata.Namespace == "" {
return "default"
}
return obj.Metadata.Namespace
func (o *apiObject) hasNamespace() bool {
return o.Metadata.Namespace != ""
}

// --- add-ons
Expand Down Expand Up @@ -88,56 +85,67 @@ func isAddon(obj namespacedLabeled) bool {

// --- /add ons

type changeSet struct {
nsObjs map[string][]obj
noNsObjs map[string][]obj
}

func makeChangeSet() changeSet {
return changeSet{
nsObjs: make(map[string][]obj),
noNsObjs: make(map[string][]obj),
}
}

func (c *changeSet) stage(cmd, id string, o *apiObject) {
if o.hasNamespace() {
c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o})
} else {
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o})
}
}

type obj struct {
id string
*apiObject
}

type Applier interface {
Delete(logger log.Logger, def *apiObject) error
Apply(logger log.Logger, def *apiObject) error
apply(log.Logger, changeSet, cluster.SyncError)
}

// Cluster is a handle to a Kubernetes API server.
// (Typically, this code is deployed into the same cluster.)
type Cluster struct {
client extendedClient
applier Applier
actionc chan func()
version string // string response for the version command.
logger log.Logger
sshKeyRing ssh.KeyRing

mu sync.Mutex
}

// NewCluster returns a usable cluster. Host should be of the form
// "http://hostname:8080".
// NewCluster returns a usable cluster.
func NewCluster(clientset k8sclient.Interface,
applier Applier,
sshKeyRing ssh.KeyRing,
logger log.Logger) (*Cluster, error) {
logger log.Logger) *Cluster {

c := &Cluster{
client: extendedClient{
clientset.Discovery(),
clientset.Core(),
clientset.Extensions(),
clientset.AppsV1beta1(),
clientset.BatchV2alpha1()},
clientset.BatchV2alpha1(),
},
applier: applier,
actionc: make(chan func()),
logger: logger,
sshKeyRing: sshKeyRing,
}

go c.loop()
return c, nil
}

// Stop terminates the goroutine that serializes and executes requests against
// the cluster. A stopped cluster cannot be restarted.
func (c *Cluster) Stop() {
close(c.actionc)
}

func (c *Cluster) loop() {
for f := range c.actionc {
f()
}
return c
}

// --- cluster.Cluster
Expand Down Expand Up @@ -207,40 +215,39 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er
// Sync performs the given actions on resources. Operations are
// asynchronous, but serialised.
func (c *Cluster) Sync(spec cluster.SyncDef) error {
errc := make(chan error)
c.mu.Lock()
defer c.mu.Unlock()
logger := log.With(c.logger, "method", "Sync")
c.actionc <- func() {
errs := cluster.SyncError{}
for _, action := range spec.Actions {
logger := log.With(logger, "resource", action.ResourceID)
if len(action.Delete) > 0 {
obj, err := definitionObj(action.Delete)
if err == nil {
err = c.applier.Delete(logger, obj)
}
if err != nil {
errs[action.ResourceID] = err
continue
}

cs := makeChangeSet()
errs := cluster.SyncError{}
for _, action := range spec.Actions {
stages := []struct {
b []byte
cmd string
}{
{action.Delete, "delete"},
{action.Apply, "apply"},
}
for _, stage := range stages {
if len(stage.b) == 0 {
continue
}
if len(action.Apply) > 0 {
obj, err := definitionObj(action.Apply)
if err == nil {
err = c.applier.Apply(logger, obj)
}
if err != nil {
errs[action.ResourceID] = err
continue
}
obj, err := definitionObj(stage.b)
id := action.ResourceID
if err == nil {
cs.stage(stage.cmd, id, obj)
} else {
errs[id] = err
break
}
}
if len(errs) > 0 {
errc <- errs
} else {
errc <- nil
}
}
return <-errc
c.applier.apply(logger, cs, errs)

This comment was marked as abuse.

This comment was marked as abuse.

if len(errs) != 0 {
return errs
}
return nil
}

func (c *Cluster) Ping() error {
Expand Down
Loading