Skip to content

Commit

Permalink
Add Certificates Refresh Controller - Control Plane (#58)
Browse files Browse the repository at this point in the history
This commit implements a Certificates Controller that allows refreshing certificates on control plane nodes.
  • Loading branch information
mateoflorido authored Sep 30, 2024
1 parent 490f0b6 commit 353cfa5
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 5 deletions.
11 changes: 11 additions & 0 deletions bootstrap/api/v1beta2/certificates_refresh_consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package v1beta2

const (
	// CertificatesRefreshAnnotation, when present on a Machine, requests a
	// refresh of that machine's certificates. Its value is passed to the
	// refresh request (a TTL such as "1y" — see the certificates controller).
	CertificatesRefreshAnnotation = "v1beta2.k8sd.io/refresh-certificates"
)

const (
	// CertificatesRefreshInProgressEvent is the reason recorded on the Machine
	// when a certificates refresh has started.
	CertificatesRefreshInProgressEvent = "CertificatesRefreshInProgress"
	// CertificatesRefreshDoneEvent is the reason recorded on the Machine when
	// a certificates refresh completed successfully.
	CertificatesRefreshDoneEvent = "CertificatesRefreshDone"
	// CertificatesRefreshFailedEvent is the reason recorded on the Machine
	// when a certificates refresh failed.
	CertificatesRefreshFailedEvent = "CertificatesRefreshFailed"
)
5 changes: 5 additions & 0 deletions bootstrap/api/v1beta2/machine_consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package v1beta2

const (
	// MachineCertificatesExpiryDateAnnotation stores the expiry date of the
	// machine's certificates in RFC3339 format. It is maintained by the
	// certificates controller.
	MachineCertificatesExpiryDateAnnotation = "machine.cluster.x-k8s.io/certificates-expiry"
)
247 changes: 247 additions & 0 deletions bootstrap/controllers/certificates_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package controllers

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bsutil "sigs.k8s.io/cluster-api/bootstrap/util"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2"
"github.com/canonical/cluster-api-k8s/pkg/ck8s"
utiltime "github.com/canonical/cluster-api-k8s/pkg/time"
"github.com/canonical/cluster-api-k8s/pkg/token"
)

// CertificatesReconciler reconciles a Machine's certificates.
type CertificatesReconciler struct {
	client.Client
	// Log is the base logger; Reconcile derives per-request loggers from it.
	Log    logr.Logger
	Scheme *runtime.Scheme
	// recorder emits Kubernetes Events on Machines for refresh progress.
	recorder record.EventRecorder

	// K8sdDialTimeout bounds how long requests to the k8sd API may take.
	K8sdDialTimeout time.Duration

	// managementCluster provides access to workload clusters; a default
	// ck8s.Management is installed by SetupWithManager when nil.
	managementCluster ck8s.ManagementCluster
}

// SetupWithManager sets up the controller with the Manager.
func (r *CertificatesReconciler) SetupWithManager(mgr ctrl.Manager) error {
if _, err := ctrl.NewControllerManagedBy(mgr).For(&clusterv1.Machine{}).Build(r); err != nil {
return err
}

r.Scheme = mgr.GetScheme()
r.recorder = mgr.GetEventRecorderFor("ck8s-certificates-controller")

if r.managementCluster == nil {
r.managementCluster = &ck8s.Management{
Client: r.Client,
K8sdDialTimeout: r.K8sdDialTimeout,
}
}
return nil
}

// CertificatesScope bundles everything a single reconciliation of a Machine's
// certificates needs: the owning Cluster, the bootstrap config, the Machine
// itself, a patch helper for it, and a client for the workload cluster.
type CertificatesScope struct {
	Cluster  *clusterv1.Cluster
	Config   *bootstrapv1.CK8sConfig
	Log      logr.Logger
	Machine  *clusterv1.Machine
	// Patcher persists annotation changes made to Machine.
	Patcher  *patch.Helper
	// Workload talks to k8sd in the workload cluster.
	Workload *ck8s.Workload
}

// +kubebuilder:rbac:groups=bootstrap.cluster.x-k8s.io,resources=ck8sconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=bootstrap.cluster.x-k8s.io,resources=ck8sconfigs/status,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status;machines;machines/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=exp.cluster.x-k8s.io,resources=machinepools;machinepools/status,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=secrets;events;configmaps,verbs=get;list;watch;create;update;patch;delete

// Reconcile handles certificate bookkeeping for a Machine: it populates the
// certificates-expiry annotation when missing, and performs a certificates
// refresh when the refresh annotation is present (control plane nodes only).
func (r *CertificatesReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("namespace", req.Namespace, "machine", req.Name)

	m := &clusterv1.Machine{}
	if err := r.Get(ctx, req.NamespacedName, m); err != nil {
		if apierrors.IsNotFound(err) {
			return ctrl.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return ctrl.Result{}, err
	}

	if !m.ObjectMeta.DeletionTimestamp.IsZero() {
		// Machine is being deleted, return early.
		return ctrl.Result{}, nil
	}

	mAnnotations := m.GetAnnotations()

	_, refreshCertificates := mAnnotations[bootstrapv1.CertificatesRefreshAnnotation]
	_, hasExpiryDateAnnotation := mAnnotations[bootstrapv1.MachineCertificatesExpiryDateAnnotation]
	if !refreshCertificates && hasExpiryDateAnnotation {
		// No need to refresh certificates or update expiry date, return early.
		return ctrl.Result{}, nil
	}

	// Machines without a bootstrap config reference (e.g. externally
	// bootstrapped ones) are not managed by this provider; dereferencing
	// ConfigRef below would panic for them.
	if m.Spec.Bootstrap.ConfigRef == nil {
		return ctrl.Result{}, nil
	}

	// Look up for the CK8sConfig.
	config := &bootstrapv1.CK8sConfig{}
	if err := r.Client.Get(ctx, types.NamespacedName{Namespace: m.Namespace, Name: m.Spec.Bootstrap.ConfigRef.Name}, config); err != nil {
		return ctrl.Result{}, err
	}

	// Get the owner of the CK8sConfig to determine if it's a control plane or worker node.
	configOwner, err := bsutil.GetConfigOwner(ctx, r.Client, config)
	if err != nil {
		log.Error(err, "Failed to get config owner")
		return ctrl.Result{}, err
	}
	if configOwner == nil {
		return ctrl.Result{}, nil
	}

	cluster, err := util.GetClusterByName(ctx, r.Client, m.GetNamespace(), m.Spec.ClusterName)
	if err != nil {
		return ctrl.Result{}, err
	}

	microclusterPort := config.Spec.ControlPlaneConfig.GetMicroclusterPort()
	workload, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster), microclusterPort)
	if err != nil {
		return ctrl.Result{}, err
	}

	patchHelper, err := patch.NewHelper(m, r.Client)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to create patch helper for machine: %w", err)
	}

	scope := &CertificatesScope{
		Log:      log,
		Machine:  m,
		Config:   config,
		Cluster:  cluster,
		Patcher:  patchHelper,
		Workload: workload,
	}

	if !hasExpiryDateAnnotation {
		if err := r.updateExpiryDateAnnotation(ctx, scope); err != nil {
			return ctrl.Result{}, err
		}
	}

	if refreshCertificates {
		if configOwner.IsControlPlaneMachine() {
			if err := r.refreshControlPlaneCertificates(ctx, scope); err != nil {
				return ctrl.Result{}, err
			}
		} else {
			log.Info("worker nodes are not supported yet")
			return ctrl.Result{}, nil
		}
	}

	return ctrl.Result{}, nil
}

// refreshControlPlaneCertificates refreshes the certificates of a control
// plane machine via k8sd (plan + run), then replaces the refresh annotation
// with the new expiry-date annotation and patches the Machine. Progress and
// outcome are reported as Kubernetes Events on the Machine.
func (r *CertificatesReconciler) refreshControlPlaneCertificates(ctx context.Context, scope *CertificatesScope) error {
	nodeToken, err := token.LookupNodeToken(ctx, r.Client, util.ObjectKey(scope.Cluster), scope.Machine.Name)
	if err != nil {
		return fmt.Errorf("failed to lookup node token: %w", err)
	}

	mAnnotations := scope.Machine.GetAnnotations()

	refreshAnnotation, ok := mAnnotations[bootstrapv1.CertificatesRefreshAnnotation]
	if !ok {
		return nil
	}

	// Validate the requested TTL before announcing the refresh, so an invalid
	// annotation value does not emit a misleading "in progress" event.
	seconds, err := utiltime.TTLToSeconds(refreshAnnotation)
	if err != nil {
		return fmt.Errorf("failed to parse expires-in annotation value: %w", err)
	}

	r.recorder.Eventf(
		scope.Machine,
		corev1.EventTypeNormal,
		bootstrapv1.CertificatesRefreshInProgressEvent,
		"Certificates refresh in progress. TTL: %s", refreshAnnotation,
	)

	controlPlaneConfig := scope.Config.Spec.ControlPlaneConfig
	controlPlaneEndpoint := scope.Cluster.Spec.ControlPlaneEndpoint.Host

	// Copy ExtraSANs before appending: appending to the spec's slice directly
	// could overwrite its shared backing array and mutate the CK8sConfig.
	extraSANs := make([]string, 0, len(controlPlaneConfig.ExtraSANs)+1)
	extraSANs = append(extraSANs, controlPlaneConfig.ExtraSANs...)
	extraSANs = append(extraSANs, controlPlaneEndpoint)

	expirySecondsUnix, err := scope.Workload.RefreshCertificates(ctx, scope.Machine, *nodeToken, seconds, extraSANs)
	if err != nil {
		r.recorder.Eventf(
			scope.Machine,
			corev1.EventTypeWarning,
			bootstrapv1.CertificatesRefreshFailedEvent,
			"Failed to refresh certificates: %v", err,
		)
		return fmt.Errorf("failed to refresh certificates: %w", err)
	}

	expiryTime := time.Unix(int64(expirySecondsUnix), 0)

	// Swap the one-shot refresh annotation for the recorded expiry date.
	delete(mAnnotations, bootstrapv1.CertificatesRefreshAnnotation)
	mAnnotations[bootstrapv1.MachineCertificatesExpiryDateAnnotation] = expiryTime.Format(time.RFC3339)
	scope.Machine.SetAnnotations(mAnnotations)
	if err := scope.Patcher.Patch(ctx, scope.Machine); err != nil {
		return fmt.Errorf("failed to patch machine annotations: %w", err)
	}

	r.recorder.Eventf(
		scope.Machine,
		corev1.EventTypeNormal,
		bootstrapv1.CertificatesRefreshDoneEvent,
		"Certificates refreshed, will expire at %s", expiryTime,
	)

	scope.Log.Info("Certificates refreshed",
		"cluster", scope.Cluster.Name,
		"machine", scope.Machine.Name,
		"expiry", expiryTime.Format(time.RFC3339),
	)

	return nil
}

// updateExpiryDateAnnotation queries k8sd for the machine's certificates
// expiry date and records it in the MachineCertificatesExpiryDateAnnotation,
// patching the Machine.
func (r *CertificatesReconciler) updateExpiryDateAnnotation(ctx context.Context, scope *CertificatesScope) error {
	nodeToken, err := token.LookupNodeToken(ctx, r.Client, util.ObjectKey(scope.Cluster), scope.Machine.Name)
	if err != nil {
		return fmt.Errorf("failed to lookup node token: %w", err)
	}

	expiryDateString, err := scope.Workload.GetCertificatesExpiryDate(ctx, scope.Machine, *nodeToken)
	if err != nil {
		return fmt.Errorf("failed to get certificates expiry date: %w", err)
	}

	mAnnotations := scope.Machine.GetAnnotations()
	// A machine may carry no annotations at all; writing to a nil map panics,
	// so allocate one in that case.
	if mAnnotations == nil {
		mAnnotations = make(map[string]string, 1)
	}
	mAnnotations[bootstrapv1.MachineCertificatesExpiryDateAnnotation] = expiryDateString
	scope.Machine.SetAnnotations(mAnnotations)
	if err := scope.Patcher.Patch(ctx, scope.Machine); err != nil {
		return fmt.Errorf("failed to patch machine annotations: %w", err)
	}

	return nil
}
9 changes: 9 additions & 0 deletions bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ func main() {
os.Exit(1)
}

if err = (&controllers.CertificatesReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Certificates"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Certificates")
os.Exit(1)
}

if os.Getenv("ENABLE_WEBHOOKS") != "false" {
if err = (&bootstrapv1.CK8sConfig{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "CK8sConfig")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/canonical/cluster-api-k8s
go 1.22.6

require (
github.com/canonical/k8s-snap-api v1.0.7
github.com/canonical/k8s-snap-api v1.0.8
github.com/go-logr/logr v1.4.1
github.com/google/uuid v1.4.0
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/canonical/k8s-snap-api v1.0.7 h1:40qz+9IcV90ZN/wTMuOraZcuqoyRHaJck1J3c7FcWrQ=
github.com/canonical/k8s-snap-api v1.0.7/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY=
github.com/canonical/k8s-snap-api v1.0.8 h1:W360Y4ulkAdCdQqbfQ7zXs3/Ty8SWENO3/Bzz8ZAEPE=
github.com/canonical/k8s-snap-api v1.0.8/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down
7 changes: 5 additions & 2 deletions pkg/ck8s/manifests/k8sd-proxy-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ spec:
name: k8sd-proxy-config
key: k8sd-port
args:
- TCP4-LISTEN:2380,fork,reuseaddr
- TCP4:$(HOSTIP):$(K8SD_PORT)
# socat was closing the connection after 0.5s of inactivity, some
# queries were taking longer than that.
- -t 5
- TCP4-LISTEN:2380,fork,reuseaddr,nodelay
- TCP4:$(HOSTIP):$(K8SD_PORT),nodelay
resources:
limits:
memory: 200Mi
Expand Down
57 changes: 57 additions & 0 deletions pkg/ck8s/workload_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context, options k8sd

// GetK8sdProxyForMachine returns a k8sd proxy client for the machine.
func (w *Workload) GetK8sdProxyForMachine(ctx context.Context, machine *clusterv1.Machine) (*K8sdClient, error) {
if machine == nil {
return nil, fmt.Errorf("machine object is nil")
}

if machine.Status.NodeRef == nil {
return nil, fmt.Errorf("machine %s has no node reference", machine.Name)
}

node := &corev1.Node{}
if err := w.Client.Get(ctx, ctrlclient.ObjectKey{Name: machine.Status.NodeRef.Name}, node); err != nil {
return nil, fmt.Errorf("failed to get node: %w", err)
Expand All @@ -215,6 +223,55 @@ func (w *Workload) GetK8sdProxyForMachine(ctx context.Context, machine *clusterv
return w.K8sdClientGenerator.forNode(ctx, node)
}

// GetCertificatesExpiryDate asks k8sd on the machine's node for the expiry
// date of the cluster certificates and returns it as a string.
func (w *Workload) GetCertificatesExpiryDate(ctx context.Context, machine *clusterv1.Machine, nodeToken string) (string, error) {
	k8sdProxy, err := w.GetK8sdProxyForMachine(ctx, machine)
	if err != nil {
		return "", fmt.Errorf("failed to create k8sd proxy: %w", err)
	}

	headers := map[string][]string{
		"node-token": {nodeToken},
	}

	req := apiv1.CertificatesExpiryRequest{}
	resp := &apiv1.CertificatesExpiryResponse{}
	if err := w.doK8sdRequest(ctx, k8sdProxy, http.MethodPost, "1.0/x/capi/certificates-expiry", headers, req, resp); err != nil {
		return "", fmt.Errorf("failed to get certificates expiry date: %w", err)
	}

	return resp.ExpiryDate, nil
}

// RefreshCertificates refreshes the certificates on the machine's node via
// k8sd's two-step API: first a "plan" request to obtain a seed, then a "run"
// request with the requested expiration and extra SANs. It returns the Unix
// timestamp (seconds) at which the new certificates expire.
func (w *Workload) RefreshCertificates(ctx context.Context, machine *clusterv1.Machine, nodeToken string, expirationSeconds int, extraSANs []string) (int, error) {
	k8sdProxy, err := w.GetK8sdProxyForMachine(ctx, machine)
	if err != nil {
		return 0, fmt.Errorf("failed to create k8sd proxy: %w", err)
	}

	headers := map[string][]string{
		"node-token": {nodeToken},
	}

	// Step 1: plan. The response carries the seed required by the run step.
	planReq := apiv1.ClusterAPICertificatesPlanRequest{}
	planResp := &apiv1.ClusterAPICertificatesPlanResponse{}
	if err := w.doK8sdRequest(ctx, k8sdProxy, http.MethodPost, "1.0/x/capi/refresh-certs/plan", headers, planReq, planResp); err != nil {
		return 0, fmt.Errorf("failed to refresh certificates: %w", err)
	}

	// Step 2: run, using the seed from the plan step.
	runReq := apiv1.ClusterAPICertificatesRunRequest{
		ExpirationSeconds: expirationSeconds,
		Seed:              planResp.Seed,
		ExtraSANs:         extraSANs,
	}
	runResp := &apiv1.ClusterAPICertificatesRunResponse{}
	if err := w.doK8sdRequest(ctx, k8sdProxy, http.MethodPost, "1.0/x/capi/refresh-certs/run", headers, runReq, runResp); err != nil {
		return 0, fmt.Errorf("failed to run refresh certificates: %w", err)
	}

	return runResp.ExpirationSeconds, nil
}

func (w *Workload) RefreshMachine(ctx context.Context, machine *clusterv1.Machine, nodeToken string, upgradeOption string) (string, error) {
request := apiv1.SnapRefreshRequest{}
response := &apiv1.SnapRefreshResponse{}
Expand Down
Loading

0 comments on commit 353cfa5

Please sign in to comment.