Skip to content

Commit

Permalink
Addressing comments, initial tests to reach k8sd
Browse files Browse the repository at this point in the history
  • Loading branch information
berkayoz committed Jun 13, 2024
1 parent 1a07499 commit 5402034
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/sync-images.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
USERNAME: ${{ github.actor }}
PASSWORD: ${{ secrets.GITHUB_TOKEN }}
run: |
./hack/sync-images.sh
./hack/sync-images.sh
69 changes: 37 additions & 32 deletions bootstrap/controllers/ck8sconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,29 +244,16 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop
return err
}

var k8sdProxyDaemonset bytes.Buffer

t, err := template.New("k8sd-proxy-daemonset").Parse(ck8s.K8sdProxyDaemonsetYaml)
if err != nil {
return err
}

if err := t.Execute(&k8sdProxyDaemonset, struct {
K8sdPort int
}{
K8sdPort: configStruct.ControlPlaneConfig.MicroclusterPort,
}); err != nil {
return err
}

// TODO(neoaggelos): figure out what is needed for k8sd proxy
k8sdProxyFile := bootstrapv1.File{
Path: ck8s.K8sdProxyDaemonsetYamlLocation,
Content: k8sdProxyDaemonset.String(),
Owner: "root:root",
Permissions: "0400",
}
files = append(files, k8sdProxyFile)
// if scope.Config.Spec.IsEtcdEmbedded() {
// etcdProxyFile := bootstrapv1.File{
// Path: etcd.EtcdProxyDaemonsetYamlLocation,
// Content: etcd.EtcdProxyDaemonsetYaml,
// Owner: "root:root",
// Permissions: "0640",
// }
// files = append(files, etcdProxyFile)
// }

input := cloudinit.JoinControlPlaneInput{
BaseUserData: cloudinit.BaseUserData{
Expand Down Expand Up @@ -476,16 +463,34 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context,
return ctrl.Result{}, err
}

// TODO(neoaggelos): deploy k8sd-proxy daemonsets
// if scope.Config.Spec.IsK8sDqlite() {
// etcdProxyFile := bootstrapv1.File{
// Path: etcd.EtcdProxyDaemonsetYamlLocation,
// Content: etcd.EtcdProxyDaemonsetYaml,
// Owner: "root:root",
// Permissions: "0640",
// }
// files = append(files, etcdProxyFile)
// }
var microclusterPort int
microclusterPort = scope.Config.Spec.ControlPlaneConfig.MicroclusterPort
if microclusterPort == 0 {
microclusterPort = 2380
}

var k8sdProxyDaemonset bytes.Buffer

t, err := template.New("k8sd-proxy-daemonset").Parse(ck8s.K8sdProxyDaemonsetYaml)
if err != nil {
return ctrl.Result{}, err
}

if err := t.Execute(&k8sdProxyDaemonset, struct {
K8sdPort int
}{
K8sdPort: microclusterPort,
}); err != nil {
return ctrl.Result{}, err
}

k8sdProxyFile := bootstrapv1.File{
Path: ck8s.K8sdProxyDaemonsetYamlLocation,
Content: k8sdProxyDaemonset.String(),
Owner: "root:root",
Permissions: "0400",
}
files = append(files, k8sdProxyFile)

cpinput := cloudinit.InitControlPlaneInput{
BaseUserData: cloudinit.BaseUserData{
Expand Down
34 changes: 28 additions & 6 deletions controlplane/controllers/ck8scontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ type CK8sControlPlaneReconciler struct {
controller controller.Controller
recorder record.EventRecorder

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
K8sdDialTimeout time.Duration

managementCluster ck8s.ManagementCluster
managementClusterUncached ck8s.ManagementCluster
Expand Down Expand Up @@ -153,6 +152,31 @@ func (r *CK8sControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}

var microclusterPort int
microclusterPort = kcp.Spec.CK8sConfigSpec.ControlPlaneConfig.MicroclusterPort
if microclusterPort == 0 {
microclusterPort = 2380
}

w, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to get workload cluster")
}

proxy, err := w.GetK8sdProxyForControlPlane(ctx)
if err != nil {
logger.Error(err, "failed to get k8sd proxy for control plane")
}

if proxy != nil {
err = ck8s.CheckIfK8sdIsReachable(ctx, proxy.Client, proxy.NodeIP, microclusterPort)
if err != nil {
logger.Error(err, "failed to reach k8sd")
} else {
logger.Info("k8sd is reachable")
}
}

// Always attempt to Patch the CK8sControlPlane object and status after each reconciliation.
if patchErr := patchCK8sControlPlane(ctx, patchHelper, kcp); patchErr != nil {
logger.Error(err, "Failed to patch CK8sControlPlane")
Expand Down Expand Up @@ -294,16 +318,14 @@ func (r *CK8sControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr c
if r.managementCluster == nil {
r.managementCluster = &ck8s.Management{
Client: r.Client,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
K8sdDialTimeout: r.K8sdDialTimeout,
}
}

if r.managementClusterUncached == nil {
r.managementClusterUncached = &ck8s.Management{
Client: mgr.GetAPIReader(),
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
K8sdDialTimeout: r.K8sdDialTimeout,
}
}

Expand Down
3 changes: 1 addition & 2 deletions controlplane/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ type MachineReconciler struct {
Log logr.Logger
Scheme *runtime.Scheme

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
K8sdDialTimeout time.Duration

// NOTE(neoaggelos): See note below
/**
Expand Down
16 changes: 5 additions & 11 deletions controlplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func main() {
var metricsAddr string
var enableLeaderElection bool
var syncPeriod time.Duration
var etcdDialTimeout time.Duration
var etcdCallTimeout time.Duration
var k8sdDialTimeout time.Duration

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
Expand All @@ -67,11 +66,8 @@ func main() {
flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute,
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

flag.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second,
"Duration that the etcd client waits at most to establish a connection with etcd")

flag.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", 15*time.Second,
"Duration that the etcd client waits at most for read and write operations to etcd.")
flag.DurationVar(&k8sdDialTimeout, "k8sd-dial-timeout-duration", 10*time.Second,
"Duration that the proxy client waits at most to establish a connection with k8sd")

flag.Parse()

Expand Down Expand Up @@ -103,8 +99,7 @@ func main() {
Client: mgr.GetClient(),
Log: ctrPlaneLogger,
Scheme: mgr.GetScheme(),
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
K8sdDialTimeout: k8sdDialTimeout,
}).SetupWithManager(ctx, mgr, &ctrPlaneLogger); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CK8sControlPlane")
os.Exit(1)
Expand All @@ -115,8 +110,7 @@ func main() {
Client: mgr.GetClient(),
Log: ctrMachineLogger,
Scheme: mgr.GetScheme(),
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
K8sdDialTimeout: k8sdDialTimeout,
}).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Machine")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion hack/sync-images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

DIR="$(realpath "$(dirname "${0}")")"

"${DIR}/tools/regsync.sh" once -c "${DIR}/upstream-images.yaml"
"${DIR}/tools/regsync.sh" once -c "${DIR}/upstream-images.yaml"
2 changes: 1 addition & 1 deletion hack/tools/regsync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ TOOLS_DIR="$(realpath `dirname "${0}"`)"
(
cd "${TOOLS_DIR}"
go run github.com/regclient/regclient/cmd/regsync "${@}"
)
)
2 changes: 1 addition & 1 deletion hack/upstream-images.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ creds:
sync:
- source: alpine/socat:1.8.0.0
target: ghcr.io/canonical/cluster-api-k8s/socat:1.8.0.0
type: image
type: image
95 changes: 18 additions & 77 deletions pkg/ck8s/k8sd_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,36 @@ import (
"k8s.io/client-go/rest"
)

const K8sdProxyDaemonsetYamlLocation = "/opt/capi/manifests/k8sd-proxy.yaml"
const K8sdProxyDaemonsetYamlLocation = "/capi/manifests/k8sd-proxy.yaml"

//go:embed k8sd-proxy.yaml
var K8sdProxyDaemonsetYaml string

type K8sdProxy struct {
nodeIP string
client *http.Client
NodeIP string
Client *http.Client
}

type K8sdProxyGenerator struct {
type k8sdProxyGenerator struct {
restConfig *rest.Config
clientset *kubernetes.Clientset
proxyClientTimeout time.Duration
k8sdPort int
}

func NewK8sdProxyGenerator(restConfig *rest.Config, clientset *kubernetes.Clientset, proxyClientTimeout time.Duration, k8sdPort int) (*K8sdProxyGenerator, error) {
return &K8sdProxyGenerator{
func NewK8sdProxyGenerator(restConfig *rest.Config, proxyClientTimeout time.Duration) (*k8sdProxyGenerator, error) {
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %w", err)
}

return &k8sdProxyGenerator{
restConfig: restConfig,
clientset: clientset,
proxyClientTimeout: proxyClientTimeout,
k8sdPort: k8sdPort,
}, nil
}

func (g *K8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdProxy, error) {
func (g *k8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (*K8sdProxy, error) {
node, err := g.clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "unable to get node in target cluster")
Expand All @@ -51,7 +54,7 @@ func (g *K8sdProxyGenerator) forNodeName(ctx context.Context, nodeName string) (
return g.forNode(ctx, node)
}

func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdProxy, error) {
func (g *k8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K8sdProxy, error) {
podmap, err := g.getProxyPods(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get proxy pods: %w", err)
Expand All @@ -62,7 +65,7 @@ func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K
return nil, fmt.Errorf("this node does not have a k8sd proxy pod")
}

nodeInternalIP, err := g.getNodeInternalIP(node)
nodeInternalIP, err := getNodeInternalIP(node)
if err != nil {
return nil, fmt.Errorf("failed to get internal IP for node %s: %w", node.Name, err)
}
Expand All @@ -72,75 +75,13 @@ func (g *K8sdProxyGenerator) forNode(ctx context.Context, node *corev1.Node) (*K
return nil, err
}

if err := g.checkIfK8sdIsReachable(ctx, client, nodeInternalIP); err != nil {
return nil, fmt.Errorf("failed to reach k8sd through proxy client: %w", err)
}

return &K8sdProxy{
nodeIP: nodeInternalIP,
client: client,
NodeIP: nodeInternalIP,
Client: client,
}, nil
}

func (g *K8sdProxyGenerator) forControlPlane(ctx context.Context) (*K8sdProxy, error) {
cplaneNodes, err := g.getControlPlaneNodes(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get control plane nodes: %w", err)
}

for _, node := range cplaneNodes.Items {
proxy, err := g.forNode(ctx, &node)
if err != nil {
continue
}

return proxy, nil
}

return nil, fmt.Errorf("failed to find a control plane node with a reachable k8sd proxy")
}

func (g *K8sdProxyGenerator) checkIfK8sdIsReachable(ctx context.Context, client *http.Client, nodeIP string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s:%v/", nodeIP, g.k8sdPort), nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

res, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to reach k8sd through proxy client: %w", err)
}
res.Body.Close()

return nil
}

func (g *K8sdProxyGenerator) getNodeInternalIP(node *corev1.Node) (string, error) {
// TODO: Make this more robust by possibly finding/parsing the right IP.
// This works as a start but might not be sufficient as the kubelet IP might not match microcluster IP.
for _, addr := range node.Status.Addresses {
if addr.Type == "InternalIP" {
return addr.Address, nil
}
}

return "", fmt.Errorf("unable to find internal IP for node %s", node.Name)
}

func (g *K8sdProxyGenerator) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList, error) {
nodes, err := g.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/control-plane="})
if err != nil {
return nil, errors.Wrap(err, "unable to list nodes in target cluster")
}

if len(nodes.Items) == 0 {
return nil, errors.New("there isn't any nodes registered in target cluster")
}

return nodes, nil
}

func (g *K8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]string, error) {
func (g *k8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]string, error) {
pods, err := g.clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{LabelSelector: "app=k8sd-proxy"})
if err != nil {
return nil, errors.Wrap(err, "unable to list k8sd-proxy pods in target cluster")
Expand All @@ -158,7 +99,7 @@ func (g *K8sdProxyGenerator) getProxyPods(ctx context.Context) (map[string]strin
return podmap, nil
}

func (g *K8sdProxyGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) {
func (g *k8sdProxyGenerator) NewHTTPClient(ctx context.Context, podName string) (*http.Client, error) {
p := proxy.Proxy{
Kind: "pods",
Namespace: metav1.NamespaceSystem,
Expand Down
Loading

0 comments on commit 5402034

Please sign in to comment.