diff --git a/src/pkg/cluster/injector.go b/src/pkg/cluster/injector.go index b2a8569318..b30ab34314 100644 --- a/src/pkg/cluster/injector.go +++ b/src/pkg/cluster/injector.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-containerregistry/pkg/crane" "github.com/mholt/archiver/v3" corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -156,7 +157,11 @@ func (c *Cluster) StopInjectionMadness(ctx context.Context) error { } // Remove the injector service - return c.DeleteService(ctx, ZarfNamespaceName, "zarf-injector") + err := c.Clientset.CoreV1().Services(ZarfNamespaceName).Delete(ctx, "zarf-injector", metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil } func (c *Cluster) loadSeedImages(imagesDir, seedImagesDir string, injectorSeedSrcs []string, spinner *message.Spinner) ([]transform.Image, error) { @@ -306,20 +311,37 @@ func (c *Cluster) createInjectorConfigMap(ctx context.Context, binaryPath string } func (c *Cluster) createService(ctx context.Context) (*corev1.Service, error) { - service := c.GenerateService(ZarfNamespaceName, "zarf-injector") - - service.Spec.Type = corev1.ServiceTypeNodePort - service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{ - Port: int32(5000), - }) - service.Spec.Selector = map[string]string{ - "app": "zarf-injector", + svc := &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "zarf-injector", + Namespace: ZarfNamespaceName, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Ports: []corev1.ServicePort{ + { + Port: int32(5000), + }, + }, + Selector: map[string]string{ + "app": "zarf-injector", + }, + }, } - - // Attempt to purse the service silently - _ = c.DeleteService(ctx, ZarfNamespaceName, "zarf-injector") - - return c.CreateService(ctx, service) + // TODO: Replace with create or update + err := c.Clientset.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) + if err != nil && !kerrors.IsNotFound(err) { + return nil, err + } + svc, err = c.Clientset.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + return svc, nil } // buildInjectionPod return a pod for injection with the appropriate containers to perform the injection. diff --git a/src/pkg/cluster/tunnel.go b/src/pkg/cluster/tunnel.go index c960fb1e8e..424881b2fc 100644 --- a/src/pkg/cluster/tunnel.go +++ b/src/pkg/cluster/tunnel.go @@ -7,14 +7,18 @@ package cluster import ( "context" "fmt" + "net/url" + "strconv" "strings" - "github.com/defenseunicorns/zarf/src/types" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/defenseunicorns/pkg/helpers" "github.com/defenseunicorns/zarf/src/config" "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" - v1 "k8s.io/api/core/v1" + "github.com/defenseunicorns/zarf/src/types" ) // Zarf specific connect strings @@ -56,25 +60,30 @@ func NewTunnelInfo(namespace, resourceType, resourceName, urlSuffix string, loca // PrintConnectTable will print a table of all Zarf connect matches found in the cluster. func (c *Cluster) PrintConnectTable(ctx context.Context) error { - list, err := c.GetServicesByLabelExists(ctx, v1.NamespaceAll, config.ZarfConnectLabelName) + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Operator: metav1.LabelSelectorOpExists, + Key: config.ZarfConnectLabelName, + }}, + }) + if err != nil { + return err + } + serviceList, err := c.Clientset.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: selector.String()}) if err != nil { return err } connections := make(types.ConnectStrings) - - for _, svc := range list.Items { + for _, svc := range serviceList.Items { name := svc.Labels[config.ZarfConnectLabelName] - // Add the connectString for processing later in the deployment. connections[name] = types.ConnectString{ Description: svc.Annotations[config.ZarfConnectAnnotationDescription], URL: svc.Annotations[config.ZarfConnectAnnotationURL], } } - message.PrintConnectStringTable(connections) - return nil } @@ -151,11 +160,15 @@ func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInf return "", tunnel, err } } else { - svcInfo, err := c.ServiceInfoFromNodePortURL(ctx, registryInfo.Address) + serviceList, err := c.Clientset.CoreV1().Services("").List(ctx, metav1.ListOptions{}) + if err != nil { + return "", nil, err + } + namespace, name, port, err := serviceInfoFromNodePortURL(serviceList.Items, registryInfo.Address) // If this is a service (no error getting svcInfo), create a port-forward tunnel to that resource if err == nil { - if tunnel, err = c.NewTunnel(svcInfo.Namespace, k8s.SvcResource, svcInfo.Name, "", 0, svcInfo.Port); err != nil { + if tunnel, err = c.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port); err != nil { return "", tunnel, err } } @@ -179,14 +192,23 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu message.Debugf("Looking for a Zarf Connect Label in the cluster") - matches, err := c.GetServicesByLabel(ctx, "", config.ZarfConnectLabelName, name) + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + config.ZarfConnectLabelName: name, + }, + }) if err != nil { - return zt, fmt.Errorf("unable to lookup the service: %w", err) + return TunnelInfo{}, err + } + listOpts := metav1.ListOptions{LabelSelector: selector.String()} + serviceList, err := c.Clientset.CoreV1().Services("").List(ctx, listOpts) + if err != nil { + return TunnelInfo{}, err } - if len(matches.Items) > 0 { + if len(serviceList.Items) > 0 { // If there is a match, use the first one as these are supposed to be unique. - svc := matches.Items[0] + svc := serviceList.Items[0] // Reset based on the matched params. zt.resourceType = k8s.SvcResource @@ -209,3 +231,42 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu return zt, nil } + +// TODO: Refactor to use netip.AddrPort instead of a string for nodePortURL. +func serviceInfoFromNodePortURL(services []corev1.Service, nodePortURL string) (string, string, int, error) { + // Attempt to parse as normal, if this fails add a scheme to the URL (docker registries don't use schemes) + parsedURL, err := url.Parse(nodePortURL) + if err != nil { + parsedURL, err = url.Parse("scheme://" + nodePortURL) + if err != nil { + return "", "", 0, err + } + } + + // Match hostname against localhost ip/hostnames + hostname := parsedURL.Hostname() + if hostname != helpers.IPV4Localhost && hostname != "localhost" { + return "", "", 0, fmt.Errorf("node port services should be on localhost") + } + + // Get the node port from the nodeportURL. + nodePort, err := strconv.Atoi(parsedURL.Port()) + if err != nil { + return "", "", 0, err + } + if nodePort < 30000 || nodePort > 32767 { + return "", "", 0, fmt.Errorf("node port services should use the port range 30000-32767") + } + + for _, svc := range services { + if svc.Spec.Type == "NodePort" { + for _, port := range svc.Spec.Ports { + if int(port.NodePort) == nodePort { + return svc.Namespace, svc.Name, int(port.Port), nil + } + } + } + } + + return "", "", 0, fmt.Errorf("no matching node port services found") +} diff --git a/src/pkg/cluster/tunnel_test.go b/src/pkg/cluster/tunnel_test.go new file mode 100644 index 0000000000..41c4aac407 --- /dev/null +++ b/src/pkg/cluster/tunnel_test.go @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2021-Present The Zarf Authors + +package cluster + +import ( + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestServiceInfoFromNodePortURL(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + services []corev1.Service + nodePortURL string + expectedErr string + expectedNamespace string + expectedName string + expectedPort int + }{ + { + name: "invalid node port", + nodePortURL: "example.com", + expectedErr: "node port services should be on localhost", + }, + { + name: "invalid port range", + nodePortURL: "http://localhost:8080", + expectedErr: "node port services should use the port range 30000-32767", + }, + { + name: "no services", + nodePortURL: "http://localhost:30001", + services: []corev1.Service{}, + expectedErr: "no matching node port services found", + }, + { + name: "found serivce", + nodePortURL: "http://localhost:30001", + services: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-type", + Namespace: "wrong-type", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{ + { + Port: 1111, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-node-port", + Namespace: "wrong-node-port", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Ports: []corev1.ServicePort{ + { + NodePort: 30002, + Port: 2222, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "good-service", + Namespace: "good-namespace", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Ports: []corev1.ServicePort{ + { + NodePort: 30001, + Port: 3333, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "too-late", + Namespace: "too-late", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Ports: []corev1.ServicePort{ + { + NodePort: 30001, + Port: 4444, + }, + }, + }, + }, + }, + expectedNamespace: "good-namespace", + expectedName: "good-service", + expectedPort: 3333, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + namespace, name, port, err := serviceInfoFromNodePortURL(tt.services, tt.nodePortURL) + if tt.expectedErr != "" { + require.EqualError(t, err, tt.expectedErr) + return + } + require.NoError(t, err) + require.Equal(t, tt.expectedNamespace, namespace) + require.Equal(t, tt.expectedName, name) + require.Equal(t, tt.expectedPort, port) + }) + } +} diff --git a/src/pkg/k8s/services.go b/src/pkg/k8s/services.go deleted file mode 100644 index 63b847d413..0000000000 --- a/src/pkg/k8s/services.go +++ /dev/null @@ -1,181 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: 2021-Present The Zarf Authors - -// Package k8s provides a client for interacting with a Kubernetes cluster. -package k8s - -import ( - "context" - "fmt" - "net/url" - "regexp" - "strconv" - - "github.com/defenseunicorns/pkg/helpers" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// See https://regex101.com/r/OWVfAO/1. -const serviceURLPattern = `^(?P[^\.]+)\.(?P[^\.]+)\.svc\.cluster\.local$` - -// ServiceInfo contains information necessary for connecting to a cluster service. -type ServiceInfo struct { - Namespace string - Name string - Port int -} - -// ReplaceService deletes and re-creates a service. -func (k *K8s) ReplaceService(ctx context.Context, service *corev1.Service) (*corev1.Service, error) { - if err := k.DeleteService(ctx, service.Namespace, service.Name); err != nil { - return nil, err - } - - return k.CreateService(ctx, service) -} - -// GenerateService returns a K8s service struct without writing to the cluster. -func (k *K8s) GenerateService(namespace, name string) *corev1.Service { - service := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - APIVersion: corev1.SchemeGroupVersion.String(), - Kind: "Service", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - Annotations: make(Labels), - Labels: make(Labels), - }, - } - - return service -} - -// DeleteService removes a service from the cluster by namespace and name. -func (k *K8s) DeleteService(ctx context.Context, namespace, name string) error { - return k.Clientset.CoreV1().Services(namespace).Delete(ctx, name, metav1.DeleteOptions{}) -} - -// CreateService creates the given service in the cluster. -func (k *K8s) CreateService(ctx context.Context, service *corev1.Service) (*corev1.Service, error) { - createOptions := metav1.CreateOptions{} - return k.Clientset.CoreV1().Services(service.Namespace).Create(ctx, service, createOptions) -} - -// GetService returns a Kubernetes service resource in the provided namespace with the given name. -func (k *K8s) GetService(ctx context.Context, namespace, serviceName string) (*corev1.Service, error) { - return k.Clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{}) -} - -// GetServices returns a list of services in the provided namespace. To search all namespaces, pass "" in the namespace arg. -func (k *K8s) GetServices(ctx context.Context, namespace string) (*corev1.ServiceList, error) { - return k.Clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{}) -} - -// GetServicesByLabel returns a list of matched services given a label and value. To search all namespaces, pass "" in the namespace arg. -func (k *K8s) GetServicesByLabel(ctx context.Context, namespace, label, value string) (*corev1.ServiceList, error) { - // Create the selector and add the requirement - labelSelector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: Labels{ - label: value, - }, - }) - - // Run the query with the selector and return as a ServiceList - return k.Clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()}) -} - -// GetServicesByLabelExists returns a list of matched services given a label. To search all namespaces, pass "" in the namespace arg. -func (k *K8s) GetServicesByLabelExists(ctx context.Context, namespace, label string) (*corev1.ServiceList, error) { - // Create the selector and add the requirement - labelSelector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{{ - Key: label, - Operator: metav1.LabelSelectorOpExists, - }}, - }) - - // Run the query with the selector and return as a ServiceList - return k.Clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()}) -} - -// ServiceInfoFromNodePortURL takes a nodePortURL and parses it to find the service info for connecting to the cluster. The string is expected to follow the following format: -// Example nodePortURL: 127.0.0.1:{PORT}. -func (k *K8s) ServiceInfoFromNodePortURL(ctx context.Context, nodePortURL string) (*ServiceInfo, error) { - // Attempt to parse as normal, if this fails add a scheme to the URL (docker registries don't use schemes) - parsedURL, err := url.Parse(nodePortURL) - if err != nil { - parsedURL, err = url.Parse("scheme://" + nodePortURL) - if err != nil { - return nil, err - } - } - - // Match hostname against localhost ip/hostnames - hostname := parsedURL.Hostname() - if hostname != helpers.IPV4Localhost && hostname != "localhost" { - return nil, fmt.Errorf("node port services should be on localhost") - } - - // Get the node port from the nodeportURL. - nodePort, err := strconv.Atoi(parsedURL.Port()) - if err != nil { - return nil, err - } - if nodePort < 30000 || nodePort > 32767 { - return nil, fmt.Errorf("node port services should use the port range 30000-32767") - } - - services, err := k.GetServices(ctx, "") - if err != nil { - return nil, err - } - - for _, svc := range services.Items { - if svc.Spec.Type == "NodePort" { - for _, port := range svc.Spec.Ports { - if int(port.NodePort) == nodePort { - return &ServiceInfo{ - Namespace: svc.Namespace, - Name: svc.Name, - Port: int(port.Port), - }, nil - } - } - } - } - - return nil, fmt.Errorf("no matching node port services found") -} - -// ServiceInfoFromServiceURL takes a serviceURL and parses it to find the service info for connecting to the cluster. The string is expected to follow the following format: -// Example serviceURL: http://{SERVICE_NAME}.{NAMESPACE}.svc.cluster.local:{PORT}. -func ServiceInfoFromServiceURL(serviceURL string) (*ServiceInfo, error) { - parsedURL, err := url.Parse(serviceURL) - if err != nil { - return nil, err - } - - // Get the remote port from the serviceURL. - remotePort, err := strconv.Atoi(parsedURL.Port()) - if err != nil { - return nil, err - } - - // Match hostname against local cluster service format. - pattern := regexp.MustCompile(serviceURLPattern) - get, err := helpers.MatchRegex(pattern, parsedURL.Hostname()) - - // If incomplete match, return an error. - if err != nil { - return nil, err - } - - return &ServiceInfo{ - Namespace: get("namespace"), - Name: get("name"), - Port: remotePort, - }, nil -} diff --git a/src/pkg/k8s/tunnel.go b/src/pkg/k8s/tunnel.go index 6c4f46e0cf..df49ac667a 100644 --- a/src/pkg/k8s/tunnel.go +++ b/src/pkg/k8s/tunnel.go @@ -14,9 +14,11 @@ import ( "sync" "time" - "github.com/defenseunicorns/pkg/helpers" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" + + "github.com/defenseunicorns/pkg/helpers" ) // Global lock to synchronize port selections. @@ -248,7 +250,7 @@ func (tunnel *Tunnel) getAttachablePodForResource(ctx context.Context) (string, // getAttachablePodForService will find an active pod associated with the Service and return the pod name. func (tunnel *Tunnel) getAttachablePodForService(ctx context.Context) (string, error) { - service, err := tunnel.kube.GetService(ctx, tunnel.namespace, tunnel.resourceName) + service, err := tunnel.kube.Clientset.CoreV1().Services(tunnel.namespace).Get(ctx, tunnel.resourceName, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("unable to find the service: %w", err) } diff --git a/src/pkg/packager/deploy.go b/src/pkg/packager/deploy.go index a7c516fb43..2805c96620 100644 --- a/src/pkg/packager/deploy.go +++ b/src/pkg/packager/deploy.go @@ -7,8 +7,10 @@ package packager import ( "context" "fmt" + "net/url" "os" "path/filepath" + "regexp" "runtime" "strconv" "strings" @@ -511,13 +513,11 @@ func (p *Packager) pushReposToRepository(ctx context.Context, reposPath string, // Create an anonymous function to push the repo to the Zarf git server tryPush := func() error { gitClient := git.New(p.state.GitServer) - svcInfo, _ := k8s.ServiceInfoFromServiceURL(gitClient.Server.Address) - - var err error - var tunnel *k8s.Tunnel + namespace, name, port, err := serviceInfoFromServiceURL(gitClient.Server.Address) // If this is a service (svcInfo is not nil), create a port-forward tunnel to that resource - if svcInfo != nil { + // TODO: Find a better way as ignoring the error is not a good solution to decide to port forward. + if err == nil { if !p.isConnectedToCluster() { connectCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -527,7 +527,7 @@ func (p *Packager) pushReposToRepository(ctx context.Context, reposPath string, } } - tunnel, err = p.cluster.NewTunnel(svcInfo.Namespace, k8s.SvcResource, svcInfo.Name, "", 0, svcInfo.Port) + tunnel, err := p.cluster.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port) if err != nil { return err } @@ -704,3 +704,31 @@ func (p *Packager) printTablesForDeployment(ctx context.Context, componentsToDep } } } + +// ServiceInfoFromServiceURL takes a serviceURL and parses it to find the service info for connecting to the cluster. The string is expected to follow the following format: +// Example serviceURL: http://{SERVICE_NAME}.{NAMESPACE}.svc.cluster.local:{PORT}. +func serviceInfoFromServiceURL(serviceURL string) (string, string, int, error) { + parsedURL, err := url.Parse(serviceURL) + if err != nil { + return "", "", 0, err + } + + // Get the remote port from the serviceURL. + remotePort, err := strconv.Atoi(parsedURL.Port()) + if err != nil { + return "", "", 0, err + } + + // Match hostname against local cluster service format. + pattern, err := regexp.Compile(`^(?P[^\.]+)\.(?P[^\.]+)\.svc\.cluster\.local$`) + if err != nil { + return "", "", 0, err + } + get, err := helpers.MatchRegex(pattern, parsedURL.Hostname()) + + // If incomplete match, return an error. + if err != nil { + return "", "", 0, err + } + return get("namespace"), get("name"), remotePort, nil +} diff --git a/src/pkg/packager/deploy_test.go b/src/pkg/packager/deploy_test.go index d79f19f1a5..b4b7cb9df2 100644 --- a/src/pkg/packager/deploy_test.go +++ b/src/pkg/packager/deploy_test.go @@ -10,6 +10,7 @@ import ( "github.com/defenseunicorns/zarf/src/pkg/packager/sources" "github.com/defenseunicorns/zarf/src/pkg/variables" "github.com/defenseunicorns/zarf/src/types" + "github.com/stretchr/testify/require" ) func TestGenerateValuesOverrides(t *testing.T) { @@ -227,3 +228,49 @@ func TestGenerateValuesOverrides(t *testing.T) { }) } } + +func TestServiceInfoFromServiceURL(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + serviceURL string + expectedErr string + expectedNamespace string + expectedName string + expectedPort int + }{ + { + name: "no port", + serviceURL: "http://example.com", + expectedErr: `strconv.Atoi: parsing "": invalid syntax`, + }, + { + name: "normal domain", + serviceURL: "http://example.com:8080", + expectedErr: "unable to match against example.com", + }, + { + name: "valid url", + serviceURL: "http://foo.bar.svc.cluster.local:9090", + expectedNamespace: "bar", + expectedName: "foo", + expectedPort: 9090, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + namespace, name, port, err := serviceInfoFromServiceURL(tt.serviceURL) + if tt.expectedErr != "" { + require.EqualError(t, err, tt.expectedErr) + return + } + require.NoError(t, err) + require.Equal(t, tt.expectedNamespace, namespace) + require.Equal(t, tt.expectedName, name) + require.Equal(t, tt.expectedPort, port) + }) + } +}