Skip to content

Commit

Permalink
Elastic Agent: Set status.ObservedGeneration from metadata.Generation (
Browse files Browse the repository at this point in the history
#5383)

* Add agent observed generation.
Add tests for agent observedGeneration.
modify agent reconcile behavior to ensure status updates for certain situations.
adjust common comparison to allow additional options
Update some documentation.
Make disabling of k8s client's errors simpler.
Make the status update more readable.
Configure initial state for Elastic Agent properly from current state.
Add missing comments for some exported data.
Make errors consistent with capitalization, and namespace+name.
Rename e2e test name to be more descriptive.
Update pod labels in e2e test to properly trigger generation update.
Remove failing client tests.
used keyed structs in tests.

* Run generation and update yaml for observedGeneration description change.

* Properly order imports

* capitalize Agent

* remove erroneous debugging line

* renamed agent e2e test file.
removed erroneous cmp.AllowUnexperted()

* Ignore generation when checking if agent status is updated during k8s test steps.
remove WithDefaultESValidation from agent generation e2e test.

* remove unneeded nil in return from rebase.

* Add missing header to e2e test.

* Remove unused args struct

* move imports around properly

* Defer agent validation to reduce repetitive code.
Remove unneeded struct in tests.
Move Generation e2e tests to it's own package, and use it in all Agent mutation tests.

* Updating metav1.Object -> client.Object, and using type switch to get observedGeneration.

* Allow status to be updated even if associations are broken.

* Allow error to be returned from doReconcile during defer.
Updating E2E tests to have similar design as Kibana, and ES E2E tests for generation bits.

* ensure withmutatedfrom is used in e2e agent tests.

* Ensure we're not returning nil results

* Fixing comparison issues from rebase/merge

* Add nolint for the naked return, which is required for the defer to function properly.

* Handle updating status properly when associations are broken.

* Debugging

* Debugging, and moving the defer

* Go back to previous way of handling fetch association errors

* remove extraneous logging statement

* Move status update to calling function.
Don't worry about status update when FetchWithASsociations fails.
Just return errors from updateStatus

* Remove unused context

* Update the func's comments to be consistent

* remove unneeded return

* move common.LowestVersionFromPods closer to where pods are retrieved.
Update status logic to update health properly.
Add unit test for updating of health in agent status.

* changes to return status instead of modifying a pointer.

* rename variable

* Updating how status is handled for consistency
  • Loading branch information
naemono committed Apr 5, 2022
1 parent b2a76d5 commit c4af92c
Show file tree
Hide file tree
Showing 15 changed files with 394 additions and 57 deletions.
9 changes: 9 additions & 0 deletions config/crds/v1/all-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,15 @@ spec:
kibanaAssociationStatus:
description: AssociationStatus is the status of an association resource.
type: string
observedGeneration:
description: ObservedGeneration is the most recent generation observed
for this Elastic Agent. It corresponds to the metadata generation,
which is updated on mutation by the API Server. If the generation
observed in status diverges from the generation in metadata, the
Elastic Agent controller has not yet processed the changes contained
in the Elastic Agent specification.
format: int64
type: integer
version:
description: 'Version of the stack resource currently running. During
version upgrades, multiple versions may run in parallel: this value
Expand Down
9 changes: 9 additions & 0 deletions config/crds/v1/bases/agent.k8s.elastic.co_agents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15663,6 +15663,15 @@ spec:
kibanaAssociationStatus:
description: AssociationStatus is the status of an association resource.
type: string
observedGeneration:
description: ObservedGeneration is the most recent generation observed
for this Elastic Agent. It corresponds to the metadata generation,
which is updated on mutation by the API Server. If the generation
observed in status diverges from the generation in metadata, the
Elastic Agent controller has not yet processed the changes contained
in the Elastic Agent specification.
format: int64
type: integer
version:
description: 'Version of the stack resource currently running. During
version upgrades, multiple versions may run in parallel: this value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,15 @@ spec:
kibanaAssociationStatus:
description: AssociationStatus is the status of an association resource.
type: string
observedGeneration:
description: ObservedGeneration is the most recent generation observed
for this Elastic Agent. It corresponds to the metadata generation,
which is updated on mutation by the API Server. If the generation
observed in status diverges from the generation in metadata, the
Elastic Agent controller has not yet processed the changes contained
in the Elastic Agent specification.
format: int64
type: integer
version:
description: 'Version of the stack resource currently running. During
version upgrades, multiple versions may run in parallel: this value
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/agent/v1alpha1/agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ type AgentStatus struct {

// +kubebuilder:validation:Optional
FleetServerAssociationStatus commonv1.AssociationStatus `json:"fleetServerAssociationStatus,omitempty"`

// ObservedGeneration is the most recent generation observed for this Elastic Agent.
// It corresponds to the metadata generation, which is updated on mutation by the API Server.
// If the generation observed in status diverges from the generation in metadata, the Elastic
// Agent controller has not yet processed the changes contained in the Elastic Agent specification.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

type AgentHealth string
Expand Down Expand Up @@ -295,6 +301,11 @@ func (a *Agent) SecureSettings() []commonv1.SecretSource {
return a.Spec.SecureSettings
}

// GetObservedGeneration will return the observedGeneration from the Elastic Agent's status.
func (a *Agent) GetObservedGeneration() int64 {
return a.Status.ObservedGeneration
}

type AgentESAssociation struct {
*Agent
// ref is the object selector of the Elasticsearch used in Association
Expand Down
37 changes: 24 additions & 13 deletions pkg/controller/agent/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,16 @@ func (r *ReconcileAgent) Reconcile(ctx context.Context, request reconcile.Reques
defer common.LogReconciliationRunNoSideEffects(logconf.FromContext(ctx))()
defer tracing.EndContextTransaction(ctx)

var agent agentv1alpha1.Agent
if err := r.Client.Get(ctx, request.NamespacedName, &agent); err != nil {
agent := &agentv1alpha1.Agent{}
if err := r.Client.Get(ctx, request.NamespacedName, agent); err != nil {
if apierrors.IsNotFound(err) {
r.onDelete(request.NamespacedName)
return reconcile.Result{}, nil
}
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}

if common.IsUnmanaged(&agent) {
if common.IsUnmanaged(agent) {
logconf.FromContext(ctx).Info("Object is currently not managed by this controller. Skipping reconciliation")
return reconcile.Result{}, nil
}
Expand All @@ -144,38 +144,49 @@ func (r *ReconcileAgent) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, nil
}

res, err := r.doReconcile(ctx, agent).Aggregate()
k8s.EmitErrorEvent(r.recorder, err, &agent, events.EventReconciliationError, "Reconciliation error: %v", err)
results, status := r.doReconcile(ctx, *agent)

return res, err
if err := updateStatus(*agent, r.Client, status); err != nil {
if apierrors.IsConflict(err) {
return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
}
results = results.WithError(err)
}

result, err := results.Aggregate()
k8s.EmitErrorEvent(r.recorder, err, agent, events.EventReconciliationError, "Reconciliation error: %v", err)

return result, err
}

func (r *ReconcileAgent) doReconcile(ctx context.Context, agent agentv1alpha1.Agent) *reconciler.Results {
func (r *ReconcileAgent) doReconcile(ctx context.Context, agent agentv1alpha1.Agent) (*reconciler.Results, agentv1alpha1.AgentStatus) {
defer tracing.Span(&ctx)()
results := reconciler.NewResult(ctx)
status := newStatus(agent)

areAssocsConfigured, err := association.AreConfiguredIfSet(agent.GetAssociations(), r.recorder)
if err != nil {
return results.WithError(err)
return results.WithError(err), status
}
if !areAssocsConfigured {
return results
return results, status
}

// Run basic validations as a fallback in case webhook is disabled.
if err := r.validate(ctx, agent); err != nil {
return results.WithError(err)
results = results.WithError(err)
return results, status
}

driverResults := internalReconcile(Params{
return internalReconcile(Params{
Context: ctx,
Client: r.Client,
EventRecorder: r.recorder,
Watches: r.dynamicWatches,
Agent: agent,
Status: status,
OperatorParams: r.Parameters,
})

return results.WithResults(driverResults)
}

func (r *ReconcileAgent) validate(ctx context.Context, agent agentv1alpha1.Agent) error {
Expand Down
238 changes: 238 additions & 0 deletions pkg/controller/agent/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package agent

import (
"context"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

agentv1alpha1 "github.com/elastic/cloud-on-k8s/pkg/apis/agent/v1alpha1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/comparison"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/hash"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/pkg/utils/pointer"
)

func newReconcileAgent(objs ...runtime.Object) *ReconcileAgent {
r := &ReconcileAgent{
Client: k8s.NewFakeClient(objs...),
recorder: record.NewFakeRecorder(100),
dynamicWatches: watches.NewDynamicWatches(),
}
return r
}

func TestReconcileAgent_Reconcile(t *testing.T) {
defaultLabels := NewLabels(agentv1alpha1.Agent{ObjectMeta: metav1.ObjectMeta{Name: "testAgent"}})
tests := []struct {
name string
objs []runtime.Object
request reconcile.Request
want reconcile.Result
expected agentv1alpha1.Agent
wantErr bool
}{
{
name: "valid unmanaged agent does not increment observedGeneration",
objs: []runtime.Object{
&agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgent",
Namespace: "test",
Generation: 1,
Annotations: map[string]string{
common.ManagedAnnotation: "false",
},
},
Spec: agentv1alpha1.AgentSpec{
Version: "8.0.1",
Deployment: &agentv1alpha1.DeploymentSpec{},
},
Status: agentv1alpha1.AgentStatus{
ObservedGeneration: 1,
},
},
},
request: reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "testAgent",
},
},
want: reconcile.Result{},
expected: agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgent",
Namespace: "test",
Generation: 1,
Annotations: map[string]string{
common.ManagedAnnotation: "false",
},
},
Spec: agentv1alpha1.AgentSpec{
Version: "8.0.1",
Deployment: &agentv1alpha1.DeploymentSpec{},
},
Status: agentv1alpha1.AgentStatus{
ObservedGeneration: 1,
},
},
wantErr: false,
},
{
name: "too long name fails validation, and updates observedGeneration",
objs: []runtime.Object{
&agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgentwithtoolongofanamereallylongname",
Namespace: "test",
Generation: 2,
},
Status: agentv1alpha1.AgentStatus{
ObservedGeneration: 1,
},
},
},
request: reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "testAgentwithtoolongofanamereallylongname",
},
},
want: reconcile.Result{},
expected: agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgentwithtoolongofanamereallylongname",
Namespace: "test",
Generation: 2,
},
Status: agentv1alpha1.AgentStatus{
ObservedGeneration: 2,
},
},
wantErr: true,
},
{
name: "agent with ready deployment+pod updates status.health properly",
objs: []runtime.Object{
&agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgent",
Namespace: "test",
Generation: 2,
},
Spec: agentv1alpha1.AgentSpec{
Version: "8.0.1",
Deployment: &agentv1alpha1.DeploymentSpec{
Replicas: pointer.Int32(1),
},
},
Status: agentv1alpha1.AgentStatus{
ObservedGeneration: 1,
},
},
&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgent-agent",
Namespace: "test",
Labels: addLabel(defaultLabels, hash.TemplateHashLabelName, "2519944696"),
},
Status: appsv1.DeploymentStatus{
AvailableReplicas: 1,
Replicas: 1,
ReadyReplicas: 1,
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionTrue,
},
},
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgent",
Namespace: "test",
Generation: 2,
Labels: map[string]string{NameLabelName: "testAgent", VersionLabelName: "8.0.1"},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
},
},
request: reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "testAgent",
},
},
want: reconcile.Result{},
expected: agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "testAgent",
Namespace: "test",
Generation: 2,
},
Spec: agentv1alpha1.AgentSpec{
Version: "8.0.1",
Deployment: &agentv1alpha1.DeploymentSpec{
Replicas: pointer.Int32(1),
},
},
Status: agentv1alpha1.AgentStatus{
Version: "8.0.1",
ExpectedNodes: 1,
AvailableNodes: 1,
ObservedGeneration: 2,
Health: agentv1alpha1.AgentGreenHealth,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := newReconcileAgent(tt.objs...)
got, err := r.Reconcile(context.Background(), tt.request)
if (err != nil) != tt.wantErr {
t.Errorf("ReconcileAgent.Reconcile() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ReconcileAgent.Reconcile() = %v, want %v", got, tt.want)
}

var agent agentv1alpha1.Agent
if err := r.Client.Get(context.Background(), tt.request.NamespacedName, &agent); err != nil {
t.Error(err)
return
}
// AllowUnexported required because of *AssocConf on the agent.
comparison.AssertEqual(t, &agent, &tt.expected, cmp.AllowUnexported(agentv1alpha1.Agent{}))
})
}
}

func addLabel(labels map[string]string, key, value string) map[string]string {
newLabels := make(map[string]string, len(labels))
for k, v := range labels {
newLabels[k] = v
}
newLabels[key] = value
return newLabels
}
Loading

0 comments on commit c4af92c

Please sign in to comment.