Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend full upgrade to any version upgrade of non-HA ES #5408

Merged
merged 5 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pkg/controller/elasticsearch/driver/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (d *defaultDriver) handleUpgrades(

var deletedPods []corev1.Pod

is8xVersionUpgrade, err := is8xVersionUpgrade(d.ES)
isVersionUpgrade, err := isVersionUpgrade(d.ES)
if err != nil {
return results.WithError(err)
}
shouldDoFullRestartUpgrade := isNonHACluster(currentPods, expectedMasters) && is8xVersionUpgrade
shouldDoFullRestartUpgrade := isNonHACluster(currentPods, expectedMasters) && isVersionUpgrade
if shouldDoFullRestartUpgrade {
// unconditional full cluster upgrade
deletedPods, err = run(upgrade.DeleteAll)
Expand Down Expand Up @@ -206,9 +206,8 @@ func isNonHACluster(actualPods []corev1.Pod, expectedMasters []string) bool {
return len(actualMasters) <= 2
}

// is8xVersionUpgrade returns true if a version upgrade involves 8.x which displays different behaviour during version upgrades
// than previous Elastic Stack versions.
func is8xVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
// isVersionUpgrade returns true if a spec change contains a version upgrade.
func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
specVersion, err := version.Parse(es.Spec.Version)
if err != nil {
return false, err
Expand All @@ -217,7 +216,7 @@ func is8xVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
if err != nil {
return false, err
}
return specVersion.Major == 8 && specVersion.GT(statusVersion), nil
return specVersion.GT(statusVersion), nil
}

func healthyPods(
Expand Down
19 changes: 5 additions & 14 deletions pkg/controller/elasticsearch/driver/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,24 +306,15 @@ func Test_isNonHACluster(t *testing.T) {
}
}

func Test_is8xVersionUpgrade(t *testing.T) {
func Test_isVersionUpgrade(t *testing.T) {
tests := []struct {
name string
es esv1.Elasticsearch
want bool
wantErr bool
}{
{
name: "not an 8x upgrade",
es: esv1.Elasticsearch{
Spec: esv1.ElasticsearchSpec{Version: "7.17.0"},
Status: esv1.ElasticsearchStatus{Version: "6.8.0"},
},
want: false,
wantErr: false,
},
{
name: "8x upgrade",
name: "upgrade",
es: esv1.Elasticsearch{
Spec: esv1.ElasticsearchSpec{Version: "8.0.0"},
Status: esv1.ElasticsearchStatus{Version: "7.17.0"},
Expand All @@ -332,7 +323,7 @@ func Test_is8xVersionUpgrade(t *testing.T) {
wantErr: false,
},
{
name: "8x minor upgrade",
name: "minor upgrade",
es: esv1.Elasticsearch{
Spec: esv1.ElasticsearchSpec{Version: "8.1.0"},
Status: esv1.ElasticsearchStatus{Version: "8.0.0"},
Expand Down Expand Up @@ -371,11 +362,11 @@ func Test_is8xVersionUpgrade(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := is8xVersionUpgrade(tt.es)
got, err := isVersionUpgrade(tt.es)
if tt.wantErr != (err != nil) {
t.Errorf("wantErr %v got %v", tt.wantErr, err)
}
assert.Equalf(t, tt.want, got, "is8xVersionUpgrade(%v)", tt.es)
assert.Equalf(t, tt.want, got, "isVersionUpgrade(%v)", tt.es)
})
}
}
54 changes: 32 additions & 22 deletions pkg/controller/elasticsearch/version/zen2/initial_master_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
)

const (
Expand Down Expand Up @@ -74,8 +75,8 @@ func shouldSetInitialMasterNodes(es esv1.Elasticsearch, k8sClient k8s.Client, no
if !bootstrap.AnnotatedForBootstrap(es) {
return true, nil
}
// - we're upgrading (effectively restarting) a single zen1 master to zen2
return singleZen1MasterUpgrade(k8sClient, es, nodeSpecResources)
// - we're upgrading (effectively restarting) a non-HA zen1 cluster to zen2
return nonHAZen1MasterUpgrade(k8sClient, es, nodeSpecResources)
}

// RemoveZen2BootstrapAnnotation removes the initialMasterNodesAnnotation (if set) once zen2 is bootstrapped
Expand Down Expand Up @@ -123,47 +124,56 @@ func patchInitialMasterNodesConfig(nodeSpecResources nodespec.ResourcesList, ini
return nil
}

// singleZen1MasterUpgrade returns true if expected nodes in nodeSpecResources will lead to upgrading
// the single zen1-compatible master node currently running in the es cluster.
func singleZen1MasterUpgrade(c k8s.Client, es esv1.Elasticsearch, nodeSpecResources nodespec.ResourcesList) (bool, error) {
// looking for a single master node...
// nonHAZen1MasterUpgrade returns true if expected nodes in nodeSpecResources will lead to upgrading
// the one or two zen1-compatible master nodes currently running in the es cluster.
// As we upgrade all nodes at once in one or two node clusters initial master nodes needs to be set as there is no
// existing cluster to join once all v6 nodes have been terminated.
func nonHAZen1MasterUpgrade(c k8s.Client, es esv1.Elasticsearch, nodeSpecResources nodespec.ResourcesList) (bool, error) {
// looking for a non-HA master node setup...
masters, err := sset.GetActualMastersForCluster(c, es)
if err != nil {
return false, err
}
if len(masters) != 1 {
if len(masters) > 2 {
return false, nil
}
currentMaster := masters[0]
// ...not compatible with zen2...
v, err := label.ExtractVersion(currentMaster.Labels)
if err != nil {
return false, err
}
if versionCompatibleWithZen2(v) {
return false, nil

currentMasterNames := set.Make()
for _, currentMaster := range masters {
currentMasterNames.Add(currentMaster.Name)
// ...not compatible with zen2...
v, err := label.ExtractVersion(currentMaster.Labels)
if err != nil {
return false, err
}
// at least one master is already on Zen 2
if versionCompatibleWithZen2(v) {
return false, nil
}
}

// ...that will be replaced
var targetMasters []string
targetMasters := set.Make()
for _, res := range nodeSpecResources {
if label.IsMasterNodeSet(res.StatefulSet) {
targetMasters = append(targetMasters, sset.PodNames(res.StatefulSet)...)
targetMasters.MergeWith(set.Make(sset.PodNames(res.StatefulSet)...))
}
}
if len(targetMasters) == 0 {
if targetMasters.Count() == 0 {
return false, nil
}
if len(targetMasters) > 1 {
if targetMasters.Count() > 2 {
// Covers the case where the user is upgrading to zen2 + adding more masters simultaneously.
// Additional masters will get created before the existing one gets upgraded/restarted.
return false, nil
}
if targetMasters[0] != currentMaster.Name {
// Covers the case where the existing master is replaced by another master in a different NodeSet.

if currentMasterNames.Diff(targetMasters).Count() > 0 {
// Covers the case where the existing masters are replaced by other masters in a different NodeSet.
// The new master will be created before the existing one gets removed.
return false, nil
}
// single zen1 master, will be replaced by a single zen2 master with the same name
// one or two zen1 masters, will be replaced by a one or two zen2 master with the same name
return true, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,18 @@ func TestSetupInitialMasterNodes(t *testing.T) {
sset.TestPod{Name: "es-masterdata-2", Master: true, Data: true, Version: "6.8.5", ClusterName: "es", Namespace: "ns"}.BuildPtr(),
}

v7Master := sset.TestPod{Name: "es-master-1", Master: true, Data: false, Version: "7.5.0", ClusterName: "es", Namespace: "ns"}.BuildPtr()

expectedv7resources := func() nodespec.ResourcesList {
return nodespec.ResourcesList{
{StatefulSet: sset.TestSset{Name: "es-master", Version: "7.5.0", Replicas: 3, Master: true, Data: false, ClusterName: "es"}.Build(), Config: settings.NewCanonicalConfig()},
{StatefulSet: sset.TestSset{Name: "es-masterdata", Version: "7.5.0", Replicas: 3, Master: true, Data: true, ClusterName: "es"}.Build(), Config: settings.NewCanonicalConfig()},
{StatefulSet: sset.TestSset{Name: "es-data", Version: "7.5.0", Replicas: 3, Master: false, Data: true, ClusterName: "es"}.Build(), Config: settings.NewCanonicalConfig()},
}
}
expectedv7SingleMasterResources := func(ssetName string) nodespec.ResourcesList {
expectedv7MasterResources := func(replicas int32, ssetName string) nodespec.ResourcesList {
return nodespec.ResourcesList{
{StatefulSet: sset.TestSset{Name: ssetName, Version: "7.5.0", Replicas: 1, Master: true, Data: false, ClusterName: "es"}.Build(), Config: settings.NewCanonicalConfig()},
{StatefulSet: sset.TestSset{Name: ssetName, Version: "7.5.0", Replicas: replicas, Master: true, Data: false, ClusterName: "es"}.Build(), Config: settings.NewCanonicalConfig()},
}
}
tests := []struct {
Expand Down Expand Up @@ -138,7 +140,7 @@ func TestSetupInitialMasterNodes(t *testing.T) {
{
name: "upgrade single v6 master to single v7 master: should set cluster.initial_master_nodes",
es: withAnnotations(esv7(), map[string]string{bootstrap.ClusterUUIDAnnotationName: "uuid"}),
nodeSpecResources: expectedv7SingleMasterResources("es-master"),
nodeSpecResources: expectedv7MasterResources(1, "es-master"),
k8sClient: k8s.NewFakeClient(v6Masters[0]), // one existing v6 master running
expectedConfigs: []settings.CanonicalConfig{
// master nodes config
Expand All @@ -148,12 +150,33 @@ func TestSetupInitialMasterNodes(t *testing.T) {
},
expectedAnnotation: "es-master-0",
},
{
name: "upgrade two v6 master to two v7 masters: should set cluster.initial_master_nodes",
es: withAnnotations(esv7(), map[string]string{bootstrap.ClusterUUIDAnnotationName: "uuid"}),
nodeSpecResources: expectedv7MasterResources(2, "es-master"),
k8sClient: k8s.NewFakeClient(v6Masters[0], v6Masters[1]), // two existing v6 master running
expectedConfigs: []settings.CanonicalConfig{
// master nodes config
{CanonicalConfig: commonsettings.MustCanonicalConfig(map[string][]string{
esv1.ClusterInitialMasterNodes: {"es-master-0", "es-master-1"},
})},
},
expectedAnnotation: "es-master-0,es-master-1",
},
{
name: "upgrade mixed v6/v7 master to two v7 masters: should not set cluster.initial_master_nodes",
es: withAnnotations(esv7(), map[string]string{bootstrap.ClusterUUIDAnnotationName: "uuid"}),
nodeSpecResources: expectedv7MasterResources(2, "es-master"),
k8sClient: k8s.NewFakeClient(v6Masters[0], v7Master), // mixed masters running
expectedConfigs: []settings.CanonicalConfig{settings.NewCanonicalConfig()},
expectedAnnotation: "",
},
{
name: "upgrade single v6 master to single v7 master in a different statefulset: should not set " +
"cluster.initial_master_nodes since the new master will be created before the old one is removed",
es: withAnnotations(esv7(), map[string]string{bootstrap.ClusterUUIDAnnotationName: "uuid"}),
nodeSpecResources: expectedv7SingleMasterResources("es-different-sset"), // v7 master in a different sset
k8sClient: k8s.NewFakeClient(v6Masters[0]), // one existing v6 master running
nodeSpecResources: expectedv7MasterResources(1, "es-different-sset"), // v7 master in a different sset
k8sClient: k8s.NewFakeClient(v6Masters[0]), // one existing v6 master running
expectedConfigs: []settings.CanonicalConfig{settings.NewCanonicalConfig()},
expectedAnnotation: "",
},
Expand Down
15 changes: 14 additions & 1 deletion pkg/utils/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package set

import "sort"
import (
"sort"
)

type StringSet map[string]struct{}

Expand Down Expand Up @@ -41,6 +43,17 @@ func (set StringSet) Has(s string) (exists bool) {
return
}

func (set StringSet) Diff(other StringSet) StringSet {
diff := Make()
for str := range set {
if other.Has(str) {
continue
}
diff.Add(str)
}
return diff
}

func (set StringSet) AsSlice() sort.StringSlice {
count := set.Count()
if count == 0 {
Expand Down
47 changes: 47 additions & 0 deletions pkg/utils/set/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,50 @@ func TestStringSet_MergeWith(t *testing.T) {
})
}
}

func TestStringSet_Diff(t *testing.T) {
tests := []struct {
name string
set StringSet
other StringSet
want StringSet
}{
{
name: "both nil",
set: nil,
other: nil,
want: StringSet{},
},
{
name: "other nil",
set: StringSet{"a": {}},
other: nil,
want: StringSet{"a": {}},
},
{
name: "same elements",
set: StringSet{"a": {}},
other: StringSet{"a": {}},
want: StringSet{},
},
{
name: "b not in other",
set: StringSet{"a": {}, "b": {}},
other: StringSet{"a": {}},
want: StringSet{"b": {}},
},
{
name: "b not in set",
set: StringSet{"a": {}},
other: StringSet{"a": {}, "b": {}},
want: StringSet{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.set.Diff(tt.other); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Diff() = %v, want %v", got, tt.want)
}
})
}
}
8 changes: 8 additions & 0 deletions test/e2e/es/version_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func TestVersionUpgradeTwoNodes68xTo7x(t *testing.T) {
// due to minimum_master_nodes=2, the cluster is unavailable while the first master is upgraded
initial := elasticsearch.NewBuilder("test-version-up-2-68x-to-7x").
WithVersion(srcVersion).
// 8x non-HA upgrades cannot honour a change budget with maxUnavailable 1 because we are rolling both nodes at once
// setting the change budget accordingly allows this test to pass as otherwise the changeBudgetWatcher step would
// fail as it would detect a change budget violation.
WithChangeBudget(2, 2).
WithESMasterDataNodes(2, elasticsearch.DefaultResources)

mutated := initial.WithNoESTopology().
Expand Down Expand Up @@ -173,6 +177,10 @@ func TestVersionUpgradeTwoNodesToLatest7x(t *testing.T) {

initial := elasticsearch.NewBuilder("test-version-up-2-to-7x").
WithVersion(srcVersion).
// 8x non-HA upgrades cannot honour a change budget with maxUnavailable 1 because we are rolling both nodes at once
// setting the change budget accordingly allows this test to pass as otherwise the changeBudgetWatcher step would
// fail as it would detect a change budget violation.
WithChangeBudget(2, 2).
WithESMasterDataNodes(2, elasticsearch.DefaultResources)

mutated := initial.WithNoESTopology().
Expand Down