Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #872 from weaveworks/kubectl-megapply
Browse files Browse the repository at this point in the history
Apply all resources in a single kubectl call
  • Loading branch information
Sam Broughton committed Jan 2, 2018
2 parents 687e1f7 + 8c3f720 commit 0972ee9
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 446 deletions.
122 changes: 65 additions & 57 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"bytes"
"fmt"
"sync"

k8syaml "github.com/ghodss/yaml"
"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -42,19 +43,15 @@ type extendedClient struct {

type apiObject struct {
bytes []byte
Version string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
} `yaml:"metadata"`
}

func (obj *apiObject) namespaceOrDefault() string {
if obj.Metadata.Namespace == "" {
return "default"
}
return obj.Metadata.Namespace
func (o *apiObject) hasNamespace() bool {
return o.Metadata.Namespace != ""
}

// --- add-ons
Expand Down Expand Up @@ -88,56 +85,67 @@ func isAddon(obj namespacedLabeled) bool {

// --- /add ons

type changeSet struct {
nsObjs map[string][]obj
noNsObjs map[string][]obj
}

func makeChangeSet() changeSet {
return changeSet{
nsObjs: make(map[string][]obj),
noNsObjs: make(map[string][]obj),
}
}

func (c *changeSet) stage(cmd, id string, o *apiObject) {
if o.hasNamespace() {
c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o})
} else {
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o})
}
}

type obj struct {
id string
*apiObject
}

type Applier interface {
Delete(logger log.Logger, def *apiObject) error
Apply(logger log.Logger, def *apiObject) error
apply(log.Logger, changeSet, cluster.SyncError)
}

// Cluster is a handle to a Kubernetes API server.
// (Typically, this code is deployed into the same cluster.)
type Cluster struct {
client extendedClient
applier Applier
actionc chan func()
version string // string response for the version command.
logger log.Logger
sshKeyRing ssh.KeyRing

mu sync.Mutex
}

// NewCluster returns a usable cluster. Host should be of the form
// "http://hostname:8080".
// NewCluster returns a usable cluster.
func NewCluster(clientset k8sclient.Interface,
applier Applier,
sshKeyRing ssh.KeyRing,
logger log.Logger) (*Cluster, error) {
logger log.Logger) *Cluster {

c := &Cluster{
client: extendedClient{
clientset.Discovery(),
clientset.Core(),
clientset.Extensions(),
clientset.AppsV1beta1(),
clientset.BatchV2alpha1()},
clientset.BatchV2alpha1(),
},
applier: applier,
actionc: make(chan func()),
logger: logger,
sshKeyRing: sshKeyRing,
}

go c.loop()
return c, nil
}

// Stop terminates the goroutine that serializes and executes requests against
// the cluster. A stopped cluster cannot be restarted.
func (c *Cluster) Stop() {
close(c.actionc)
}

func (c *Cluster) loop() {
for f := range c.actionc {
f()
}
return c
}

// --- cluster.Cluster
Expand Down Expand Up @@ -207,40 +215,40 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er
// Sync performs the given actions on resources. Operations are
// asynchronous, but serialised.
func (c *Cluster) Sync(spec cluster.SyncDef) error {
errc := make(chan error)
logger := log.With(c.logger, "method", "Sync")
c.actionc <- func() {
errs := cluster.SyncError{}
for _, action := range spec.Actions {
logger := log.With(logger, "resource", action.ResourceID)
if len(action.Delete) > 0 {
obj, err := definitionObj(action.Delete)
if err == nil {
err = c.applier.Delete(logger, obj)
}
if err != nil {
errs[action.ResourceID] = err
continue
}

cs := makeChangeSet()
errs := cluster.SyncError{}
for _, action := range spec.Actions {
stages := []struct {
b []byte
cmd string
}{
{action.Delete, "delete"},
{action.Apply, "apply"},
}
for _, stage := range stages {
if len(stage.b) == 0 {
continue
}
if len(action.Apply) > 0 {
obj, err := definitionObj(action.Apply)
if err == nil {
err = c.applier.Apply(logger, obj)
}
if err != nil {
errs[action.ResourceID] = err
continue
}
obj, err := definitionObj(stage.b)
id := action.ResourceID
if err == nil {
cs.stage(stage.cmd, id, obj)
} else {
errs[id] = err
break
}
}
if len(errs) > 0 {
errc <- errs
} else {
errc <- nil
}
}
return <-errc

c.mu.Lock()
defer c.mu.Unlock()
c.applier.apply(logger, cs, errs)
if len(errs) != 0 {
return errs
}
return nil
}

func (c *Cluster) Ping() error {
Expand Down
Loading

0 comments on commit 0972ee9

Please sign in to comment.