diff --git a/pkg/apis/elasticsearch/v1/status.go b/pkg/apis/elasticsearch/v1/status.go index 874547e63b..ff6805d287 100644 --- a/pkg/apis/elasticsearch/v1/status.go +++ b/pkg/apis/elasticsearch/v1/status.go @@ -111,6 +111,7 @@ type ConditionType string const ( ElasticsearchIsReachable ConditionType = "ElasticsearchIsReachable" ReconciliationComplete ConditionType = "ReconciliationComplete" + ResourcesAwareManagement ConditionType = "ResourcesAwareManagement" RunningDesiredVersion ConditionType = "RunningDesiredVersion" ) diff --git a/pkg/controller/elasticsearch/client/client.go b/pkg/controller/elasticsearch/client/client.go index 235d967179..1f330b4b86 100644 --- a/pkg/controller/elasticsearch/client/client.go +++ b/pkg/controller/elasticsearch/client/client.go @@ -57,6 +57,7 @@ type Role struct { type Client interface { AllocationSetter AutoscalingClient + DesiredNodesClient ShardLister LicenseClient // Close idle connections in the underlying http client. diff --git a/pkg/controller/elasticsearch/client/desired_nodes.go b/pkg/controller/elasticsearch/client/desired_nodes.go new file mode 100644 index 0000000000..5d53c4ebbd --- /dev/null +++ b/pkg/controller/elasticsearch/client/desired_nodes.go @@ -0,0 +1,84 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package client + +import ( + "context" + "fmt" + + "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" +) + +var desiredNodesMinVersion = version.MinFor(8, 3, 0) + +type DesiredNodesClient interface { + IsDesiredNodesSupported() bool + // GetLatestDesiredNodes returns the latest desired nodes. + GetLatestDesiredNodes(ctx context.Context) (LatestDesiredNodes, error) + // UpdateDesiredNodes updates the desired nodes API. + UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes DesiredNodes) error + // DeleteDesiredNodes deletes the desired nodes from the cluster state. + DeleteDesiredNodes(ctx context.Context) error +} + +type LatestDesiredNodes struct { + HistoryID string `json:"history_id"` + Version int64 `json:"version"` + DesiredNodes []DesiredNode `json:"nodes"` +} + +type DesiredNodes struct { + DesiredNodes []DesiredNode `json:"nodes"` +} + +type DesiredNode struct { + Settings map[string]interface{} `json:"settings"` + ProcessorsRange ProcessorsRange `json:"processors_range"` + Memory string `json:"memory"` + Storage string `json:"storage"` + NodeVersion string `json:"node_version"` +} + +type ProcessorsRange struct { + Min float64 `json:"min"` + Max float64 `json:"max,omitempty"` +} + +func (c *baseClient) GetLatestDesiredNodes(_ context.Context) (LatestDesiredNodes, error) { + return LatestDesiredNodes{}, c.desiredNodesNotAvailable() +} + +func (c *baseClient) UpdateDesiredNodes(_ context.Context, _ string, _ int64, _ DesiredNodes) error { + return c.desiredNodesNotAvailable() +} + +func (c *baseClient) DeleteDesiredNodes(_ context.Context) error { + return c.desiredNodesNotAvailable() +} + +func (c *baseClient) desiredNodesNotAvailable() error { + return fmt.Errorf("the desired nodes API is not available in Elasticsearch %s, it requires %s", c.version, desiredNodesMinVersion) +} + +func (c *baseClient) IsDesiredNodesSupported() bool { + return c.version.GTE(desiredNodesMinVersion) +} + +func (c *clientV8) GetLatestDesiredNodes(ctx context.Context) (LatestDesiredNodes, error) { + var latestDesiredNodes LatestDesiredNodes + err := c.get(ctx, "/_internal/desired_nodes/_latest", &latestDesiredNodes) + return latestDesiredNodes, err +} + +func (c *clientV8) UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes DesiredNodes) error { + return c.put( + ctx, + fmt.Sprintf("/_internal/desired_nodes/%s/%d", historyID, version), + &desiredNodes, nil) +} + +func (c *clientV8) DeleteDesiredNodes(ctx context.Context) error { + return c.delete(ctx, "/_internal/desired_nodes") +} diff --git a/pkg/controller/elasticsearch/driver/desired_nodes.go b/pkg/controller/elasticsearch/driver/desired_nodes.go new file mode 100644 index 0000000000..8469adf742 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/desired_nodes.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package driver + +import ( + "context" + "errors" + "fmt" + + "go.elastic.co/apm" + corev1 "k8s.io/api/core/v1" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" + esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" +) + +func (d *defaultDriver) updateDesiredNodes( + ctx context.Context, + esClient esclient.Client, + esReachable bool, + expectedResources nodespec.ResourcesList, +) *reconciler.Results { + span, ctx := apm.StartSpan(ctx, "update_desired_nodes", tracing.SpanTypeApp) + defer span.End() + results := &reconciler.Results{} + // We compute the desired nodes state to update the condition + var resourceNotAvailableErr *nodespec.ResourceNotAvailable + esVersion, err := version.Parse(d.ES.Spec.Version) + if err != nil { + return results.WithError(err) + } + nodes, requeue, err := expectedResources.ToDesiredNodes(ctx, d.Client, esVersion.FinalizeVersion()) + switch { + case err == nil: + d.ReconcileState.ReportCondition( + esv1.ResourcesAwareManagement, + corev1.ConditionTrue, + fmt.Sprintf("Successfully calculated compute and storage resources from Elasticsearch resource generation %d", d.ES.Generation), + ) + case errors.As(err, &resourceNotAvailableErr): + // It is not possible to build the desired node spec because of the Elasticsearch specification + d.ReconcileState.ReportCondition( + esv1.ResourcesAwareManagement, + corev1.ConditionFalse, + fmt.Sprintf("Cannot get compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()), + ) + // It is fine to continue, error is only reported through the condition. + // We should however clear the desired nodes API since we are in a degraded (not resources aware) mode. + if esReachable { + return results.WithError(esClient.DeleteDesiredNodes(ctx)) + } + return results.WithReconciliationState(defaultRequeue.WithReason("Desired nodes API must be cleared")) + default: + // Unknown error: not nil and not ResourceNotAvailable + d.ReconcileState.ReportCondition( + esv1.ResourcesAwareManagement, + corev1.ConditionUnknown, + fmt.Sprintf("Error while calculating compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()), + ) + return results.WithError(err) + } + if requeue { + results.WithReconciliationState(defaultRequeue.WithReason("Storage capacity is not available in all PVC statuses, requeue to refine the capacity reported in the desired nodes API")) + } + if esReachable { + return results.WithError(esClient.UpdateDesiredNodes(ctx, string(d.ES.UID), d.ES.Generation, esclient.DesiredNodes{DesiredNodes: nodes})) + } + return results.WithReconciliationState(defaultRequeue.WithReason("Waiting for Elasticsearch to be available to update the desired nodes API")) +} diff --git a/pkg/controller/elasticsearch/driver/desired_nodes_test.go b/pkg/controller/elasticsearch/driver/desired_nodes_test.go new file mode 100644 index 0000000000..ae7df6ada0 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/desired_nodes_test.go @@ -0,0 +1,809 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package driver + +import ( + "bytes" + "context" + "errors" + "fmt" + "io/ioutil" + "net/http" + "regexp" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/utils/pointer" + + commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + common "github.com/elastic/cloud-on-k8s/pkg/controller/common/settings" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/reconcile" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" +) + +func Test_defaultDriver_updateDesiredNodes(t *testing.T) { + type args struct { + esReachable bool + esClientError bool + k8sClientError bool + } + type wantCondition struct { + status corev1.ConditionStatus + messages []string + } + type wantResult struct { + error bool + reason string + requeue bool + requeueAfter time.Duration + } + type want struct { + testdata string // expected captured request + deleteCalled bool + result wantResult + condition *wantCondition + } + tests := []struct { + name string + esBuilder esBuilder + args args + want want + }{ + { + name: "Happy path", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("master", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", "2333Mi"). + withStorage("1Gi", "1Gi").pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"master"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ).withNodeSet( + nodeSet("hot", 3). + withCPU("", "1"). // Setting only limits is also fine. + withMemory("", "4Gi"). + withStorage("10Gi", "50Gi").pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"data", "ingest"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ), + want: want{ + result: wantResult{}, + testdata: "happy_path.json", + condition: &wantCondition{ + status: corev1.ConditionTrue, + messages: []string{"Successfully calculated compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "Discard prerelease and build number", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0-SNAPSHOT"). + withNodeSet( + nodeSet("master", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", "2333Mi"). + withStorage("1Gi", "1Gi").pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"master"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ).withNodeSet( + nodeSet("hot", 3). + withCPU("", "1"). // Setting only limits is also fine. + withMemory("", "4Gi"). + withStorage("10Gi", "50Gi").pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"data", "ingest"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ), + want: want{ + result: wantResult{}, + testdata: "happy_path.json", + condition: &wantCondition{ + status: corev1.ConditionTrue, + messages: []string{"Successfully calculated compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "Expected resources are calculated but Elasticsearch is not reachable", + args: args{ + esReachable: false, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("master", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", "2333Mi"). + withStorage("1Gi", "1Gi").pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"master"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ).withNodeSet( + nodeSet("hot", 3). + withCPU("", "1"). + withMemory("", "4Gi"). + withStorage("10Gi", "50Gi").pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"data", "ingest"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ), + want: want{ + result: wantResult{ + requeueAfter: defaultRequeue.RequeueAfter, + requeue: defaultRequeue.Requeue, + reason: "Waiting for Elasticsearch to be available to update the desired nodes API", + }, + condition: &wantCondition{ + status: corev1.ConditionTrue, + messages: []string{"Successfully calculated compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "No PVC yet", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("master", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", "2333Mi"). + withStorage("1Gi", ""). + pvcCreated(false). // PVC does not exist yet + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"master"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ).withNodeSet( + nodeSet("hot", 3). + withCPU("", "1"). + withMemory("", "4Gi"). + withStorage("50Gi", ""). + pvcCreated(false). // PVC does not exist yet + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"data", "ingest"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ), + want: want{ + result: wantResult{ + requeueAfter: defaultRequeue.RequeueAfter, + requeue: defaultRequeue.Requeue, // requeue is expected to get a more accurate storage capacity from the PVC status later + reason: "Storage capacity is not available in all PVC statuses, requeue to refine the capacity reported in the desired nodes API", + }, + testdata: "happy_path.json", + condition: &wantCondition{ + status: corev1.ConditionTrue, + messages: []string{"Successfully calculated compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "No capacity in PVC status", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("master", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", "2333Mi"). + withStorage("1Gi", "" /* No capacity in PVC status */).pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"master"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ).withNodeSet( + nodeSet("hot", 3). + withCPU("", "1"). + withMemory("", "4Gi"). + withStorage("50Gi", "" /* No capacity in PVC status */).pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "node.roles": []string{"data", "ingest"}, + "node.name": "${POD_NAME}", + "path.data": "/usr/share/elasticsearch/data", + "network.publish_host": "${POD_IP}", + "http.publish_host": "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc", + "node.attr.k8s_node_name": "${NODE_NAME}", + }), + ), + want: want{ + result: wantResult{ + requeueAfter: defaultRequeue.RequeueAfter, + requeue: defaultRequeue.Requeue, // requeue is expected to get a more accurate storage capacity from the PVC status later + reason: "Storage capacity is not available in all PVC statuses, requeue to refine the capacity reported in the desired nodes API", + }, + testdata: "happy_path.json", + condition: &wantCondition{ + status: corev1.ConditionTrue, + messages: []string{"Successfully calculated compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "Multi data path", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("2", "4"). + withMemory("2Gi", "2Gi"). + withStorage("1Gi", "1Gi"). + withNodeCfg(map[string]interface{}{ + "path.data": []string{"/usr/share/elasticsearch/data1", "/usr/share/elasticsearch/data2"}, + }), + ), + want: want{ + result: wantResult{}, + deleteCalled: true, + condition: &wantCondition{ + status: corev1.ConditionFalse, + messages: []string{"Elasticsearch path.data must be a string, multiple paths is not supported"}, + }, + }, + }, + { + name: "Elasticsearch client returned an error", + args: args{ + esReachable: true, + esClientError: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("2", "4"). + withMemory("2Gi", "2Gi"). + withStorage("1Gi", "1Gi"). + pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "path.data": "/usr/share/elasticsearch/data", + }), + ), + want: want{ + result: wantResult{ + error: true, + reason: "elasticsearch client failed", + }, + condition: &wantCondition{ + status: corev1.ConditionTrue, + messages: []string{"Successfully calculated compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "Kubernetes client returned an error", + args: args{ + esReachable: true, + k8sClientError: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("2", "4"). + withMemory("2Gi", "2Gi"). + withStorage("1Gi", "1Gi"). + pvcCreated(true). + withNodeCfg(map[string]interface{}{ + "path.data": "/usr/share/elasticsearch/data", + }), + ), + want: want{ + result: wantResult{ + error: true, + reason: "k8s client failed", + }, + condition: &wantCondition{ + status: corev1.ConditionUnknown, + messages: []string{"Error while calculating compute and storage resources from Elasticsearch resource generation "}, + }, + }, + }, + { + name: "0 values are not allowed", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("0", ""). + withMemory("0", "0"). + withStorage("1Gi", "1Gi"). + withNodeCfg(map[string]interface{}{ + "path.data": "/usr/share/elasticsearch/data", + }), + ), + want: want{ + result: wantResult{}, + deleteCalled: true, + condition: &wantCondition{ + status: corev1.ConditionFalse, + messages: []string{ + "CPU request is set but value is 0", + "memory limit is set but value is 0", + }, + }, + }, + }, + { + name: "No memory limit", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", ""). + withStorage("1Gi", "1Gi"), + ), + want: want{ + result: wantResult{}, + deleteCalled: true, + condition: &wantCondition{ + status: corev1.ConditionFalse, + messages: []string{"memory limit is not set"}, + }, + }, + }, + { + name: "Cannot compute resources and es is not reachable: requeue is expected", + args: args{ + esReachable: false, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", ""). + withStorage("1Gi", "1Gi"), + ), + want: want{ + result: wantResult{ + requeue: true, + requeueAfter: defaultRequeue.RequeueAfter, + }, + deleteCalled: false, // Elasticsearch is not reachable, client cannot be called + condition: &wantCondition{ + status: corev1.ConditionFalse, + messages: []string{"memory limit is not set"}, + }, + }, + }, + { + name: "Memory request and limit should be the same", + args: args{ + esReachable: true, + }, + esBuilder: newEs("8.3.0"). + withNodeSet( + nodeSet("default", 3). + withCPU("2222m", "3141m"). + withMemory("2333Mi", "2334Mi"). + withStorage("1Gi", "1Gi"), + ), + want: want{ + result: wantResult{}, + deleteCalled: true, + condition: &wantCondition{ + status: corev1.ConditionFalse, + messages: []string{"memory request and limit do not have the same value"}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + es := tt.esBuilder.toEs() + reconcileState, err := reconcile.NewState(es) + if err != nil { + assert.FailNow(t, "fatal: %s", err) + } + + var k8sClient k8s.Client + + if tt.args.k8sClientError { + k8sClient = k8s.NewFailingClient(errors.New("k8s client failed")) + } else { + existingResources := tt.esBuilder.toResources() + k8sClient = k8s.NewFakeClient(existingResources...) + } + + d := &defaultDriver{ + DefaultDriverParameters: DefaultDriverParameters{ + ReconcileState: reconcileState, + ES: es, + Client: k8sClient, + }, + } + + wantClient := wantClient{} + if tt.want.testdata != "" { + parsedRequest, err := ioutil.ReadFile("testdata/desired_nodes/" + tt.want.testdata) + assert.NoError(t, err) + wantClient.request = string(parsedRequest) + // Elasticsearch UID must have been used as the history ID + wantClient.historyID = es.UID + // Elasticsearch generation must have been used as the version + wantClient.version = es.Generation + } + + esClient := fakeEsClient(t, "8.3.0", tt.args.esClientError, wantClient) + got := d.updateDesiredNodes(context.TODO(), esClient, tt.args.esReachable, tt.esBuilder.toExpectedResources()) + + // Check reconcile result + result, err := got.Aggregate() + assert.Equal(t, tt.want.result.error, err != nil, "updateDesiredNodes(...): unexpected error result") + assert.Equal(t, tt.want.result.requeue, result.Requeue, "updateDesiredNodes(...): unexpected requeue result") + assert.Equal(t, tt.want.result.requeueAfter, result.RequeueAfter, "updateDesiredNodes(...): unexpected result.RequeueAfter value") + _, gotReason := got.IsReconciled() + assert.True(t, strings.Contains(gotReason, tt.want.result.reason), "updateDesiredNodes(...): unexpected reconciled reason") + + // Check if the Elasticsearch client has been called as expected + assert.Equal(t, tt.want.deleteCalled, esClient.deleted) + + // Check that the status has been updated accordingly. + if tt.want.condition == nil { + return + } + condition := d.ReconcileState.Index(esv1.ResourcesAwareManagement) + hasCondition := condition >= 0 + assert.True(t, hasCondition, "ResourcesAwareManagement condition should be set") + if !hasCondition { + return + } + c := d.ReconcileState.Conditions[condition] + assert.Equal(t, tt.want.condition.status, c.Status) + for _, expectedMessage := range tt.want.condition.messages { + assert.True(t, strings.Contains(c.Message, expectedMessage), "expected message in condition: %q, got %q", expectedMessage, c.Message) + } + }) + } +} + +// -- Fixtures and helpers -- + +type esBuilder struct { + esVersion string + nodeSets []fakeNodeSet + uid types.UID + resourceVersion string +} + +func (esb esBuilder) toExpectedResources() nodespec.ResourcesList { + resources := make([]nodespec.Resources, len(esb.nodeSets)) + for i := range esb.nodeSets { + fns := esb.nodeSets[i] + ssetname := esv1.StatefulSet("elasticsearch-desired-sample", fns.name) + resources[i] = nodespec.Resources{ + NodeSet: fns.name, + HeadlessService: nodespec.HeadlessService(&es, ssetname), + Config: settings.CanonicalConfig{CanonicalConfig: common.MustCanonicalConfig(fns.nodeConfig)}, + StatefulSet: v1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: ssetname, + Namespace: "default", + }, + Spec: v1.StatefulSetSpec{ + Replicas: pointer.Int32(fns.count), + Template: fns.toPodTemplateSpec(), + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{Name: "elasticsearch-data"}, + Spec: corev1.PersistentVolumeClaimSpec{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: fns.claimedStorage.DeepCopy(), + }, + }, + }, + }, + }, + }, + }, + } + } + return resources +} + +func (esb esBuilder) toEs() esv1.Elasticsearch { + nodeSets := make([]esv1.NodeSet, len(esb.nodeSets)) + for i := range esb.nodeSets { + fns := esb.nodeSets[i] + nodeSets[i].Name = fns.name + nodeSets[i].PodTemplate = fns.toPodTemplateSpec() + nodeSets[i].Config = &commonv1.Config{Data: fns.nodeConfig} + nodeSets[i].Count = fns.count + } + + return esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: "elasticsearch-desired-sample", + Namespace: "default", + UID: esb.uid, + ResourceVersion: esb.resourceVersion, + Generation: 1, + }, + Spec: esv1.ElasticsearchSpec{ + Version: esb.esVersion, + NodeSets: nodeSets, + }, + } +} + +func (esb esBuilder) toResources() []runtime.Object { + es := esb.toEs() + result := []runtime.Object{&es} + for _, nodeSet := range esb.nodeSets { + if !nodeSet.pvcExists { + continue + } + for i := 0; i < int(nodeSet.count); i++ { + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("elasticsearch-data-elasticsearch-desired-sample-es-%s-%d", nodeSet.name, i), + Namespace: "default", + UID: uuid.NewUUID(), + ResourceVersion: strconv.Itoa(rand.Intn(1000)), + Generation: 1, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{corev1.ResourceStorage: nodeSet.claimedStorage.DeepCopy()}, + }, + }, + } + if nodeSet.storageInStatus != nil { + pvc.Status = corev1.PersistentVolumeClaimStatus{ + Capacity: corev1.ResourceList{corev1.ResourceStorage: nodeSet.storageInStatus.DeepCopy()}, + } + } + result = append(result, &pvc) + } + } + return result +} + +func newEs(esVersion string) esBuilder { + return esBuilder{ + esVersion: esVersion, + uid: uuid.NewUUID(), + resourceVersion: strconv.Itoa(rand.Intn(1000)), + } +} + +func (esb esBuilder) withNodeSet(fn fakeNodeSet) esBuilder { + esb.nodeSets = append(esb.nodeSets, fn) + return esb +} + +type fakeNodeSet struct { + name string + count int32 + nodeConfig map[string]interface{} + cpuRequest, cpuLimit *resource.Quantity + memoryRequest, memoryLimit *resource.Quantity + + pvcExists bool + claimedStorage, storageInStatus *resource.Quantity +} + +func (fn fakeNodeSet) toPodTemplateSpec() corev1.PodTemplateSpec { + // Build the resources + resources := corev1.ResourceRequirements{ + Limits: make(map[corev1.ResourceName]resource.Quantity), + Requests: make(map[corev1.ResourceName]resource.Quantity), + } + if fn.cpuRequest != nil { + resources.Requests[corev1.ResourceCPU] = fn.cpuRequest.DeepCopy() + } + if fn.cpuLimit != nil { + resources.Limits[corev1.ResourceCPU] = fn.cpuLimit.DeepCopy() + } + if fn.memoryRequest != nil { + resources.Requests[corev1.ResourceMemory] = fn.memoryRequest.DeepCopy() + } + if fn.memoryLimit != nil { + resources.Limits[corev1.ResourceMemory] = fn.memoryLimit.DeepCopy() + } + + esContainer := corev1.Container{ + Name: "elasticsearch", + Resources: resources, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "elasticsearch-data", + MountPath: "/usr/share/elasticsearch/data", + }, + }, + } + + return corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{esContainer}, + }, + } +} + +// nodeSet returns a fake nodeSet builder with a given name and a given size. +func nodeSet(name string, count int32) fakeNodeSet { + return fakeNodeSet{ + name: name, + count: count, + } +} + +func (fn fakeNodeSet) withNodeCfg(cfg map[string]interface{}) fakeNodeSet { + fn.nodeConfig = cfg + return fn +} + +func (fn fakeNodeSet) withCPU(request, limit string) fakeNodeSet { + fn.cpuRequest, fn.cpuLimit = parseQuantityStrings(request, limit) + return fn +} + +func (fn fakeNodeSet) withMemory(request, limit string) fakeNodeSet { + fn.memoryRequest, fn.memoryLimit = parseQuantityStrings(request, limit) + return fn +} + +func (fn fakeNodeSet) pvcCreated(created bool) fakeNodeSet { + fn.pvcExists = created + return fn +} + +func (fn fakeNodeSet) withStorage(claimed, inStatus string) fakeNodeSet { + fn.claimedStorage, fn.storageInStatus = parseQuantityStrings(claimed, inStatus) + return fn +} + +func parseQuantityStrings(request, limit string) (requestQuantity *resource.Quantity, requestLimit *resource.Quantity) { + if request != "" { + q := resource.MustParse(request) + requestQuantity = &q + } + if limit != "" { + q := resource.MustParse(limit) + requestLimit = &q + } + return +} + +type desiredNodesFakeClient struct { + client.Client + deleted bool +} + +func (c *desiredNodesFakeClient) UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes client.DesiredNodes) error { + return c.Client.UpdateDesiredNodes(ctx, historyID, version, desiredNodes) +} + +func (c *desiredNodesFakeClient) DeleteDesiredNodes(ctx context.Context) error { + c.deleted = true + return c.Client.DeleteDesiredNodes(ctx) +} + +type wantClient struct { + historyID types.UID + version int64 + request string +} + +const expectedPath = `^\/_internal\/desired_nodes\/(?P.*)\/(?P.*)$` + +func fakeEsClient(t *testing.T, esVersion string, err bool, want wantClient) *desiredNodesFakeClient { + t.Helper() + expectedPath := regexp.MustCompile(expectedPath) + c := client.NewMockClient(version.MustParse(esVersion), func(req *http.Request) *http.Response { + if !strings.HasPrefix(req.URL.Path, "/_internal/desired_nodes") { + t.Fatalf("Elasticsearch client has been called on unknown path: %s", req.URL.Path) + } + statusCode := 200 + if err { + statusCode = 500 + } + + if want.request != "" { + var ( + gotHistoryID types.UID + gotVersion int64 + ) + match := expectedPath.FindStringSubmatch(req.URL.Path) + if len(match) > 2 { + gotHistoryID = types.UID(match[1]) + parsedVersion, err := strconv.ParseInt(match[2], 10, 64) + assert.NoError(t, err) + gotVersion = parsedVersion + } + + // Compare history and version + assert.Equal(t, want.historyID, gotHistoryID) + assert.Equal(t, want.version, gotVersion) + + // Compare the request + gotRequest, err := ioutil.ReadAll(req.Body) + assert.NoError(t, err) + require.JSONEq(t, want.request, string(gotRequest)) + } + + return &http.Response{ + StatusCode: statusCode, + Body: ioutil.NopCloser(bytes.NewBufferString(`{"acknowledged":true}`)), + } + }) + return &desiredNodesFakeClient{Client: c} +} diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 9247ee81a4..489f52dd79 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -82,6 +82,13 @@ func (d *defaultDriver) reconcileNodeSpecs( return results.WithError(err) } + if esClient.IsDesiredNodesSupported() { + results.WithResults(d.updateDesiredNodes(ctx, esClient, esReachable, expectedResources)) + if results.HasError() { + return results + } + } + esState := NewMemoizingESState(ctx, esClient) // Phase 1: apply expected StatefulSets resources and scale up. upscaleCtx := upscaleCtx{ diff --git a/pkg/controller/elasticsearch/driver/testdata/desired_nodes/happy_path.json b/pkg/controller/elasticsearch/driver/testdata/desired_nodes/happy_path.json new file mode 100644 index 0000000000..15b2ca19c9 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/testdata/desired_nodes/happy_path.json @@ -0,0 +1,159 @@ +{ + "nodes": [{ + "settings": { + "http": { + "publish_host": "elasticsearch-desired-sample-es-master-0.elasticsearch-desired-sample-es-master.default.svc" + }, + "network": { + "publish_host": "${POD_IP}" + }, + "node": { + "attr": { + "k8s_node_name": "${NODE_NAME}" + }, + "name": "elasticsearch-desired-sample-es-master-0", + "roles": ["master"] + }, + "path": { + "data": "/usr/share/elasticsearch/data" + } + }, + "processors_range": { + "min": 2.222, + "max": 3.141 + }, + "memory": "2446327808b", + "storage": "1073741824b", + "node_version": "8.3.0" + }, { + "settings": { + "http": { + "publish_host": "elasticsearch-desired-sample-es-master-1.elasticsearch-desired-sample-es-master.default.svc" + }, + "network": { + "publish_host": "${POD_IP}" + }, + "node": { + "attr": { + "k8s_node_name": "${NODE_NAME}" + }, + "name": "elasticsearch-desired-sample-es-master-1", + "roles": ["master"] + }, + "path": { + "data": "/usr/share/elasticsearch/data" + } + }, + "processors_range": { + "min": 2.222, + "max": 3.141 + }, + "memory": "2446327808b", + "storage": "1073741824b", + "node_version": "8.3.0" + }, { + "settings": { + "http": { + "publish_host": "elasticsearch-desired-sample-es-master-2.elasticsearch-desired-sample-es-master.default.svc" + }, + "network": { + "publish_host": "${POD_IP}" + }, + "node": { + "attr": { + "k8s_node_name": "${NODE_NAME}" + }, + "name": "elasticsearch-desired-sample-es-master-2", + "roles": ["master"] + }, + "path": { + "data": "/usr/share/elasticsearch/data" + } + }, + "processors_range": { + "min": 2.222, + "max": 3.141 + }, + "memory": "2446327808b", + "storage": "1073741824b", + "node_version": "8.3.0" + }, { + "settings": { + "http": { + "publish_host": "elasticsearch-desired-sample-es-hot-0.elasticsearch-desired-sample-es-hot.default.svc" + }, + "network": { + "publish_host": "${POD_IP}" + }, + "node": { + "attr": { + "k8s_node_name": "${NODE_NAME}" + }, + "name": "elasticsearch-desired-sample-es-hot-0", + "roles": ["data", "ingest"] + }, + "path": { + "data": "/usr/share/elasticsearch/data" + } + }, + "processors_range": { + "min": 1, + "max": 1 + }, + "memory": "4294967296b", + "storage": "53687091200b", + "node_version": "8.3.0" + }, { + "settings": { + "http": { + "publish_host": "elasticsearch-desired-sample-es-hot-1.elasticsearch-desired-sample-es-hot.default.svc" + }, + "network": { + "publish_host": "${POD_IP}" + }, + "node": { + "attr": { + "k8s_node_name": "${NODE_NAME}" + }, + "name": "elasticsearch-desired-sample-es-hot-1", + "roles": ["data", "ingest"] + }, + "path": { + "data": "/usr/share/elasticsearch/data" + } + }, + "processors_range": { + "min": 1, + "max": 1 + }, + "memory": "4294967296b", + "storage": "53687091200b", + "node_version": "8.3.0" + }, { + "settings": { + "http": { + "publish_host": "elasticsearch-desired-sample-es-hot-2.elasticsearch-desired-sample-es-hot.default.svc" + }, + "network": { + "publish_host": "${POD_IP}" + }, + "node": { + "attr": { + "k8s_node_name": "${NODE_NAME}" + }, + "name": "elasticsearch-desired-sample-es-hot-2", + "roles": ["data", "ingest"] + }, + "path": { + "data": "/usr/share/elasticsearch/data" + } + }, + "processors_range": { + "min": 1, + "max": 1 + }, + "memory": "4294967296b", + "storage": "53687091200b", + "node_version": "8.3.0" + }] +} diff --git a/pkg/controller/elasticsearch/driver/upscale_state.go b/pkg/controller/elasticsearch/driver/upscale_state.go index 213c6e2b32..efe0499b50 100644 --- a/pkg/controller/elasticsearch/driver/upscale_state.go +++ b/pkg/controller/elasticsearch/driver/upscale_state.go @@ -45,7 +45,7 @@ func newUpscaleState( createsAllowed: calculateCreatesAllowed( ctx.es.Spec.UpdateStrategy.ChangeBudget.GetMaxSurgeOrDefault(), actualStatefulSets.ExpectedNodeCount(), - expectedResources.StatefulSets().ExpectedNodeCount()), + expectedResources.ExpectedNodeCount()), upscaleReporter: ctx.upscaleReporter, } } diff --git a/pkg/controller/elasticsearch/nodespec/desired_nodes.go b/pkg/controller/elasticsearch/nodespec/desired_nodes.go new file mode 100644 index 0000000000..c8a717db27 --- /dev/null +++ b/pkg/controller/elasticsearch/nodespec/desired_nodes.go @@ -0,0 +1,314 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package nodespec + +import ( + "context" + "fmt" + "strings" + + "go.elastic.co/apm" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" +) + +// ResourceNotAvailable implements the error interface and can be used to raise cases where not all compute or +// storage resources have been detected. +type ResourceNotAvailable struct { + nodeSet string + reasons []string +} + +func (r ResourceNotAvailable) Error() string { + return fmt.Sprintf("cannot compute resources for NodeSet %q: %s", r.nodeSet, strings.Join(r.reasons, ", ")) +} + +type nodeResources []nodeResource + +func (n nodeResources) requeue() bool { + for _, nodeResource := range n { + if nodeResource.requeue { + return true + } + } + return false +} + +type nodeResource struct { + nodeName string + memory, storage int64 + cpu client.ProcessorsRange + requeue bool +} + +type nodeSetResourcesBuilder struct { + nodeSet string + cpu client.ProcessorsRange + memory int64 + reasons []string +} + +func (n nodeSetResourcesBuilder) addReason(reason string) nodeSetResourcesBuilder { + n.reasons = append(n.reasons, reason) + return n +} + +func (n nodeSetResourcesBuilder) toError() error { + if len(n.reasons) == 0 { + return nil + } + return &ResourceNotAvailable{nodeSet: n.nodeSet, reasons: n.reasons} +} + +// withProcessors computes the available CPU resource for the Elasticsearch container. +// It uses the limit if provided, otherwise fallback to the requirement. +// It returns nil if neither the request nor a limit is set. +func (n nodeSetResourcesBuilder) withProcessors(resources corev1.ResourceRequirements) nodeSetResourcesBuilder { + // Try to get the limit + limit, hasLimit := resources.Limits[corev1.ResourceCPU] + if hasLimit { + if limit.IsZero() { + return n.addReason("CPU limit is set but value is 0") + } + n.cpu.Max = limit.AsApproximateFloat64() + } + // Try to get the request + request, hasRequest := resources.Requests[corev1.ResourceCPU] + if hasRequest { + if request.IsZero() { + return n.addReason("CPU request is set but value is 0") + } + n.cpu.Min = request.AsApproximateFloat64() + return n + } else if hasLimit { + // If a limit is set without any request, then Kubernetes copies the limit as the requested value. + n.cpu.Min = n.cpu.Max + return n + } + // Neither the limit nor the request is set + return n.addReason("no CPU request or limit set") +} + +// withMemory computes the available memory resource. +// It returns nil if the limit and the request do not have the same value. +func (n nodeSetResourcesBuilder) withMemory(resources corev1.ResourceRequirements) nodeSetResourcesBuilder { + limit, hasLimit := resources.Limits[corev1.ResourceMemory] + request, hasRequest := resources.Requests[corev1.ResourceMemory] + switch { + case !hasLimit: + // Having a memory limit is mandatory to guess the allocated memory. + return n.addReason("memory limit is not set") + case hasLimit && hasRequest && !limit.Equal(request): + // If request is set it must have the same value as the limit. + return n.addReason("memory request and limit do not have the same value") + } + if limit.IsZero() { + return n.addReason("memory limit is set but value is 0") + } + n.memory = limit.Value() + return n +} + +// withStorage attempts to detect the storage capacity of the Elasticsearch nodes. +// 1. Attempt to detect path settings, an error is raised if multiple data paths are set. +// 2. Detect the data volume name. If none can be detected an error is raised. +// 3. Lookup for the corresponding volume claim. +// 4. For each Pod in the StatefulSet we attempt to read the capacity from the PVC status or from the Spec +// if there is no status yet. +func (n nodeSetResourcesBuilder) withStorage( + ctx context.Context, + k8sClient k8s.Client, + statefulSet appsv1.StatefulSet, + config settings.CanonicalConfig, + esContainer *corev1.Container, +) (nodeResources, error) { + var p pathSetting + if err := config.CanonicalConfig.Unpack(&p); err != nil { + return nil, err + } + if p.PathData == nil { + return nil, n.addReason("Elasticsearch path.data must be a set").toError() + } + pathData, ok := p.PathData.(string) + if !ok { + return nil, n.addReason("Elasticsearch path.data must be a string, multiple paths is not supported").toError() + } + + var volumeName string + for _, mount := range esContainer.VolumeMounts { + if mount.MountPath == pathData { + volumeName = mount.Name + continue + } + } + if len(volumeName) == 0 { + return nil, n.addReason(fmt.Sprintf("Elasticsearch path.data %s must mounted by a volume", pathData)).toError() + } + + var esDataVolumeClaim *corev1.PersistentVolumeClaim + for _, pvc := range statefulSet.Spec.VolumeClaimTemplates { + if pvc.Name == volumeName { + pvc := pvc // return a pointer on a copy + esDataVolumeClaim = &pvc + continue + } + } + + if esDataVolumeClaim == nil { + return nil, n.addReason(fmt.Sprintf("Volume claim with name %q not found in Spec.VolumeClaimTemplates ", volumeName)).toError() + } + + claimedStorage := getClaimedStorage(*esDataVolumeClaim) + if claimedStorage == nil { + return nil, n.addReason(fmt.Sprintf("no storage request in claim %q", esDataVolumeClaim.Name)).toError() + } + + // Stop here if there is at least one reason to not compute the desired state. + if err := n.toError(); err != nil { + return nil, err + } + + nodeResources := make([]nodeResource, sset.GetReplicas(statefulSet)) + for i, podName := range sset.PodNames(statefulSet) { + nodeResources[i].nodeName = podName + nodeResources[i].cpu = n.cpu + nodeResources[i].memory = n.memory + pvcName := fmt.Sprintf("%s-%s", esDataVolumeClaim.Name, podName) + pvc := corev1.PersistentVolumeClaim{} + if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: statefulSet.Namespace, Name: pvcName}, &pvc); err != nil { + if apierrors.IsNotFound(err) { + // PVC does not exist (yet) + nodeResources[i].requeue = true + nodeResources[i].storage = *claimedStorage + continue + } + return nil, err + } + // We first attempt to read the PVC status + if storageInStatus, exists := pvc.Status.Capacity[corev1.ResourceStorage]; exists { + nodeResources[i].storage = storageInStatus.Value() + continue + } + // If there is no storage value in the status use the value in the spec + nodeResources[i].requeue = true + if storageInSpec, exists := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; exists { + nodeResources[i].storage = storageInSpec.Value() + } else { + // PVC does exist, but Spec.Resources.Requests is empty, this is unlikely to happen for a PVC, fall back to claimed storage. + nodeResources[i].storage = *claimedStorage + } + } + + return nodeResources, nil +} + +const ( + envPodName = "${" + settings.EnvPodName + "}" + envNamespace = "${" + settings.EnvNamespace + "}" + envHeadlessServiceName = "${" + settings.HeadlessServiceName + "}" +) + +// ToDesiredNodes returns the desired nodes, as expected by the desired nodes API, from an expected resources list. +// A boolean is also returned to indicate whether a requeue should be done to set a more accurate state of the +// storage capacity. +func (l ResourcesList) ToDesiredNodes( + ctx context.Context, + k8sClient k8s.Client, + version string, +) (desiredNodes []client.DesiredNode, requeue bool, err error) { + span, ctx := apm.StartSpan(ctx, "compute_desired_nodes", tracing.SpanTypeApp) + defer span.End() + desiredNodes = make([]client.DesiredNode, 0, l.ExpectedNodeCount()) + for _, resources := range l { + sts := resources.StatefulSet + esContainer := getElasticsearchContainer(sts.Spec.Template.Spec.Containers) + if esContainer == nil { + return nil, false, fmt.Errorf("cannot find Elasticsearch container in StatefulSet %s/%s", sts.Namespace, sts.Name) + } + + nodeResources, err := nodeSetResourcesBuilder{nodeSet: resources.NodeSet}. + withProcessors(esContainer.Resources). + withMemory(esContainer.Resources). + withStorage(ctx, k8sClient, sts, resources.Config, esContainer) + if err != nil { + return nil, false, err + } + + requeue = requeue || nodeResources.requeue() + + for _, nodeResource := range nodeResources { + // We replace the environment variables in the Elasticsearch configuration with their values if they can be + // evaluated before scheduling. This is for example required for node.name, which must be evaluated before + // calling the desired nodes API. + knownVariablesReplacer := strings.NewReplacer( + envPodName, nodeResource.nodeName, + envNamespace, sts.Namespace, + envHeadlessServiceName, resources.HeadlessService.Name, + ) + var settings map[string]interface{} + if err := resources.Config.CanonicalConfig.Unpack(&settings); err != nil { + return nil, false, err + } + visit(nil, settings, func(s string) string { + return knownVariablesReplacer.Replace(s) + }) + + node := client.DesiredNode{ + NodeVersion: version, + ProcessorsRange: nodeResource.cpu, + Memory: fmt.Sprintf("%db", nodeResource.memory), + Storage: fmt.Sprintf("%db", nodeResource.storage), + Settings: settings, + } + desiredNodes = append(desiredNodes, node) + } + } + + return desiredNodes, requeue, nil +} + +// visit recursively visits a map holding a tree structure and apply a function to nodes that hold a string. +func visit(keys []string, m map[string]interface{}, apply func(string) string) { + for k, v := range m { + if childMap, isMap := v.(map[string]interface{}); isMap { + visit(append(keys, k), childMap, apply) + } + if value, isString := v.(string); isString { + m[k] = apply(value) + } + } +} + +// getElasticsearchContainer returns the Elasticsearch container, or nil if not found. +func getElasticsearchContainer(containers []corev1.Container) *corev1.Container { + for _, c := range containers { + if c.Name == esv1.ElasticsearchContainerName { + return &c + } + } + return nil +} + +func getClaimedStorage(claim corev1.PersistentVolumeClaim) *int64 { + if storage, exists := claim.Spec.Resources.Requests[corev1.ResourceStorage]; exists { + return pointer.Int64Ptr(storage.Value()) + } + return nil +} + +// pathSetting captures secrets settings in the Elasticsearch configuration that we want to reuse. +type pathSetting struct { + PathData interface{} `config:"path.data"` +} diff --git a/pkg/controller/elasticsearch/nodespec/resources.go b/pkg/controller/elasticsearch/nodespec/resources.go index 12679def71..27890a5406 100644 --- a/pkg/controller/elasticsearch/nodespec/resources.go +++ b/pkg/controller/elasticsearch/nodespec/resources.go @@ -22,6 +22,7 @@ import ( // Resources contain per-NodeSet resources to be created. type Resources struct { + NodeSet string StatefulSet appsv1.StatefulSet HeadlessService corev1.Service Config settings.CanonicalConfig @@ -46,6 +47,10 @@ func (l ResourcesList) StatefulSets() sset.StatefulSetList { return ssetList } +func (l ResourcesList) ExpectedNodeCount() int32 { + return l.StatefulSets().ExpectedNodeCount() +} + func BuildExpectedResources( client k8s.Client, es esv1.Elasticsearch, @@ -80,6 +85,7 @@ func BuildExpectedResources( headlessSvc := HeadlessService(&es, statefulSet.Name) nodesResources = append(nodesResources, Resources{ + NodeSet: nodeSpec.Name, StatefulSet: statefulSet, HeadlessService: headlessSvc, Config: cfg, diff --git a/pkg/utils/k8s/k8sutils.go b/pkg/utils/k8s/k8sutils.go index 34d45c76f6..f00fc24764 100644 --- a/pkg/utils/k8s/k8sutils.go +++ b/pkg/utils/k8s/k8sutils.go @@ -63,7 +63,7 @@ func ObjectExists(c Client, ref types.NamespacedName, typedReceiver client.Objec return true, nil } -// IsAvailable checks if both conditions ContainersReady and PodReady of a Pod are true. +// IsPodReady checks if both conditions ContainersReady and PodReady of a Pod are true. func IsPodReady(pod corev1.Pod) bool { conditionsTrue := 0 for _, cond := range pod.Status.Conditions { diff --git a/test/e2e/test/elasticsearch/checks_es.go b/test/e2e/test/elasticsearch/checks_es.go index ad92154df1..ce49e307bd 100644 --- a/test/e2e/test/elasticsearch/checks_es.go +++ b/test/e2e/test/elasticsearch/checks_es.go @@ -6,6 +6,7 @@ package elasticsearch import ( "context" + "errors" "fmt" "strconv" "strings" @@ -15,10 +16,12 @@ import ( "k8s.io/apimachinery/pkg/api/resource" esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/volume" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" "github.com/elastic/cloud-on-k8s/pkg/utils/stringsutil" "github.com/elastic/cloud-on-k8s/test/e2e/test" ) @@ -34,6 +37,7 @@ func (b Builder) CheckStackTestSteps(k *test.K8sClient) test.StepList { e.CheckESNodesTopology(), e.CheckESVersion(), e.CheckESHealthGreen(), + e.CheckDesiredNodesAPI(k), e.CheckTransportCertificatesStep(), } } @@ -88,6 +92,112 @@ func (e *esClusterChecks) CheckESHealthGreen() test.Step { } } +// CheckDesiredNodesAPI validates that the desired nodes API has been called with the expected history ID and version. +// If the desired nodes API cannot be called then the test only validates that the desired nodes state does not exist (404). +func (e *esClusterChecks) CheckDesiredNodesAPI(k *test.K8sClient) test.Step { + return test.Step{ + Name: "Check desired nodes API state", + Test: test.Eventually(func() error { + esClient, err := e.newESClient() + if err != nil { + return err + } + if !esClient.IsDesiredNodesSupported() { + return nil + } + + var es esv1.Elasticsearch + expectedEs := e.Builder.GetExpectedElasticsearch() + if err := k.Client.Get(context.Background(), k8s.ExtractNamespacedName(&expectedEs), &es); err != nil { + return err + } + + expectDesiredNodesAPI, err := expectDesiredNodesAPI(&expectedEs) + if err != nil { + return err + } + latestDesiredNodes, err := esClient.GetLatestDesiredNodes(context.Background()) + if err != nil { + if !expectDesiredNodesAPI && client.IsNotFound(err) { + // It's ok to have a 404 if the desired nodes API can't be called + return nil + } + return err + } + + if !expectDesiredNodesAPI { + return errors.New("desired nodes state should have been cleared") + } + + if latestDesiredNodes.HistoryID != string(es.UID) { + return fmt.Errorf("expected desired nodes history ID %s, but got %s from the API", string(es.UID), latestDesiredNodes.HistoryID) + } + if es.Generation != latestDesiredNodes.Version { + return fmt.Errorf("expected desired nodes version %d, but got %d from the API", es.Generation, latestDesiredNodes.Version) + } + return nil + }), + } +} + +type PathDataSetting struct { + PathData interface{} `config:"path.data"` +} + +// expectDesiredNodesAPI attempts to detect when the desired nodes state is expected to be set. +func expectDesiredNodesAPI(es *esv1.Elasticsearch) (bool, error) { + if usesEmptyDir(*es) { + return false, nil + } + for _, nodeSet := range es.Spec.NodeSets { + if nodeSet.Config != nil && nodeSet.Config.Data != nil { + canonicalConfig, err := settings.NewCanonicalConfigFrom(nodeSet.Config.Data) + if err != nil { + return false, err + } + dataPathSetting := &PathDataSetting{} + if err := canonicalConfig.Unpack(dataPathSetting); err != nil { + return false, err + } + + cfgPathData := dataPathSetting.PathData + pathData, ok := cfgPathData.(string) + if (cfgPathData != nil && !ok) || strings.Contains(pathData, ",") { + // Multi data path + return false, nil + } + } + + var esResources *corev1.ResourceRequirements + for _, c := range nodeSet.PodTemplate.Spec.Containers { + if c.Name == "elasticsearch" { + c := c + esResources = &c.Resources + } + } + if esResources == nil { + // Elasticsearch container not found, very unlikely to happen in an E2E test. + return false, nil + } + memReq, hasMemReq := esResources.Requests[corev1.ResourceMemory] + memLimit, hasMemLimit := esResources.Limits[corev1.ResourceMemory] + if !hasMemLimit { + return false, nil + } + if hasMemReq && !memReq.Equal(memLimit) { + return false, nil + } + + _, hasCPULimit := esResources.Limits[corev1.ResourceCPU] + _, hasCPUReq := esResources.Requests[corev1.ResourceCPU] + if !hasCPULimit && !hasCPUReq { + // We need either the CPU req. or the CPU limit + return false, nil + } + } + return true, nil +} + func (e *esClusterChecks) CheckESNodesTopology() test.Step { return test.Step{ Name: "ES nodes topology should eventually be the expected one",