diff --git a/.github/workflows/sync-images.yaml b/.github/workflows/sync-images.yaml new file mode 100644 index 00000000..f07bdb91 --- /dev/null +++ b/.github/workflows/sync-images.yaml @@ -0,0 +1,23 @@ +name: Sync upstream images to ghcr.io + +on: + workflow_dispatch: + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Go + uses: actions/setup-go@v5 + with: + go-version: "1.22" + + - name: Sync images + env: + USERNAME: ${{ github.actor }} + PASSWORD: ${{ secrets.GITHUB_TOKEN }} + run: | + ./hack/sync-images.sh diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go index a7ffcee8..355951be 100644 --- a/bootstrap/controllers/ck8sconfig_controller.go +++ b/bootstrap/controllers/ck8sconfig_controller.go @@ -242,17 +242,6 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop return err } - // TODO(neoaggelos): figure out what is needed for k8sd proxy - // if scope.Config.Spec.IsEtcdEmbedded() { - // etcdProxyFile := bootstrapv1.File{ - // Path: etcd.EtcdProxyDaemonsetYamlLocation, - // Content: etcd.EtcdProxyDaemonsetYaml, - // Owner: "root:root", - // Permissions: "0640", - // } - // files = append(files, etcdProxyFile) - // } - input := cloudinit.JoinControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ BootCommands: scope.Config.Spec.BootCommands, @@ -461,16 +450,16 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context, return ctrl.Result{}, err } - // TODO(neoaggelos): deploy k8sd-proxy daemonsets - // if scope.Config.Spec.IsK8sDqlite() { - // etcdProxyFile := bootstrapv1.File{ - // Path: etcd.EtcdProxyDaemonsetYamlLocation, - // Content: etcd.EtcdProxyDaemonsetYaml, - // Owner: "root:root", - // Permissions: "0640", - // } - // files = append(files, etcdProxyFile) - // } + var microclusterPort int + microclusterPort = scope.Config.Spec.ControlPlaneConfig.MicroclusterPort + if microclusterPort == 0 { + microclusterPort = 2380 + } + + ds, err := ck8s.RenderK8sdProxyDaemonSetManifest(ck8s.K8sdProxyDaemonSetInput{K8sdPort: microclusterPort}) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to render k8sd-proxy daemonset: %w", err) + } cpinput := cloudinit.InitControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ @@ -483,7 +472,8 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context, MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, AirGapped: scope.Config.Spec.AirGapped, }, - Token: *token, + Token: *token, + K8sdProxyDaemonSet: string(ds), } cloudConfig, err := cloudinit.NewInitControlPlane(cpinput) diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go index e8a74daf..fd6c89bd 100644 --- a/controlplane/controllers/ck8scontrolplane_controller.go +++ b/controlplane/controllers/ck8scontrolplane_controller.go @@ -61,8 +61,7 @@ type CK8sControlPlaneReconciler struct { controller controller.Controller recorder record.EventRecorder - EtcdDialTimeout time.Duration - EtcdCallTimeout time.Duration + K8sdDialTimeout time.Duration managementCluster ck8s.ManagementCluster managementClusterUncached ck8s.ManagementCluster @@ -294,16 +293,14 @@ func (r *CK8sControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr c if r.managementCluster == nil { r.managementCluster = &ck8s.Management{ Client: r.Client, - EtcdDialTimeout: r.EtcdDialTimeout, - EtcdCallTimeout: r.EtcdCallTimeout, + K8sdDialTimeout: r.K8sdDialTimeout, } } if r.managementClusterUncached == nil { r.managementClusterUncached = &ck8s.Management{ Client: mgr.GetAPIReader(), - EtcdDialTimeout: r.EtcdDialTimeout, - EtcdCallTimeout: r.EtcdCallTimeout, + K8sdDialTimeout: r.K8sdDialTimeout, } } diff --git a/controlplane/controllers/machine_controller.go b/controlplane/controllers/machine_controller.go index 5f72f19b..f8d7f3ec 100644 --- a/controlplane/controllers/machine_controller.go +++ b/controlplane/controllers/machine_controller.go @@ -22,8 +22,7 @@ type MachineReconciler struct { Log logr.Logger Scheme *runtime.Scheme - EtcdDialTimeout time.Duration - EtcdCallTimeout time.Duration + K8sdDialTimeout time.Duration // NOTE(neoaggelos): See note below /** diff --git a/controlplane/main.go b/controlplane/main.go index 6648d8f2..c673db2c 100644 --- a/controlplane/main.go +++ b/controlplane/main.go @@ -56,8 +56,7 @@ func main() { var metricsAddr string var enableLeaderElection bool var syncPeriod time.Duration - var etcdDialTimeout time.Duration - var etcdCallTimeout time.Duration + var k8sdDialTimeout time.Duration flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, @@ -67,11 +66,8 @@ func main() { flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "The minimum interval at which watched resources are reconciled (e.g. 15m)") - flag.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second, - "Duration that the etcd client waits at most to establish a connection with etcd") - - flag.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", 15*time.Second, - "Duration that the etcd client waits at most for read and write operations to etcd.") + flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 10*time.Second, + "Duration that the proxy client waits at most to establish a connection with k8sd") flag.Parse() @@ -103,8 +99,7 @@ func main() { Client: mgr.GetClient(), Log: ctrPlaneLogger, Scheme: mgr.GetScheme(), - EtcdDialTimeout: etcdDialTimeout, - EtcdCallTimeout: etcdCallTimeout, + K8sdDialTimeout: k8sdDialTimeout, }).SetupWithManager(ctx, mgr, &ctrPlaneLogger); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CK8sControlPlane") os.Exit(1) @@ -115,8 +110,7 @@ func main() { Client: mgr.GetClient(), Log: ctrMachineLogger, Scheme: mgr.GetScheme(), - EtcdDialTimeout: etcdDialTimeout, - EtcdCallTimeout: etcdCallTimeout, + K8sdDialTimeout: k8sdDialTimeout, }).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Machine") os.Exit(1) diff --git a/hack/sync-images.sh b/hack/sync-images.sh new file mode 100755 index 00000000..4fdc0b54 --- /dev/null +++ b/hack/sync-images.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +# Description: +# Sync images from upstream repositories under ghcr.io/canonical. +# +# Usage: +# $ USERNAME="$username" PASSWORD="$password" ./sync-images.sh + +DIR="$(realpath "$(dirname "${0}")")" + +"${DIR}/tools/regsync.sh" once -c "${DIR}/upstream-images.yaml" diff --git a/hack/tools/go.mod b/hack/tools/go.mod new file mode 100644 index 00000000..321ce8c4 --- /dev/null +++ b/hack/tools/go.mod @@ -0,0 +1,19 @@ +module github.com/canonical/cluster-api-k8s/tools + +go 1.22.4 + +require github.com/regclient/regclient v0.6.1 + +require ( + github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/cobra v1.8.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/ulikunitz/xz v0.5.12 // indirect + golang.org/x/sys v0.20.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/hack/tools/go.sum b/hack/tools/go.sum new file mode 100644 index 00000000..42e110eb --- /dev/null +++ b/hack/tools/go.sum @@ -0,0 +1,40 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU5CAUmr9zpesgbU6SWc8/B4mflAE4= +github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/olareg/olareg v0.1.0 h1:1dXBOgPrig5N7zoXyIZVQqU0QBo6sD9pbL6UYjY75CA= +github.com/olareg/olareg v0.1.0/go.mod h1:RBuU7JW7SoIIxZKzLRhq8sVtQeAHzCAtRrXEBx2KlM4= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/regclient/regclient v0.6.1 h1:4PxrGxMXrLpPrSaet8QZl568CVOolyHyukLL9UyogoU= +github.com/regclient/regclient v0.6.1/go.mod h1:hCKbRHYMx6LJntAhXzWVV7Oxyn9DzNVJoOKJaSnU5BM= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc= +github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/hack/tools/regsync.sh b/hack/tools/regsync.sh new file mode 100755 index 00000000..015f21e5 --- /dev/null +++ b/hack/tools/regsync.sh @@ -0,0 +1,8 @@ +#!/bin/bash -eu + +# Run regsync +TOOLS_DIR="$(realpath `dirname "${0}"`)" +( + cd "${TOOLS_DIR}" + go run github.com/regclient/regclient/cmd/regsync "${@}" +) diff --git a/hack/tools/tools.go b/hack/tools/tools.go new file mode 100644 index 00000000..1c97d316 --- /dev/null +++ b/hack/tools/tools.go @@ -0,0 +1,5 @@ +package main + +import ( + _ "github.com/regclient/regclient/cmd/regsync" +) diff --git a/hack/upstream-images.yaml b/hack/upstream-images.yaml new file mode 100644 index 00000000..9209b859 --- /dev/null +++ b/hack/upstream-images.yaml @@ -0,0 +1,9 @@ +version: 1 +creds: + - registry: ghcr.io + user: '{{ env "USERNAME" }}' + pass: '{{ env "PASSWORD" }}' +sync: + - source: alpine/socat:1.8.0.0 + target: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0 + type: image diff --git a/pkg/ck8s/k8sd_proxy_daemonset.go b/pkg/ck8s/k8sd_proxy_daemonset.go new file mode 100644 index 00000000..e93582dc --- /dev/null +++ b/pkg/ck8s/k8sd_proxy_daemonset.go @@ -0,0 +1,28 @@ +package ck8s + +import ( + "bytes" + _ "embed" + "text/template" +) + +var ( + //go:embed manifests/k8sd-proxy-template.yaml + k8sdProxyDaemonSetYaml string + + k8sdProxyDaemonSetTemplate *template.Template = template.Must(template.New("K8sdProxyDaemonset").Parse(k8sdProxyDaemonSetYaml)) +) + +type K8sdProxyDaemonSetInput struct { + K8sdPort int +} + +// RenderK8sdProxyDaemonSet renders the manifest for the k8sd-proxy daemonset based on supplied configuration. +func RenderK8sdProxyDaemonSetManifest(input K8sdProxyDaemonSetInput) ([]byte, error) { + var b bytes.Buffer + if err := k8sdProxyDaemonSetTemplate.Execute(&b, input); err != nil { + return nil, err + } + + return b.Bytes(), nil +} diff --git a/pkg/ck8s/management_cluster.go b/pkg/ck8s/management_cluster.go index 240a6be2..f9594b49 100644 --- a/pkg/ck8s/management_cluster.go +++ b/pkg/ck8s/management_cluster.go @@ -26,11 +26,7 @@ type Management struct { Client client.Reader - // NOTE(neoaggelos): These are used as timeouts when interacting with the etcd of the workload cluster. - // - // TODO(neoaggelos): Replace these with timeouts for interacting with the k8sd proxy instances of the nodes. - EtcdDialTimeout time.Duration - EtcdCallTimeout time.Duration + K8sdDialTimeout time.Duration } // RemoteClusterConnectionError represents a failure to connect to a remote cluster. @@ -77,7 +73,7 @@ const ( func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (*Workload, error) { restConfig, err := remote.RESTConfig(ctx, CK8sControlPlaneControllerName, m.Client, clusterKey) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get config: %w", err) } restConfig.Timeout = 30 * time.Second @@ -86,8 +82,16 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err} } + g, err := NewK8sdClientGenerator(restConfig, m.K8sdDialTimeout) + if err != nil { + return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err} + } + workload := &Workload{ - Client: c, + Client: c, + ClientRestConfig: restConfig, + K8sdClientGenerator: g, + /** CoreDNSMigrator: &CoreDNSMigrator{}, **/ diff --git a/pkg/ck8s/manifests/k8sd-proxy-template.yaml b/pkg/ck8s/manifests/k8sd-proxy-template.yaml new file mode 100644 index 00000000..2832a1b7 --- /dev/null +++ b/pkg/ck8s/manifests/k8sd-proxy-template.yaml @@ -0,0 +1,56 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: k8sd-proxy-config + namespace: kube-system +data: + k8sd-port: "{{ .K8sdPort }}" +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: k8sd-proxy + namespace: kube-system + labels: + app: k8sd-proxy +spec: + selector: + matchLabels: + app: k8sd-proxy + template: + metadata: + labels: + app: k8sd-proxy + spec: + tolerations: + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + - key: node-role.kubernetes.io/master + operator: Exists + effect: NoSchedule + containers: + - name: k8sd-proxy + image: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0 + env: + # TODO: Make this more robust by possibly finding/parsing the right IP. + # This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. + - name: HOSTIP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: K8SD_PORT + valueFrom: + configMapKeyRef: + name: k8sd-proxy-config + key: k8sd-port + args: + - TCP4-LISTEN:2380,fork,reuseaddr + - TCP4:$(HOSTIP):$(K8SD_PORT) + resources: + limits: + memory: 200Mi + requests: + cpu: 100m + memory: 200Mi + terminationGracePeriodSeconds: 30 diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go index c1761335..5864f035 100644 --- a/pkg/ck8s/workload_cluster.go +++ b/pkg/ck8s/workload_cluster.go @@ -62,8 +62,9 @@ type WorkloadCluster interface { type Workload struct { WorkloadCluster - Client ctrlclient.Client - ClientRestConfig *rest.Config + Client ctrlclient.Client + ClientRestConfig *rest.Config + K8sdClientGenerator *k8sdClientGenerator // NOTE(neoaggelos): CoreDNSMigrator and etcdClientGenerator are used by upstream to reach and manage the services in the workload cluster // TODO(neoaggelos): Replace them with a k8sdProxyClientGenerator. @@ -153,6 +154,36 @@ func nodeHasUnreachableTaint(node corev1.Node) bool { return false } +func getNodeInternalIP(node *corev1.Node) (string, error) { + // TODO: Make this more robust by possibly finding/parsing the right IP. + // This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP. + for _, addr := range node.Status.Addresses { + if addr.Type == "InternalIP" { + return addr.Address, nil + } + } + + return "", fmt.Errorf("unable to find internal IP for node %s", node.Name) +} + +func (w *Workload) GetK8sdProxyForControlPlane(ctx context.Context) (*K8sdClient, error) { + cplaneNodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get control plane nodes: %w", err) + } + + for _, node := range cplaneNodes.Items { + proxy, err := w.K8sdClientGenerator.forNode(ctx, &node) + if err != nil { + continue + } + + return proxy, nil + } + + return nil, fmt.Errorf("failed to get k8sd proxy for control plane") +} + // UpdateAgentConditions is responsible for updating machine conditions reflecting the status of all the control plane // components. This operation is best effort, in the sense that in case // of problems in retrieving the pod status, it sets the condition to Unknown state without returning any error. diff --git a/pkg/ck8s/workload_cluster_k8sd.go b/pkg/ck8s/workload_cluster_k8sd.go new file mode 100644 index 00000000..af58e4b3 --- /dev/null +++ b/pkg/ck8s/workload_cluster_k8sd.go @@ -0,0 +1,128 @@ +package ck8s + +import ( + "context" + "crypto/tls" + _ "embed" + "fmt" + "net/http" + "time" + + "github.com/canonical/cluster-api-k8s/pkg/proxy" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type K8sdClient struct { + NodeIP string + Client *http.Client +} + +type k8sdClientGenerator struct { + restConfig *rest.Config + clientset *kubernetes.Clientset + proxyClientTimeout time.Duration +} + +func NewK8sdClientGenerator(restConfig *rest.Config, proxyClientTimeout time.Duration) (*k8sdClientGenerator, error) { + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to create clientset: %w", err) + } + + return &k8sdClientGenerator{ + restConfig: restConfig, + clientset: clientset, + proxyClientTimeout: proxyClientTimeout, + }, nil +} + +func (g *k8sdClientGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdClient, error) { + node, err := g.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrap(err, "unable to get node in target cluster") + } + + return g.forNode(ctx, node) +} + +func (g *k8sdClientGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdClient, error) { + podmap, err := g.getProxyPods(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get proxy pods: %w", err) + } + + podname, ok := podmap[node.Name] + if !ok { + return nil, fmt.Errorf("missing k8sd proxy pod for node %s", node.Name) + } + + nodeInternalIP, err := getNodeInternalIP(node) + if err != nil { + return nil, fmt.Errorf("failed to get internal IP for node %s: %w", node.Name, err) + } + + client, err := g.NewHTTPClient(ctx, podname) + if err != nil { + return nil, err + } + + return &K8sdClient{ + NodeIP: nodeInternalIP, + Client: client, + }, nil +} + +func (g *k8sdClientGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { + pods, err := g.clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{LabelSelector: "app=k8sd-proxy"}) + if err != nil { + return nil, errors.Wrap(err, "unable to list k8sd-proxy pods in target cluster") + } + + if len(pods.Items) == 0 { + return nil, errors.New("there isn't any k8sd-proxy pods in target cluster") + } + + podmap := make(map[string]string, len(pods.Items)) + for _, pod := range pods.Items { + podmap[pod.Spec.NodeName] = pod.Name + } + + return podmap, nil +} + +func (g *k8sdClientGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) { + p := proxy.Proxy{ + Kind: "pods", + Namespace: metav1.NamespaceSystem, + ResourceName: podName, + KubeConfig: g.restConfig, + Port: 2380, + } + + dialer, err := proxy.NewDialer(p) + if err != nil { + return nil, fmt.Errorf("failed to create proxy dialer: %w", err) + } + + // We return a http client with the same parameters as http.DefaultClient + // and an overridden DialContext to proxy the requests through api server. + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.DefaultTransport.(*http.Transport).Proxy, + DialContext: dialer.DialContext, + ForceAttemptHTTP2: http.DefaultTransport.(*http.Transport).ForceAttemptHTTP2, + MaxIdleConns: http.DefaultTransport.(*http.Transport).MaxIdleConns, + IdleConnTimeout: http.DefaultTransport.(*http.Transport).IdleConnTimeout, + TLSHandshakeTimeout: http.DefaultTransport.(*http.Transport).TLSHandshakeTimeout, + ExpectContinueTimeout: http.DefaultTransport.(*http.Transport).ExpectContinueTimeout, + // TODO: Workaround for now, address later on + // get the certificate fingerprint from the matching node through a resource in the cluster (TBD), and validate it in the TLSClientConfig + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: g.proxyClientTimeout, + }, nil +} diff --git a/pkg/cloudinit/controlplane_init.go b/pkg/cloudinit/controlplane_init.go index 5838f154..e3240842 100644 --- a/pkg/cloudinit/controlplane_init.go +++ b/pkg/cloudinit/controlplane_init.go @@ -23,6 +23,8 @@ type InitControlPlaneInput struct { BaseUserData // Token is used to join more cluster nodes. Token string + // K8sdProxyDaemonSet is the manifest that deploys k8sd-proxy to the cluster. + K8sdProxyDaemonSet string } // NewInitControlPlane returns the user data string to be used on a controlplane instance. @@ -33,12 +35,21 @@ func NewInitControlPlane(input InitControlPlaneInput) (CloudConfig, error) { } // write files - config.WriteFiles = append(config.WriteFiles, File{ - Path: "/capi/etc/token", - Content: input.Token, - Permissions: "0400", - Owner: "root:root", - }) + config.WriteFiles = append( + config.WriteFiles, + File{ + Path: "/capi/etc/token", + Content: input.Token, + Permissions: "0400", + Owner: "root:root", + }, + File{ + Path: "/capi/manifests/00-k8sd-proxy.yaml", + Content: input.K8sdProxyDaemonSet, + Permissions: "0400", + Owner: "root:root", + }, + ) // run commands config.RunCommands = append(config.RunCommands, input.PreRunCommands...) diff --git a/pkg/proxy/dial.go b/pkg/proxy/dial.go index e4c1024f..d3d80729 100644 --- a/pkg/proxy/dial.go +++ b/pkg/proxy/dial.go @@ -89,7 +89,7 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn Post(). Resource(d.proxy.Kind). Namespace(d.proxy.Namespace). - Name(addr). + Name(d.proxy.ResourceName). SubResource("portforward") dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())