From 1a0749933ef8d7e6df75de1f2d152404f623b78f Mon Sep 17 00:00:00 2001 From: Berkay Tekin Oz Date: Fri, 7 Jun 2024 10:54:23 +0000 Subject: [PATCH 1/9] Initial version for k8sd-proxy --- .github/workflows/sync-images.yaml | 23 +++ .../controllers/ck8sconfig_controller.go | 33 ++- hack/sync-images.sh | 11 + hack/tools/go.mod | 19 ++ hack/tools/go.sum | 40 ++++ hack/tools/regsync.sh | 8 + hack/tools/tools.go | 5 + hack/upstream-images.yaml | 9 + pkg/ck8s/k8sd-proxy.yaml | 56 +++++ pkg/ck8s/k8sd_proxy.go | 192 ++++++++++++++++++ pkg/proxy/dial.go | 2 +- 11 files changed, 388 insertions(+), 10 deletions(-) create mode 100644 .github/workflows/sync-images.yaml create mode 100755 hack/sync-images.sh create mode 100644 hack/tools/go.mod create mode 100644 hack/tools/go.sum create mode 100755 hack/tools/regsync.sh create mode 100644 hack/tools/tools.go create mode 100644 hack/upstream-images.yaml create mode 100644 pkg/ck8s/k8sd-proxy.yaml create mode 100644 pkg/ck8s/k8sd_proxy.go diff --git a/.github/workflows/sync-images.yaml b/.github/workflows/sync-images.yaml new file mode 100644 index 00000000..35a84bf8 --- /dev/null +++ b/.github/workflows/sync-images.yaml @@ -0,0 +1,23 @@ +name: Sync upstream images to ghcr.io + +on: + workflow_dispatch: + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Go + uses: actions/setup-go@v5 + with: + go-version: "1.22" + + - name: Sync images + env: + USERNAME: ${{ github.actor }} + PASSWORD: ${{ secrets.GITHUB_TOKEN }} + run: | + ./hack/sync-images.sh \ No newline at end of file diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go index a7ffcee8..600c6505 100644 --- a/bootstrap/controllers/ck8sconfig_controller.go +++ b/bootstrap/controllers/ck8sconfig_controller.go @@ -17,9 +17,11 @@ limitations under the License. package controllers import ( + "bytes" "context" "errors" "fmt" + "text/template" "time" "github.com/go-logr/logr" @@ -242,16 +244,29 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop return err } + var k8sdProxyDaemonset bytes.Buffer + + t, err := template.New("k8sd-proxy-daemonset").Parse(ck8s.K8sdProxyDaemonsetYaml) + if err != nil { + return err + } + + if err := t.Execute(&k8sdProxyDaemonset, struct { + K8sdPort int + }{ + K8sdPort: configStruct.ControlPlaneConfig.MicroclusterPort, + }); err != nil { + return err + } + // TODO(neoaggelos): figure out what is needed for k8sd proxy - // if scope.Config.Spec.IsEtcdEmbedded() { - // etcdProxyFile := bootstrapv1.File{ - // Path: etcd.EtcdProxyDaemonsetYamlLocation, - // Content: etcd.EtcdProxyDaemonsetYaml, - // Owner: "root:root", - // Permissions: "0640", - // } - // files = append(files, etcdProxyFile) - // } + k8sdProxyFile := bootstrapv1.File{ + Path: ck8s.K8sdProxyDaemonsetYamlLocation, + Content: k8sdProxyDaemonset.String(), + Owner: "root:root", + Permissions: "0400", + } + files = append(files, k8sdProxyFile) input := cloudinit.JoinControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ diff --git a/hack/sync-images.sh b/hack/sync-images.sh new file mode 100755 index 00000000..17e09c4c --- /dev/null +++ b/hack/sync-images.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +# Description: +# Sync images from upstream repositories under ghcr.io/canonical. +# +# Usage: +# $ USERNAME="$username" PASSWORD="$password" ./sync-images.sh + +DIR="$(realpath "$(dirname "${0}")")" + +"${DIR}/tools/regsync.sh" once -c "${DIR}/upstream-images.yaml" \ No newline at end of file diff --git a/hack/tools/go.mod b/hack/tools/go.mod new file mode 100644 index 00000000..321ce8c4 --- /dev/null +++ b/hack/tools/go.mod @@ -0,0 +1,19 @@ +module github.com/canonical/cluster-api-k8s/tools + +go 1.22.4 + +require github.com/regclient/regclient v0.6.1 + +require ( + github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/cobra v1.8.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/ulikunitz/xz v0.5.12 // indirect + golang.org/x/sys v0.20.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/hack/tools/go.sum b/hack/tools/go.sum new file mode 100644 index 00000000..42e110eb --- /dev/null +++ b/hack/tools/go.sum @@ -0,0 +1,40 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU5CAUmr9zpesgbU6SWc8/B4mflAE4= +github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/olareg/olareg v0.1.0 h1:1dXBOgPrig5N7zoXyIZVQqU0QBo6sD9pbL6UYjY75CA= +github.com/olareg/olareg v0.1.0/go.mod h1:RBuU7JW7SoIIxZKzLRhq8sVtQeAHzCAtRrXEBx2KlM4= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/regclient/regclient v0.6.1 h1:4PxrGxMXrLpPrSaet8QZl568CVOolyHyukLL9UyogoU= +github.com/regclient/regclient v0.6.1/go.mod h1:hCKbRHYMx6LJntAhXzWVV7Oxyn9DzNVJoOKJaSnU5BM= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc= +github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/hack/tools/regsync.sh b/hack/tools/regsync.sh new file mode 100755 index 00000000..3c04a8c5 --- /dev/null +++ b/hack/tools/regsync.sh @@ -0,0 +1,8 @@ +#!/bin/bash -eu + +# Run regsync +TOOLS_DIR="$(realpath `dirname "${0}"`)" +( + cd "${TOOLS_DIR}" + go run github.com/regclient/regclient/cmd/regsync "${@}" +) \ No newline at end of file diff --git a/hack/tools/tools.go b/hack/tools/tools.go new file mode 100644 index 00000000..1c97d316 --- /dev/null +++ b/hack/tools/tools.go @@ -0,0 +1,5 @@ +package main + +import ( + _ "github.com/regclient/regclient/cmd/regsync" +) diff --git a/hack/upstream-images.yaml b/hack/upstream-images.yaml new file mode 100644 index 00000000..5d87a27d --- /dev/null +++ b/hack/upstream-images.yaml @@ -0,0 +1,9 @@ +version: 1 +creds: + - registry: ghcr.io + user: '{{ env "USERNAME" }}' + pass: '{{ env "PASSWORD" }}' +sync: + - source: alpine/socat:1.8.0.0 + target: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0 + type: image \ No newline at end of file diff --git a/pkg/ck8s/k8sd-proxy.yaml b/pkg/ck8s/k8sd-proxy.yaml new file mode 100644 index 00000000..3ed22c45 --- /dev/null +++ b/pkg/ck8s/k8sd-proxy.yaml @@ -0,0 +1,56 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: k8sd-proxy-config + namespace: kube-system +data: + k8sd-port: "{{ .K8sdPort }}" +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: k8sd-proxy + namespace: kube-system + labels: + app: k8sd-proxy +spec: + selector: + matchLabels: + app: k8sd-proxy + template: + metadata: + labels: + app: k8sd-proxy + spec: + tolerations: + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + - key: node-role.kubernetes.io/master + operator: Exists + effect: NoSchedule + containers: + - name: k8sd-proxy + image: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0 + env: + # TODO: Make this more robust by possibly finding/parsing the right IP. + # This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. + - name: HOSTIP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: K8SD_PORT + valueFrom: + configMapKeyRef: + name: k8sd-proxy-config + key: k8sd-port + args: + - TCP4-LISTEN:2380,fork,reuseaddr + - TCP4:$(HOSTIP):$(K8SD_PORT) + resources: + limits: + memory: 200Mi + requests: + cpu: 100m + memory: 200Mi + terminationGracePeriodSeconds: 30 diff --git a/pkg/ck8s/k8sd_proxy.go b/pkg/ck8s/k8sd_proxy.go new file mode 100644 index 00000000..4b37a9b8 --- /dev/null +++ b/pkg/ck8s/k8sd_proxy.go @@ -0,0 +1,192 @@ +package ck8s + +import ( + "context" + "crypto/tls" + _ "embed" + "fmt" + "net/http" + "time" + + "github.com/canonical/cluster-api-k8s/pkg/proxy" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const K8sdProxyDaemonsetYamlLocation = "/opt/capi/manifests/k8sd-proxy.yaml" + +//go:embed k8sd-proxy.yaml +var K8sdProxyDaemonsetYaml string + +type K8sdProxy struct { + nodeIP string + client *http.Client +} + +type K8sdProxyGenerator struct { + restConfig *rest.Config + clientset *kubernetes.Clientset + proxyClientTimeout time.Duration + k8sdPort int +} + +func NewK8sdProxyGenerator(restConfig *rest.Config, clientset *kubernetes.Clientset, proxyClientTimeout time.Duration, k8sdPort int) (*K8sdProxyGenerator, error) { + return &K8sdProxyGenerator{ + restConfig: restConfig, + clientset: clientset, + proxyClientTimeout: proxyClientTimeout, + k8sdPort: k8sdPort, + }, nil +} + +func (g *K8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdProxy, error) { + node, err := g.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrap(err, "unable to get node in target cluster") + } + + return g.forNode(ctx, node) +} + +func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdProxy, error) { + podmap, err := g.getProxyPods(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get proxy pods: %w", err) + } + + podname, ok := podmap[node.Name] + if !ok { + return nil, fmt.Errorf("this node does not have a k8sd proxy pod") + } + + nodeInternalIP, err := g.getNodeInternalIP(node) + if err != nil { + return nil, fmt.Errorf("failed to get internal IP for node %s: %w", node.Name, err) + } + + client, err := g.NewHTTPClient(ctx, podname) + if err != nil { + return nil, err + } + + if err := g.checkIfK8sdIsReachable(ctx, client, nodeInternalIP); err != nil { + return nil, fmt.Errorf("failed to reach k8sd through proxy client: %w", err) + } + + return &K8sdProxy{ + nodeIP: nodeInternalIP, + client: client, + }, nil +} + +func (g *K8sdProxyGenerator) forControlPlane(ctx context.Context) (*K8sdProxy, error) { + cplaneNodes, err := g.getControlPlaneNodes(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get control plane nodes: %w", err) + } + + for _, node := range cplaneNodes.Items { + proxy, err := g.forNode(ctx, &node) + if err != nil { + continue + } + + return proxy, nil + } + + return nil, fmt.Errorf("failed to find a control plane node with a reachable k8sd proxy") +} + +func (g *K8sdProxyGenerator) checkIfK8sdIsReachable(ctx context.Context, client *http.Client, nodeIP string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s:%v/", nodeIP, g.k8sdPort), nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + res, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to reach k8sd through proxy client: %w", err) + } + res.Body.Close() + + return nil +} + +func (g *K8sdProxyGenerator) getNodeInternalIP(node *corev1.Node) (string, error) { + // TODO: Make this more robust by possibly finding/parsing the right IP. + // This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. + for _, addr := range node.Status.Addresses { + if addr.Type == "InternalIP" { + return addr.Address, nil + } + } + + return "", fmt.Errorf("unable to find internal IP for node %s", node.Name) +} + +func (g *K8sdProxyGenerator) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList, error) { + nodes, err := g.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/control-plane="}) + if err != nil { + return nil, errors.Wrap(err, "unable to list nodes in target cluster") + } + + if len(nodes.Items) == 0 { + return nil, errors.New("there isn't any nodes registered in target cluster") + } + + return nodes, nil +} + +func (g *K8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { + pods, err := g.clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{LabelSelector: "app=k8sd-proxy"}) + if err != nil { + return nil, errors.Wrap(err, "unable to list k8sd-proxy pods in target cluster") + } + + if len(pods.Items) == 0 { + return nil, errors.New("there isn't any k8sd-proxy pods in target cluster") + } + + podmap := make(map[string]string, len(pods.Items)) + for _, pod := range pods.Items { + podmap[pod.Spec.NodeName] = pod.Name + } + + return podmap, nil +} + +func (g *K8sdProxyGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) { + p := proxy.Proxy{ + Kind: "pods", + Namespace: metav1.NamespaceSystem, + ResourceName: podName, + KubeConfig: g.restConfig, + Port: 2380, + } + + dialer, err := proxy.NewDialer(p) + if err != nil { + return nil, fmt.Errorf("failed to create proxy dialer: %w", err) + } + + // We return a http client with the same parameters as http.DefaultClient + // and an overridden DialContext to proxy the requests through api server. + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.DefaultTransport.(*http.Transport).Proxy, + DialContext: dialer.DialContext, + ForceAttemptHTTP2: http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2, + MaxIdleConns: http.DefaultTransport.(*http.Transport).MaxIdleConns, + IdleConnTimeout: http.DefaultTransport.(*http.Transport).IdleConnTimeout, + TLSHandshakeTimeout: http.DefaultTransport.(*http.Transport).TLSHandshakeTimeout, + ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout, + // TODO: Workaround for now, address later on + // get the certificate fingerprint from the matching node through a resource in the cluster (TBD), and validate it in the TLSClientConfig + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: g.proxyClientTimeout, + }, nil +} diff --git a/pkg/proxy/dial.go b/pkg/proxy/dial.go index e4c1024f..d3d80729 100644 --- a/pkg/proxy/dial.go +++ b/pkg/proxy/dial.go @@ -89,7 +89,7 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn Post(). Resource(d.proxy.Kind). Namespace(d.proxy.Namespace). - Name(addr). + Name(d.proxy.ResourceName). SubResource("portforward") dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL()) From 54020342869ca1654ea2c48c07be5c1ba243a172 Mon Sep 17 00:00:00 2001 From: Berkay Tekin Oz Date: Thu, 13 Jun 2024 20:32:04 +0000 Subject: [PATCH 2/9] Addressing comments, initial tests to reach k8sd --- .github/workflows/sync-images.yaml | 2 +- .../controllers/ck8sconfig_controller.go | 69 +++++++------- .../ck8scontrolplane_controller.go | 34 +++++-- .../controllers/machine_controller.go | 3 +- controlplane/main.go | 16 +--- hack/sync-images.sh | 2 +- hack/tools/regsync.sh | 2 +- hack/upstream-images.yaml | 2 +- pkg/ck8s/k8sd_proxy.go | 95 ++++--------------- pkg/ck8s/management_cluster.go | 16 ++-- pkg/ck8s/workload_cluster.go | 51 +++++++++- 11 files changed, 152 insertions(+), 140 deletions(-) diff --git a/.github/workflows/sync-images.yaml b/.github/workflows/sync-images.yaml index 35a84bf8..f07bdb91 100644 --- a/.github/workflows/sync-images.yaml +++ b/.github/workflows/sync-images.yaml @@ -20,4 +20,4 @@ jobs: USERNAME: ${{ github.actor }} PASSWORD: ${{ secrets.GITHUB_TOKEN }} run: | - ./hack/sync-images.sh \ No newline at end of file + ./hack/sync-images.sh diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go index 600c6505..96b4faa9 100644 --- a/bootstrap/controllers/ck8sconfig_controller.go +++ b/bootstrap/controllers/ck8sconfig_controller.go @@ -244,29 +244,16 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop return err } - var k8sdProxyDaemonset bytes.Buffer - - t, err := template.New("k8sd-proxy-daemonset").Parse(ck8s.K8sdProxyDaemonsetYaml) - if err != nil { - return err - } - - if err := t.Execute(&k8sdProxyDaemonset, struct { - K8sdPort int - }{ - K8sdPort: configStruct.ControlPlaneConfig.MicroclusterPort, - }); err != nil { - return err - } - // TODO(neoaggelos): figure out what is needed for k8sd proxy - k8sdProxyFile := bootstrapv1.File{ - Path: ck8s.K8sdProxyDaemonsetYamlLocation, - Content: k8sdProxyDaemonset.String(), - Owner: "root:root", - Permissions: "0400", - } - files = append(files, k8sdProxyFile) + // if scope.Config.Spec.IsEtcdEmbedded() { + // etcdProxyFile := bootstrapv1.File{ + // Path: etcd.EtcdProxyDaemonsetYamlLocation, + // Content: etcd.EtcdProxyDaemonsetYaml, + // Owner: "root:root", + // Permissions: "0640", + // } + // files = append(files, etcdProxyFile) + // } input := cloudinit.JoinControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ @@ -476,16 +463,34 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context, return ctrl.Result{}, err } - // TODO(neoaggelos): deploy k8sd-proxy daemonsets - // if scope.Config.Spec.IsK8sDqlite() { - // etcdProxyFile := bootstrapv1.File{ - // Path: etcd.EtcdProxyDaemonsetYamlLocation, - // Content: etcd.EtcdProxyDaemonsetYaml, - // Owner: "root:root", - // Permissions: "0640", - // } - // files = append(files, etcdProxyFile) - // } + var microclusterPort int + microclusterPort = scope.Config.Spec.ControlPlaneConfig.MicroclusterPort + if microclusterPort == 0 { + microclusterPort = 2380 + } + + var k8sdProxyDaemonset bytes.Buffer + + t, err := template.New("k8sd-proxy-daemonset").Parse(ck8s.K8sdProxyDaemonsetYaml) + if err != nil { + return ctrl.Result{}, err + } + + if err := t.Execute(&k8sdProxyDaemonset, struct { + K8sdPort int + }{ + K8sdPort: microclusterPort, + }); err != nil { + return ctrl.Result{}, err + } + + k8sdProxyFile := bootstrapv1.File{ + Path: ck8s.K8sdProxyDaemonsetYamlLocation, + Content: k8sdProxyDaemonset.String(), + Owner: "root:root", + Permissions: "0400", + } + files = append(files, k8sdProxyFile) cpinput := cloudinit.InitControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go index e8a74daf..6de77f6b 100644 --- a/controlplane/controllers/ck8scontrolplane_controller.go +++ b/controlplane/controllers/ck8scontrolplane_controller.go @@ -61,8 +61,7 @@ type CK8sControlPlaneReconciler struct { controller controller.Controller recorder record.EventRecorder - EtcdDialTimeout time.Duration - EtcdCallTimeout time.Duration + K8sdDialTimeout time.Duration managementCluster ck8s.ManagementCluster managementClusterUncached ck8s.ManagementCluster @@ -153,6 +152,31 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req } } + var microclusterPort int + microclusterPort = kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort + if microclusterPort == 0 { + microclusterPort = 2380 + } + + w, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) + if err != nil { + logger.Error(err, "failed to get workload cluster") + } + + proxy, err := w.GetK8sdProxyForControlPlane(ctx) + if err != nil { + logger.Error(err, "failed to get k8sd proxy for control plane") + } + + if proxy != nil { + err = ck8s.CheckIfK8sdIsReachable(ctx, proxy.Client, proxy.NodeIP, microclusterPort) + if err != nil { + logger.Error(err, "failed to reach k8sd") + } else { + logger.Info("k8sd is reachable") + } + } + // Always attempt to Patch the CK8sControlPlane object and status after each reconciliation. if patchErr := patchCK8sControlPlane(ctx, patchHelper, kcp); patchErr != nil { logger.Error(err, "Failed to patch CK8sControlPlane") @@ -294,16 +318,14 @@ func (r *CK8sControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr c if r.managementCluster == nil { r.managementCluster = &ck8s.Management{ Client: r.Client, - EtcdDialTimeout: r.EtcdDialTimeout, - EtcdCallTimeout: r.EtcdCallTimeout, + K8sdDialTimeout: r.K8sdDialTimeout, } } if r.managementClusterUncached == nil { r.managementClusterUncached = &ck8s.Management{ Client: mgr.GetAPIReader(), - EtcdDialTimeout: r.EtcdDialTimeout, - EtcdCallTimeout: r.EtcdCallTimeout, + K8sdDialTimeout: r.K8sdDialTimeout, } } diff --git a/controlplane/controllers/machine_controller.go b/controlplane/controllers/machine_controller.go index 5f72f19b..f8d7f3ec 100644 --- a/controlplane/controllers/machine_controller.go +++ b/controlplane/controllers/machine_controller.go @@ -22,8 +22,7 @@ type MachineReconciler struct { Log logr.Logger Scheme *runtime.Scheme - EtcdDialTimeout time.Duration - EtcdCallTimeout time.Duration + K8sdDialTimeout time.Duration // NOTE(neoaggelos): See note below /** diff --git a/controlplane/main.go b/controlplane/main.go index 6648d8f2..c673db2c 100644 --- a/controlplane/main.go +++ b/controlplane/main.go @@ -56,8 +56,7 @@ func main() { var metricsAddr string var enableLeaderElection bool var syncPeriod time.Duration - var etcdDialTimeout time.Duration - var etcdCallTimeout time.Duration + var k8sdDialTimeout time.Duration flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, @@ -67,11 +66,8 @@ func main() { flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "The minimum interval at which watched resources are reconciled (e.g. 15m)") - flag.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second, - "Duration that the etcd client waits at most to establish a connection with etcd") - - flag.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", 15*time.Second, - "Duration that the etcd client waits at most for read and write operations to etcd.") + flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 10*time.Second, + "Duration that the proxy client waits at most to establish a connection with k8sd") flag.Parse() @@ -103,8 +99,7 @@ func main() { Client: mgr.GetClient(), Log: ctrPlaneLogger, Scheme: mgr.GetScheme(), - EtcdDialTimeout: etcdDialTimeout, - EtcdCallTimeout: etcdCallTimeout, + K8sdDialTimeout: k8sdDialTimeout, }).SetupWithManager(ctx, mgr, &ctrPlaneLogger); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CK8sControlPlane") os.Exit(1) @@ -115,8 +110,7 @@ func main() { Client: mgr.GetClient(), Log: ctrMachineLogger, Scheme: mgr.GetScheme(), - EtcdDialTimeout: etcdDialTimeout, - EtcdCallTimeout: etcdCallTimeout, + K8sdDialTimeout: k8sdDialTimeout, }).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Machine") os.Exit(1) diff --git a/hack/sync-images.sh b/hack/sync-images.sh index 17e09c4c..4fdc0b54 100755 --- a/hack/sync-images.sh +++ b/hack/sync-images.sh @@ -8,4 +8,4 @@ DIR="$(realpath "$(dirname "${0}")")" -"${DIR}/tools/regsync.sh" once -c "${DIR}/upstream-images.yaml" \ No newline at end of file +"${DIR}/tools/regsync.sh" once -c "${DIR}/upstream-images.yaml" diff --git a/hack/tools/regsync.sh b/hack/tools/regsync.sh index 3c04a8c5..015f21e5 100755 --- a/hack/tools/regsync.sh +++ b/hack/tools/regsync.sh @@ -5,4 +5,4 @@ TOOLS_DIR="$(realpath `dirname "${0}"`)" ( cd "${TOOLS_DIR}" go run github.com/regclient/regclient/cmd/regsync "${@}" -) \ No newline at end of file +) diff --git a/hack/upstream-images.yaml b/hack/upstream-images.yaml index 5d87a27d..9209b859 100644 --- a/hack/upstream-images.yaml +++ b/hack/upstream-images.yaml @@ -6,4 +6,4 @@ creds: sync: - source: alpine/socat:1.8.0.0 target: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0 - type: image \ No newline at end of file + type: image diff --git a/pkg/ck8s/k8sd_proxy.go b/pkg/ck8s/k8sd_proxy.go index 4b37a9b8..500b87dd 100644 --- a/pkg/ck8s/k8sd_proxy.go +++ b/pkg/ck8s/k8sd_proxy.go @@ -16,33 +16,36 @@ import ( "k8s.io/client-go/rest" ) -const K8sdProxyDaemonsetYamlLocation = "/opt/capi/manifests/k8sd-proxy.yaml" +const K8sdProxyDaemonsetYamlLocation = "/capi/manifests/k8sd-proxy.yaml" //go:embed k8sd-proxy.yaml var K8sdProxyDaemonsetYaml string type K8sdProxy struct { - nodeIP string - client *http.Client + NodeIP string + Client *http.Client } -type K8sdProxyGenerator struct { +type k8sdProxyGenerator struct { restConfig *rest.Config clientset *kubernetes.Clientset proxyClientTimeout time.Duration - k8sdPort int } -func NewK8sdProxyGenerator(restConfig *rest.Config, clientset *kubernetes.Clientset, proxyClientTimeout time.Duration, k8sdPort int) (*K8sdProxyGenerator, error) { - return &K8sdProxyGenerator{ +func NewK8sdProxyGenerator(restConfig *rest.Config, proxyClientTimeout time.Duration) (*k8sdProxyGenerator, error) { + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to create clientset: %w", err) + } + + return &k8sdProxyGenerator{ restConfig: restConfig, clientset: clientset, proxyClientTimeout: proxyClientTimeout, - k8sdPort: k8sdPort, }, nil } -func (g *K8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdProxy, error) { +func (g *k8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdProxy, error) { node, err := g.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { return nil, errors.Wrap(err, "unable to get node in target cluster") @@ -51,7 +54,7 @@ func (g *K8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) ( return g.forNode(ctx, node) } -func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdProxy, error) { +func (g *k8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdProxy, error) { podmap, err := g.getProxyPods(ctx) if err != nil { return nil, fmt.Errorf("failed to get proxy pods: %w", err) @@ -62,7 +65,7 @@ func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K return nil, fmt.Errorf("this node does not have a k8sd proxy pod") } - nodeInternalIP, err := g.getNodeInternalIP(node) + nodeInternalIP, err := getNodeInternalIP(node) if err != nil { return nil, fmt.Errorf("failed to get internal IP for node %s: %w", node.Name, err) } @@ -72,75 +75,13 @@ func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K return nil, err } - if err := g.checkIfK8sdIsReachable(ctx, client, nodeInternalIP); err != nil { - return nil, fmt.Errorf("failed to reach k8sd through proxy client: %w", err) - } - return &K8sdProxy{ - nodeIP: nodeInternalIP, - client: client, + NodeIP: nodeInternalIP, + Client: client, }, nil } -func (g *K8sdProxyGenerator) forControlPlane(ctx context.Context) (*K8sdProxy, error) { - cplaneNodes, err := g.getControlPlaneNodes(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get control plane nodes: %w", err) - } - - for _, node := range cplaneNodes.Items { - proxy, err := g.forNode(ctx, &node) - if err != nil { - continue - } - - return proxy, nil - } - - return nil, fmt.Errorf("failed to find a control plane node with a reachable k8sd proxy") -} - -func (g *K8sdProxyGenerator) checkIfK8sdIsReachable(ctx context.Context, client *http.Client, nodeIP string) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s:%v/", nodeIP, g.k8sdPort), nil) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } - - res, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to reach k8sd through proxy client: %w", err) - } - res.Body.Close() - - return nil -} - -func (g *K8sdProxyGenerator) getNodeInternalIP(node *corev1.Node) (string, error) { - // TODO: Make this more robust by possibly finding/parsing the right IP. - // This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. - for _, addr := range node.Status.Addresses { - if addr.Type == "InternalIP" { - return addr.Address, nil - } - } - - return "", fmt.Errorf("unable to find internal IP for node %s", node.Name) -} - -func (g *K8sdProxyGenerator) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList, error) { - nodes, err := g.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/control-plane="}) - if err != nil { - return nil, errors.Wrap(err, "unable to list nodes in target cluster") - } - - if len(nodes.Items) == 0 { - return nil, errors.New("there isn't any nodes registered in target cluster") - } - - return nodes, nil -} - -func (g *K8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { +func (g *k8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { pods, err := g.clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{LabelSelector: "app=k8sd-proxy"}) if err != nil { return nil, errors.Wrap(err, "unable to list k8sd-proxy pods in target cluster") @@ -158,7 +99,7 @@ func (g *K8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]strin return podmap, nil } -func (g *K8sdProxyGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) { +func (g *k8sdProxyGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) { p := proxy.Proxy{ Kind: "pods", Namespace: metav1.NamespaceSystem, diff --git a/pkg/ck8s/management_cluster.go b/pkg/ck8s/management_cluster.go index 240a6be2..cc8b53b4 100644 --- a/pkg/ck8s/management_cluster.go +++ b/pkg/ck8s/management_cluster.go @@ -26,11 +26,7 @@ type Management struct { Client client.Reader - // NOTE(neoaggelos): These are used as timeouts when interacting with the etcd of the workload cluster. - // - // TODO(neoaggelos): Replace these with timeouts for interacting with the k8sd proxy instances of the nodes. - EtcdDialTimeout time.Duration - EtcdCallTimeout time.Duration + K8sdDialTimeout time.Duration } // RemoteClusterConnectionError represents a failure to connect to a remote cluster. @@ -86,8 +82,16 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err} } + g, err := NewK8sdProxyGenerator(restConfig, m.K8sdDialTimeout) + if err != nil { + return nil, err + } + workload := &Workload{ - Client: c, + Client: c, + ClientRestConfig: restConfig, + K8sdProxyGenerator: g, + /** CoreDNSMigrator: &CoreDNSMigrator{}, **/ diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go index c1761335..adeeff46 100644 --- a/pkg/ck8s/workload_cluster.go +++ b/pkg/ck8s/workload_cluster.go @@ -3,6 +3,7 @@ package ck8s import ( "context" "fmt" + "net/http" "strings" "github.com/pkg/errors" @@ -62,8 +63,9 @@ type WorkloadCluster interface { type Workload struct { WorkloadCluster - Client ctrlclient.Client - ClientRestConfig *rest.Config + Client ctrlclient.Client + ClientRestConfig *rest.Config + K8sdProxyGenerator *k8sdProxyGenerator // NOTE(neoaggelos): CoreDNSMigrator and etcdClientGenerator are used by upstream to reach and manage the services in the workload cluster // TODO(neoaggelos): Replace them with a k8sdProxyClientGenerator. @@ -153,6 +155,51 @@ func nodeHasUnreachableTaint(node corev1.Node) bool { return false } +func CheckIfK8sdIsReachable(ctx context.Context, client *http.Client, nodeIP string, k8sdPort int) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s:%v/", nodeIP, k8sdPort), nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + res, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to reach k8sd through proxy client: %w", err) + } + res.Body.Close() + + return nil +} + +func getNodeInternalIP(node *corev1.Node) (string, error) { + // TODO: Make this more robust by possibly finding/parsing the right IP. + // This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. + for _, addr := range node.Status.Addresses { + if addr.Type == "InternalIP" { + return addr.Address, nil + } + } + + return "", fmt.Errorf("unable to find internal IP for node %s", node.Name) +} + +func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context) (*K8sdProxy, error) { + cplaneNodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get control plane nodes: %w", err) + } + + for _, node := range cplaneNodes.Items { + proxy, err := w.K8sdProxyGenerator.forNode(ctx, &node) + if err != nil { + continue + } + + return proxy, nil + } + + return nil, fmt.Errorf("failed to get k8sd proxy for control plane") +} + // UpdateAgentConditions is responsible for updating machine conditions reflecting the status of all the control plane // components. This operation is best effort, in the sense that in case // of problems in retrieving the pod status, it sets the condition to Unknown state without returning any error. From 15bd49d0aead15994e187b54390edaa7ff35f3bd Mon Sep 17 00:00:00 2001 From: Berkay Tekin Oz Date: Thu, 13 Jun 2024 20:35:13 +0000 Subject: [PATCH 3/9] Update error message and remove old comment --- bootstrap/controllers/ck8sconfig_controller.go | 11 ----------- pkg/ck8s/k8sd_proxy.go | 2 +- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go index 96b4faa9..03527d64 100644 --- a/bootstrap/controllers/ck8sconfig_controller.go +++ b/bootstrap/controllers/ck8sconfig_controller.go @@ -244,17 +244,6 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop return err } - // TODO(neoaggelos): figure out what is needed for k8sd proxy - // if scope.Config.Spec.IsEtcdEmbedded() { - // etcdProxyFile := bootstrapv1.File{ - // Path: etcd.EtcdProxyDaemonsetYamlLocation, - // Content: etcd.EtcdProxyDaemonsetYaml, - // Owner: "root:root", - // Permissions: "0640", - // } - // files = append(files, etcdProxyFile) - // } - input := cloudinit.JoinControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ BootCommands: scope.Config.Spec.BootCommands, diff --git a/pkg/ck8s/k8sd_proxy.go b/pkg/ck8s/k8sd_proxy.go index 500b87dd..4559ddb2 100644 --- a/pkg/ck8s/k8sd_proxy.go +++ b/pkg/ck8s/k8sd_proxy.go @@ -62,7 +62,7 @@ func (g *k8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K podname, ok := podmap[node.Name] if !ok { - return nil, fmt.Errorf("this node does not have a k8sd proxy pod") + return nil, fmt.Errorf("missing k8sd proxy pod for node %s", node.Name) } nodeInternalIP, err := getNodeInternalIP(node) From ad7421a8da3c6f936acc8bbe02e34ab711c3f322 Mon Sep 17 00:00:00 2001 From: Berkay Tekin Oz Date: Fri, 14 Jun 2024 05:11:19 +0000 Subject: [PATCH 4/9] Add manual unit test for proxy testing --- test/unit/k8sd_proxy_manual_test.go | 61 +++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 test/unit/k8sd_proxy_manual_test.go diff --git a/test/unit/k8sd_proxy_manual_test.go b/test/unit/k8sd_proxy_manual_test.go new file mode 100644 index 00000000..089e15f1 --- /dev/null +++ b/test/unit/k8sd_proxy_manual_test.go @@ -0,0 +1,61 @@ +package unit + +import ( + "context" + "os" + "testing" + "time" + + "github.com/canonical/cluster-api-k8s/pkg/ck8s" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestK8sdProxyManual(t *testing.T) { + kubeconfig := os.Getenv("K8SD_PROXY_TEST_KUBECONFIG") + if kubeconfig == "" { + t.Skip() + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var microclusterPort int = 2380 + + // use the current context in kubeconfig + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + t.Fatalf("failed to create rest config from kubeconfig: %v", err) + } + config.Timeout = 30 * time.Second + + c, err := client.New(config, client.Options{Scheme: scheme.Scheme}) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + g, err := ck8s.NewK8sdProxyGenerator(config, 10*time.Second) + if err != nil { + t.Fatalf("failed to create k8sd proxy generator: %v", err) + } + + w := &ck8s.Workload{ + Client: c, + ClientRestConfig: config, + K8sdProxyGenerator: g, + } + + proxy, err := w.GetK8sdProxyForControlPlane(ctx) + if err != nil { + t.Fatalf("failed to get k8sd proxy: %v", err) + } + + if proxy != nil { + err = ck8s.CheckIfK8sdIsReachable(ctx, proxy.Client, proxy.NodeIP, microclusterPort) + if err != nil { + t.Fatalf("k8sd is not reachable: %v", err) + } + t.Logf("k8sd is reachable") + } + +} From 5d173be2e95ca2db9f37cd58173a6bd14bc487ef Mon Sep 17 00:00:00 2001 From: Berkay Tekin Oz Date: Fri, 14 Jun 2024 05:15:11 +0000 Subject: [PATCH 5/9] Add comment for temporary k8sd proxy check in Reconcile --- controlplane/controllers/ck8scontrolplane_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go index 6de77f6b..8247fadc 100644 --- a/controlplane/controllers/ck8scontrolplane_controller.go +++ b/controlplane/controllers/ck8scontrolplane_controller.go @@ -152,6 +152,7 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req } } + // TODO(berkayoz) This is a temporary workaround to check if k8sd is reachable var microclusterPort int microclusterPort = kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort if microclusterPort == 0 { From a34edf07312f572bf52bc15428ae113446dc756f Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Wed, 19 Jun 2024 16:25:23 +0300 Subject: [PATCH 6/9] Separate concerns of rendering vs applying k8sd-proxy daemonset --- .../controllers/ck8sconfig_controller.go | 27 +++--------------- pkg/ck8s/k8sd_proxy_daemonset.go | 28 +++++++++++++++++++ .../k8sd-proxy-template.yaml} | 4 +-- pkg/cloudinit/controlplane_init.go | 23 +++++++++++---- 4 files changed, 51 insertions(+), 31 deletions(-) create mode 100644 pkg/ck8s/k8sd_proxy_daemonset.go rename pkg/ck8s/{k8sd-proxy.yaml => manifests/k8sd-proxy-template.yaml} (97%) diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go index 03527d64..355951be 100644 --- a/bootstrap/controllers/ck8sconfig_controller.go +++ b/bootstrap/controllers/ck8sconfig_controller.go @@ -17,11 +17,9 @@ limitations under the License. package controllers import ( - "bytes" "context" "errors" "fmt" - "text/template" "time" "github.com/go-logr/logr" @@ -458,28 +456,10 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context, microclusterPort = 2380 } - var k8sdProxyDaemonset bytes.Buffer - - t, err := template.New("k8sd-proxy-daemonset").Parse(ck8s.K8sdProxyDaemonsetYaml) + ds, err := ck8s.RenderK8sdProxyDaemonSetManifest(ck8s.K8sdProxyDaemonSetInput{K8sdPort: microclusterPort}) if err != nil { - return ctrl.Result{}, err - } - - if err := t.Execute(&k8sdProxyDaemonset, struct { - K8sdPort int - }{ - K8sdPort: microclusterPort, - }); err != nil { - return ctrl.Result{}, err - } - - k8sdProxyFile := bootstrapv1.File{ - Path: ck8s.K8sdProxyDaemonsetYamlLocation, - Content: k8sdProxyDaemonset.String(), - Owner: "root:root", - Permissions: "0400", + return ctrl.Result{}, fmt.Errorf("failed to render k8sd-proxy daemonset: %w", err) } - files = append(files, k8sdProxyFile) cpinput := cloudinit.InitControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ @@ -492,7 +472,8 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context, MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, AirGapped: scope.Config.Spec.AirGapped, }, - Token: *token, + Token: *token, + K8sdProxyDaemonSet: string(ds), } cloudConfig, err := cloudinit.NewInitControlPlane(cpinput) diff --git a/pkg/ck8s/k8sd_proxy_daemonset.go b/pkg/ck8s/k8sd_proxy_daemonset.go new file mode 100644 index 00000000..e93582dc --- /dev/null +++ b/pkg/ck8s/k8sd_proxy_daemonset.go @@ -0,0 +1,28 @@ +package ck8s + +import ( + "bytes" + _ "embed" + "text/template" +) + +var ( + //go:embed manifests/k8sd-proxy-template.yaml + k8sdProxyDaemonSetYaml string + + k8sdProxyDaemonSetTemplate *template.Template = template.Must(template.New("K8sdProxyDaemonset").Parse(k8sdProxyDaemonSetYaml)) +) + +type K8sdProxyDaemonSetInput struct { + K8sdPort int +} + +// RenderK8sdProxyDaemonSet renders the manifest for the k8sd-proxy daemonset based on supplied configuration. +func RenderK8sdProxyDaemonSetManifest(input K8sdProxyDaemonSetInput) ([]byte, error) { + var b bytes.Buffer + if err := k8sdProxyDaemonSetTemplate.Execute(&b, input); err != nil { + return nil, err + } + + return b.Bytes(), nil +} diff --git a/pkg/ck8s/k8sd-proxy.yaml b/pkg/ck8s/manifests/k8sd-proxy-template.yaml similarity index 97% rename from pkg/ck8s/k8sd-proxy.yaml rename to pkg/ck8s/manifests/k8sd-proxy-template.yaml index 3ed22c45..2832a1b7 100644 --- a/pkg/ck8s/k8sd-proxy.yaml +++ b/pkg/ck8s/manifests/k8sd-proxy-template.yaml @@ -33,7 +33,7 @@ spec: - name: k8sd-proxy image: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0 env: - # TODO: Make this more robust by possibly finding/parsing the right IP. + # TODO: Make this more robust by possibly finding/parsing the right IP. # This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. - name: HOSTIP valueFrom: @@ -44,7 +44,7 @@ spec: configMapKeyRef: name: k8sd-proxy-config key: k8sd-port - args: + args: - TCP4-LISTEN:2380,fork,reuseaddr - TCP4:$(HOSTIP):$(K8SD_PORT) resources: diff --git a/pkg/cloudinit/controlplane_init.go b/pkg/cloudinit/controlplane_init.go index 5838f154..e3240842 100644 --- a/pkg/cloudinit/controlplane_init.go +++ b/pkg/cloudinit/controlplane_init.go @@ -23,6 +23,8 @@ type InitControlPlaneInput struct { BaseUserData // Token is used to join more cluster nodes. Token string + // K8sdProxyDaemonSet is the manifest that deploys k8sd-proxy to the cluster. + K8sdProxyDaemonSet string } // NewInitControlPlane returns the user data string to be used on a controlplane instance. @@ -33,12 +35,21 @@ func NewInitControlPlane(input InitControlPlaneInput) (CloudConfig, error) { } // write files - config.WriteFiles = append(config.WriteFiles, File{ - Path: "/capi/etc/token", - Content: input.Token, - Permissions: "0400", - Owner: "root:root", - }) + config.WriteFiles = append( + config.WriteFiles, + File{ + Path: "/capi/etc/token", + Content: input.Token, + Permissions: "0400", + Owner: "root:root", + }, + File{ + Path: "/capi/manifests/00-k8sd-proxy.yaml", + Content: input.K8sdProxyDaemonSet, + Permissions: "0400", + Owner: "root:root", + }, + ) // run commands config.RunCommands = append(config.RunCommands, input.PreRunCommands...) From 87d26ef4ebdce9191c82abdacaa8041564a663f0 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Wed, 19 Jun 2024 16:26:11 +0300 Subject: [PATCH 7/9] Adjust names, we create K8sdClients for the workload cluster --- pkg/ck8s/management_cluster.go | 12 +++++----- pkg/ck8s/workload_cluster.go | 10 ++++---- ...k8sd_proxy.go => workload_cluster_k8sd.go} | 23 ++++++++----------- test/unit/k8sd_proxy_manual_test.go | 8 +++---- 4 files changed, 24 insertions(+), 29 deletions(-) rename pkg/ck8s/{k8sd_proxy.go => workload_cluster_k8sd.go} (81%) diff --git a/pkg/ck8s/management_cluster.go b/pkg/ck8s/management_cluster.go index cc8b53b4..f9594b49 100644 --- a/pkg/ck8s/management_cluster.go +++ b/pkg/ck8s/management_cluster.go @@ -73,7 +73,7 @@ const ( func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (*Workload, error) { restConfig, err := remote.RESTConfig(ctx, CK8sControlPlaneControllerName, m.Client, clusterKey) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get config: %w", err) } restConfig.Timeout = 30 * time.Second @@ -82,15 +82,15 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err} } - g, err := NewK8sdProxyGenerator(restConfig, m.K8sdDialTimeout) + g, err := NewK8sdClientGenerator(restConfig, m.K8sdDialTimeout) if err != nil { - return nil, err + return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err} } workload := &Workload{ - Client: c, - ClientRestConfig: restConfig, - K8sdProxyGenerator: g, + Client: c, + ClientRestConfig: restConfig, + K8sdClientGenerator: g, /** CoreDNSMigrator: &CoreDNSMigrator{}, diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go index adeeff46..18a43834 100644 --- a/pkg/ck8s/workload_cluster.go +++ b/pkg/ck8s/workload_cluster.go @@ -63,9 +63,9 @@ type WorkloadCluster interface { type Workload struct { WorkloadCluster - Client ctrlclient.Client - ClientRestConfig *rest.Config - K8sdProxyGenerator *k8sdProxyGenerator + Client ctrlclient.Client + ClientRestConfig *rest.Config + K8sdClientGenerator *k8sdClientGenerator // NOTE(neoaggelos): CoreDNSMigrator and etcdClientGenerator are used by upstream to reach and manage the services in the workload cluster // TODO(neoaggelos): Replace them with a k8sdProxyClientGenerator. @@ -182,14 +182,14 @@ func getNodeInternalIP(node *corev1.Node) (string, error) { return "", fmt.Errorf("unable to find internal IP for node %s", node.Name) } -func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context) (*K8sdProxy, error) { +func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context) (*K8sdClient, error) { cplaneNodes, err := w.getControlPlaneNodes(ctx) if err != nil { return nil, fmt.Errorf("failed to get control plane nodes: %w", err) } for _, node := range cplaneNodes.Items { - proxy, err := w.K8sdProxyGenerator.forNode(ctx, &node) + proxy, err := w.K8sdClientGenerator.forNode(ctx, &node) if err != nil { continue } diff --git a/pkg/ck8s/k8sd_proxy.go b/pkg/ck8s/workload_cluster_k8sd.go similarity index 81% rename from pkg/ck8s/k8sd_proxy.go rename to pkg/ck8s/workload_cluster_k8sd.go index 4559ddb2..af58e4b3 100644 --- a/pkg/ck8s/k8sd_proxy.go +++ b/pkg/ck8s/workload_cluster_k8sd.go @@ -16,36 +16,31 @@ import ( "k8s.io/client-go/rest" ) -const K8sdProxyDaemonsetYamlLocation = "/capi/manifests/k8sd-proxy.yaml" - -//go:embed k8sd-proxy.yaml -var K8sdProxyDaemonsetYaml string - -type K8sdProxy struct { +type K8sdClient struct { NodeIP string Client *http.Client } -type k8sdProxyGenerator struct { +type k8sdClientGenerator struct { restConfig *rest.Config clientset *kubernetes.Clientset proxyClientTimeout time.Duration } -func NewK8sdProxyGenerator(restConfig *rest.Config, proxyClientTimeout time.Duration) (*k8sdProxyGenerator, error) { +func NewK8sdClientGenerator(restConfig *rest.Config, proxyClientTimeout time.Duration) (*k8sdClientGenerator, error) { clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { return nil, fmt.Errorf("failed to create clientset: %w", err) } - return &k8sdProxyGenerator{ + return &k8sdClientGenerator{ restConfig: restConfig, clientset: clientset, proxyClientTimeout: proxyClientTimeout, }, nil } -func (g *k8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdProxy, error) { +func (g *k8sdClientGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdClient, error) { node, err := g.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { return nil, errors.Wrap(err, "unable to get node in target cluster") @@ -54,7 +49,7 @@ func (g *k8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) ( return g.forNode(ctx, node) } -func (g *k8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdProxy, error) { +func (g *k8sdClientGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdClient, error) { podmap, err := g.getProxyPods(ctx) if err != nil { return nil, fmt.Errorf("failed to get proxy pods: %w", err) @@ -75,13 +70,13 @@ func (g *k8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K return nil, err } - return &K8sdProxy{ + return &K8sdClient{ NodeIP: nodeInternalIP, Client: client, }, nil } -func (g *k8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { +func (g *k8sdClientGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { pods, err := g.clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{LabelSelector: "app=k8sd-proxy"}) if err != nil { return nil, errors.Wrap(err, "unable to list k8sd-proxy pods in target cluster") @@ -99,7 +94,7 @@ func (g *k8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]strin return podmap, nil } -func (g *k8sdProxyGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) { +func (g *k8sdClientGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) { p := proxy.Proxy{ Kind: "pods", Namespace: metav1.NamespaceSystem, diff --git a/test/unit/k8sd_proxy_manual_test.go b/test/unit/k8sd_proxy_manual_test.go index 089e15f1..bebb7920 100644 --- a/test/unit/k8sd_proxy_manual_test.go +++ b/test/unit/k8sd_proxy_manual_test.go @@ -34,15 +34,15 @@ func TestK8sdProxyManual(t *testing.T) { t.Fatalf("failed to create client: %v", err) } - g, err := ck8s.NewK8sdProxyGenerator(config, 10*time.Second) + g, err := ck8s.NewK8sdClientGenerator(config, 10*time.Second) if err != nil { t.Fatalf("failed to create k8sd proxy generator: %v", err) } w := &ck8s.Workload{ - Client: c, - ClientRestConfig: config, - K8sdProxyGenerator: g, + Client: c, + ClientRestConfig: config, + K8sdClientGenerator: g, } proxy, err := w.GetK8sdProxyForControlPlane(ctx) From c05b876d0bfcf14d4cda56a03806c12ecbe992c5 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Wed, 19 Jun 2024 16:26:28 +0300 Subject: [PATCH 8/9] drop temporary check from control plane controller --- .../ck8scontrolplane_controller.go | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go index 8247fadc..fd6c89bd 100644 --- a/controlplane/controllers/ck8scontrolplane_controller.go +++ b/controlplane/controllers/ck8scontrolplane_controller.go @@ -152,32 +152,6 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req } } - // TODO(berkayoz) This is a temporary workaround to check if k8sd is reachable - var microclusterPort int - microclusterPort = kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort - if microclusterPort == 0 { - microclusterPort = 2380 - } - - w, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) - if err != nil { - logger.Error(err, "failed to get workload cluster") - } - - proxy, err := w.GetK8sdProxyForControlPlane(ctx) - if err != nil { - logger.Error(err, "failed to get k8sd proxy for control plane") - } - - if proxy != nil { - err = ck8s.CheckIfK8sdIsReachable(ctx, proxy.Client, proxy.NodeIP, microclusterPort) - if err != nil { - logger.Error(err, "failed to reach k8sd") - } else { - logger.Info("k8sd is reachable") - } - } - // Always attempt to Patch the CK8sControlPlane object and status after each reconciliation. if patchErr := patchCK8sControlPlane(ctx, patchHelper, kcp); patchErr != nil { logger.Error(err, "Failed to patch CK8sControlPlane") From b65ad3cbea9a3d08408d3fb074ee913a7665622e Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Wed, 19 Jun 2024 16:31:34 +0300 Subject: [PATCH 9/9] drop temporary testing code --- pkg/ck8s/workload_cluster.go | 16 -------- test/unit/k8sd_proxy_manual_test.go | 61 ----------------------------- 2 files changed, 77 deletions(-) delete mode 100644 test/unit/k8sd_proxy_manual_test.go diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go index 18a43834..5864f035 100644 --- a/pkg/ck8s/workload_cluster.go +++ b/pkg/ck8s/workload_cluster.go @@ -3,7 +3,6 @@ package ck8s import ( "context" "fmt" - "net/http" "strings" "github.com/pkg/errors" @@ -155,21 +154,6 @@ func nodeHasUnreachableTaint(node corev1.Node) bool { return false } -func CheckIfK8sdIsReachable(ctx context.Context, client *http.Client, nodeIP string, k8sdPort int) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s:%v/", nodeIP, k8sdPort), nil) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } - - res, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to reach k8sd through proxy client: %w", err) - } - res.Body.Close() - - return nil -} - func getNodeInternalIP(node *corev1.Node) (string, error) { // TODO: Make this more robust by possibly finding/parsing the right IP. // This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. diff --git a/test/unit/k8sd_proxy_manual_test.go b/test/unit/k8sd_proxy_manual_test.go deleted file mode 100644 index bebb7920..00000000 --- a/test/unit/k8sd_proxy_manual_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package unit - -import ( - "context" - "os" - "testing" - "time" - - "github.com/canonical/cluster-api-k8s/pkg/ck8s" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/clientcmd" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -func TestK8sdProxyManual(t *testing.T) { - kubeconfig := os.Getenv("K8SD_PROXY_TEST_KUBECONFIG") - if kubeconfig == "" { - t.Skip() - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var microclusterPort int = 2380 - - // use the current context in kubeconfig - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - t.Fatalf("failed to create rest config from kubeconfig: %v", err) - } - config.Timeout = 30 * time.Second - - c, err := client.New(config, client.Options{Scheme: scheme.Scheme}) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - - g, err := ck8s.NewK8sdClientGenerator(config, 10*time.Second) - if err != nil { - t.Fatalf("failed to create k8sd proxy generator: %v", err) - } - - w := &ck8s.Workload{ - Client: c, - ClientRestConfig: config, - K8sdClientGenerator: g, - } - - proxy, err := w.GetK8sdProxyForControlPlane(ctx) - if err != nil { - t.Fatalf("failed to get k8sd proxy: %v", err) - } - - if proxy != nil { - err = ck8s.CheckIfK8sdIsReachable(ctx, proxy.Client, proxy.NodeIP, microclusterPort) - if err != nil { - t.Fatalf("k8sd is not reachable: %v", err) - } - t.Logf("k8sd is reachable") - } - -}