Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the desired nodes API #5650

Merged
merged 24 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/apis/elasticsearch/v1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type ConditionType string
const (
ElasticsearchIsReachable ConditionType = "ElasticsearchIsReachable"
ReconciliationComplete ConditionType = "ReconciliationComplete"
ResourcesAwareManagement ConditionType = "ResourcesAwareManagement"
RunningDesiredVersion ConditionType = "RunningDesiredVersion"
)

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Role struct {
type Client interface {
AllocationSetter
AutoscalingClient
DesiredNodesClient
ShardLister
LicenseClient
// Close idle connections in the underlying http client.
Expand Down
84 changes: 84 additions & 0 deletions pkg/controller/elasticsearch/client/desired_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package client

import (
"context"
"fmt"

"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
)

var desiredNodesMinVersion = version.MinFor(8, 3, 0)

type DesiredNodesClient interface {
IsDesiredNodesSupported() bool
// GetLatestDesiredNodes returns the latest desired nodes.
GetLatestDesiredNodes(ctx context.Context) (LatestDesiredNodes, error)
// UpdateDesiredNodes updates the desired nodes API.
UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes DesiredNodes) error
// DeleteDesiredNodes deletes the desired nodes from the cluster state.
DeleteDesiredNodes(ctx context.Context) error
}

type LatestDesiredNodes struct {
HistoryID string `json:"history_id"`
Version int64 `json:"version"`
DesiredNodes []DesiredNode `json:"nodes"`
}

type DesiredNodes struct {
DesiredNodes []DesiredNode `json:"nodes"`
}

type DesiredNode struct {
Settings map[string]interface{} `json:"settings"`
ProcessorsRange ProcessorsRange `json:"processors_range"`
Memory string `json:"memory"`
Storage string `json:"storage"`
NodeVersion string `json:"node_version"`
}

type ProcessorsRange struct {
Min float64 `json:"min"`
Max float64 `json:"max,omitempty"`
}

func (c *baseClient) GetLatestDesiredNodes(_ context.Context) (LatestDesiredNodes, error) {
return LatestDesiredNodes{}, c.desiredNodesNotAvailable()
}

func (c *baseClient) UpdateDesiredNodes(_ context.Context, _ string, _ int64, _ DesiredNodes) error {
return c.desiredNodesNotAvailable()
}

func (c *baseClient) DeleteDesiredNodes(_ context.Context) error {
return c.desiredNodesNotAvailable()
}

func (c *baseClient) desiredNodesNotAvailable() error {
return fmt.Errorf("the desired nodes API is not available in Elasticsearch %s, it requires %s", c.version, desiredNodesMinVersion)
}

func (c *baseClient) IsDesiredNodesSupported() bool {
return c.version.GTE(desiredNodesMinVersion)
}

func (c *clientV8) GetLatestDesiredNodes(ctx context.Context) (LatestDesiredNodes, error) {
var latestDesiredNodes LatestDesiredNodes
err := c.get(ctx, "/_internal/desired_nodes/_latest", &latestDesiredNodes)
return latestDesiredNodes, err
}

func (c *clientV8) UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes DesiredNodes) error {
return c.put(
ctx,
fmt.Sprintf("/_internal/desired_nodes/%s/%d", historyID, version),
&desiredNodes, nil)
}

func (c *clientV8) DeleteDesiredNodes(ctx context.Context) error {
return c.delete(ctx, "/_internal/desired_nodes")
}
75 changes: 75 additions & 0 deletions pkg/controller/elasticsearch/driver/desired_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package driver

import (
"context"
"errors"
"fmt"

"go.elastic.co/apm"
corev1 "k8s.io/api/core/v1"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
)

func (d *defaultDriver) updateDesiredNodes(
ctx context.Context,
esClient esclient.Client,
esReachable bool,
expectedResources nodespec.ResourcesList,
) *reconciler.Results {
span, ctx := apm.StartSpan(ctx, "update_desired_nodes", tracing.SpanTypeApp)
defer span.End()
results := &reconciler.Results{}
// We compute the desired nodes state to update the condition
var resourceNotAvailableErr *nodespec.ResourceNotAvailable
esVersion, err := version.Parse(d.ES.Spec.Version)
if err != nil {
return results.WithError(err)
}
nodes, requeue, err := expectedResources.ToDesiredNodes(ctx, d.Client, esVersion.FinalizeVersion())
switch {
case err == nil:
d.ReconcileState.ReportCondition(
esv1.ResourcesAwareManagement,
corev1.ConditionTrue,
fmt.Sprintf("Successfully calculated compute and storage resources from Elasticsearch resource generation %d", d.ES.Generation),
)
case errors.As(err, &resourceNotAvailableErr):
// It is not possible to build the desired node spec because of the Elasticsearch specification
d.ReconcileState.ReportCondition(
esv1.ResourcesAwareManagement,
corev1.ConditionFalse,
fmt.Sprintf("Cannot get compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()),
)
// It is fine to continue, error is only reported through the condition.
// We should however clear the desired nodes API since we are in a degraded (not resources aware) mode.
if esReachable {
return results.WithError(esClient.DeleteDesiredNodes(ctx))
barkbay marked this conversation as resolved.
Show resolved Hide resolved
}
return results.WithReconciliationState(defaultRequeue.WithReason("Desired nodes API must be cleared"))
default:
// Unknown error: not nil and not ResourceNotAvailable
d.ReconcileState.ReportCondition(
esv1.ResourcesAwareManagement,
corev1.ConditionUnknown,
fmt.Sprintf("Error while calculating compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()),
)
return results.WithError(err)
}
if requeue {
results.WithReconciliationState(defaultRequeue.WithReason("Storage capacity is not available in all PVC statuses, requeue to refine the capacity reported in the desired nodes API"))
}
if esReachable {
return results.WithError(esClient.UpdateDesiredNodes(ctx, string(d.ES.UID), d.ES.Generation, esclient.DesiredNodes{DesiredNodes: nodes}))
}
return results.WithReconciliationState(defaultRequeue.WithReason("Waiting for Elasticsearch to be available to update the desired nodes API"))
}
Loading