From 189b11efcbb9c8fa210dd983bbdfcb27acd5c880 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Thu, 19 Sep 2024 19:10:56 +0100 Subject: [PATCH] [kube] Drop usage of `ProcessKubeCSR` This PR drops the usage of `ProcessKubeCSR` to ask Auth to sign a certificate on behalf of a user when running `tsh request search --kind=pod,...`. Previously, we used `ProcessKubeCSR` to sign a new certificate but given that Kubernetes Access supports impersonation, this PR levarages it. This also allows reusing the same kubernetes client for multiple calls. --- lib/client/api.go | 3 -- lib/kube/grpc/grpc.go | 70 +++++++++++++++------------ lib/kube/grpc/grpc_test.go | 15 +++++- lib/kube/grpc/utils.go | 94 +++++++++++++------------------------ lib/service/service.go | 50 +++++++++++--------- lib/service/service_test.go | 13 ++--- 6 files changed, 121 insertions(+), 124 deletions(-) diff --git a/lib/client/api.go b/lib/client/api.go index 74fd055476196..e0779875c8b50 100644 --- a/lib/client/api.go +++ b/lib/client/api.go @@ -5076,9 +5076,6 @@ func parseMFAMode(in string) (wancli.AuthenticatorAttachment, error) { // NewKubernetesServiceClient connects to the proxy and returns an authenticated gRPC // client to the Kubernetes service. func (tc *TeleportClient) NewKubernetesServiceClient(ctx context.Context, clusterName string) (kubeproto.KubeServiceClient, error) { - if !tc.TLSRoutingEnabled { - return nil, trace.BadParameter("kube service is not supported if TLS routing is not enabled") - } // get tlsConfig to dial to proxy. tlsConfig, err := tc.LoadTLSConfig() if err != nil { diff --git a/lib/kube/grpc/grpc.go b/lib/kube/grpc/grpc.go index 554e68fdd7de3..9a68513660b3b 100644 --- a/lib/kube/grpc/grpc.go +++ b/lib/kube/grpc/grpc.go @@ -27,19 +27,18 @@ import ( "github.com/gravitational/trace/trail" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/defaults" proto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" - "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/authz" - "github.com/gravitational/teleport/lib/cryptosuites" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/services/local" - "github.com/gravitational/teleport/lib/tlsca" + "github.com/gravitational/teleport/lib/utils" ) // errDone indicates that resource iteration is complete @@ -51,6 +50,7 @@ type Server struct { cfg Config proxyAddress string kubeProxySNI string + kubeClient kubernetes.Interface } // New creates a new instance of Kube gRPC handler. @@ -58,18 +58,23 @@ func New(cfg Config) (*Server, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + sni, addr, err := getWebAddrAndKubeSNI(cfg.KubeProxyAddr) if err != nil { return nil, trace.Wrap(err) } - return &Server{cfg: cfg, proxyAddress: addr, kubeProxySNI: sni}, nil + s := &Server{cfg: cfg, proxyAddress: addr, kubeProxySNI: sni} + + if s.kubeClient, err = s.buildKubeClient(); err != nil { + return nil, trace.Wrap(err, "unable to create kubeClient") + } + + return s, nil } // Config specifies configuration for Kube gRPC server. type Config struct { - // Signer is a auth server client to sign Kubernetes Certificates. - Signer CertificateSigner // AccessPoint is caching access point to retrieve roles and the cluster // auth preference. AccessPoint AccessPoint @@ -85,26 +90,33 @@ type Config struct { KubeProxyAddr string // ClusterName is the name of the cluster that this server is running in. ClusterName string -} - -// CertificateSigner is an interface for signing Kubernetes certificates. -type CertificateSigner interface { - // ProcessKubeCSR processes CSR request against Kubernetes CA, returns - // signed certificate if successful. - ProcessKubeCSR(req authclient.KubeCSR) (*authclient.KubeCSRResponse, error) + // GetConnTLSCertificate returns the TLS kubeClient certificate to use when + // connecting to the upstream Teleport proxy or Kubernetes service when + // forwarding requests using the forward identity (i.e. proxy impersonating + // a user) method. Paired with GetConnTLSRoots and ConnTLSCipherSuites to + // generate the correct [*tls.Config] on demand. + GetConnTLSCertificate utils.GetCertificateFunc + // GetConnTLSRoots returns the [*x509.CertPool] used to validate TLS + // connections to the upstream Teleport proxy or Kubernetes service. + GetConnTLSRoots utils.GetRootsFunc + // ConnTLSCipherSuites optionally contains a list of TLS ciphersuites to use + // when connecting to the upstream Teleport Proxy or Kubernetes service. + ConnTLSCipherSuites []uint16 } // AccessPoint is caching access point to retrieve roles and the cluster // auth preference. type AccessPoint interface { services.RoleGetter - cryptosuites.AuthPreferenceGetter } // CheckAndSetDefaults checks and sets default values. func (c *Config) CheckAndSetDefaults() error { - if c.Signer == nil { - return trace.BadParameter("missing parameter Signer") + if c.GetConnTLSCertificate == nil { + return trace.BadParameter("missing parameter GetConnTLSCertificate") + } + if c.GetConnTLSRoots == nil { + return trace.BadParameter("missing parameter GetConnTLSRoots") } if c.AccessPoint == nil { return trace.BadParameter("missing parameter AccessPoint") @@ -166,12 +178,14 @@ func (s *Server) ListKubernetesResources(ctx context.Context, req *proto.ListKub identity.KubernetesCluster = req.KubernetesCluster identity.Groups = userContext.Checker.RoleNames() identity.RouteToCluster = req.TeleportCluster + ctx = authz.ContextWithUser(ctx, authz.WrapIdentity(identity)) // wrap the identity in the context + switch { case requiresFakePagination(req): - rsp, err := s.listResourcesUsingFakePagination(ctx, identity, req) + rsp, err := s.listResourcesUsingFakePagination(ctx, req) return rsp, trail.ToGRPC(err) case slices.Contains(types.KubernetesResourcesKinds, req.ResourceType): - rsp, err := s.listKubernetesResources(ctx, identity, true, req) + rsp, err := s.listKubernetesResources(ctx, true, req) return rsp, trail.ToGRPC(err) default: return nil, trail.ToGRPC(trace.BadParameter("unsupported resource type %q", req.ResourceType)) @@ -216,7 +230,6 @@ func (s *Server) emitAuditEvent(ctx context.Context, userContext *authz.Context, // those that match the search parameters. func (s *Server) listKubernetesResources( ctx context.Context, - identity tlsca.Identity, respectLimit bool, req *proto.ListKubernetesResourcesRequest, ) (*proto.ListKubernetesResourcesResponse, error) { @@ -244,7 +257,7 @@ func (s *Server) listKubernetesResources( rsp := &proto.ListKubernetesResourcesResponse{} err := s.iterateKubernetesResources( - ctx, identity, req, respectLimit, + ctx, req, respectLimit, func(r *types.KubernetesResourceV1, continueKey string) (int, error) { switch match, err := services.MatchResourceByFilters(r, filter, nil /* ignore dup matches */); { case err != nil: @@ -268,24 +281,19 @@ func (s *Server) listKubernetesResources( // For each resources discovered, the fn function is called to decide the action. // Kubernetes continue key is a base64 encoded json payload with the resource // version of the request. In order to resume the operation when using the paginated -// mode, Teleport respects the Kubernetes Continue Key and will return it to the client +// mode, Teleport respects the Kubernetes Continue Key and will return it to the kubeClient // as a NextKey. // In order to have the expected behavior Teleport must respect the ContinueKey and // cannot manipulate it. It means that Teleport needs to manipulate the number of // requested items from the Kubernetes Cluster in order to have the expected behavior. func (s *Server) iterateKubernetesResources( ctx context.Context, - identity tlsca.Identity, req *proto.ListKubernetesResourcesRequest, respectLimit bool, fn func(*types.KubernetesResourceV1, string) (int, error), ) error { - kubeClient, err := s.newKubernetesClient(ctx, s.cfg.ClusterName, identity) - if err != nil { - s.cfg.Log.WithError(err).Warnf("unable to create a Kubernetes client for user %q", identity.Username) - // Hide the root cause of the error from the client. - return trace.Errorf("unable to create a Kubernetes client for user %q", identity.Username) - } + kubeClient := s.kubeClient + continueKey := req.StartKey itemsAppended := 0 for { @@ -519,10 +527,10 @@ func itemListToKObjectList[T kObject](items []T) []kObject { } // listResourcesUsingFakePagination is a helper function that lists Kubernetes -// resources using fake pagination. It is used when the client requires +// resources using fake pagination. It is used when the kubeClient requires // the total count or sorting. func (s *Server) listResourcesUsingFakePagination( - ctx context.Context, identity tlsca.Identity, + ctx context.Context, req *proto.ListKubernetesResourcesRequest, ) (*proto.ListKubernetesResourcesResponse, error) { var ( @@ -531,7 +539,7 @@ func (s *Server) listResourcesUsingFakePagination( ) switch { case slices.Contains(types.KubernetesResourcesKinds, req.ResourceType): - rsp, err = s.listKubernetesResources(ctx, identity, false /* do not respect the limit value */, req) + rsp, err = s.listKubernetesResources(ctx, false /* do not respect the limit value */, req) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/kube/grpc/grpc_test.go b/lib/kube/grpc/grpc_test.go index b2c53c0520c67..6d6c888315d58 100644 --- a/lib/kube/grpc/grpc_test.go +++ b/lib/kube/grpc/grpc_test.go @@ -21,6 +21,7 @@ package kubev1 import ( "context" "crypto/tls" + "crypto/x509" "net" "testing" @@ -539,10 +540,20 @@ func initGRPCServer(t *testing.T, testCtx *kubeproxy.TestContext, listener net.L require.NoError(t, err) t.Cleanup(func() { require.NoError(t, proxyAuthClient.Close()) }) + proxyTLSConfig, err := serverIdentity.TLSConfig(nil) + require.NoError(t, err) + require.Len(t, proxyTLSConfig.Certificates, 1) + require.NotNil(t, proxyTLSConfig.RootCAs) + server, err := New( Config{ - ClusterName: testCtx.ClusterName, - Signer: proxyAuthClient, + ClusterName: testCtx.ClusterName, + GetConnTLSCertificate: func() (*tls.Certificate, error) { + return &proxyTLSConfig.Certificates[0], nil + }, + GetConnTLSRoots: func() (*x509.CertPool, error) { + return proxyTLSConfig.RootCAs, nil + }, AccessPoint: proxyAuthClient, Emitter: testCtx.Emitter, KubeProxyAddr: testCtx.KubeProxyAddress(), diff --git a/lib/kube/grpc/utils.go b/lib/kube/grpc/utils.go index 5209c3d728929..3c1c95e961b92 100644 --- a/lib/kube/grpc/utils.go +++ b/lib/kube/grpc/utils.go @@ -19,23 +19,19 @@ package kubev1 import ( - "bytes" - "context" - "crypto/rand" - "crypto/x509" - "encoding/pem" + "crypto/tls" "net" + "net/http" + "time" "github.com/gravitational/trace" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/gravitational/teleport" - "github.com/gravitational/teleport/api/utils/keys" - "github.com/gravitational/teleport/lib/auth/authclient" + "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/client" - "github.com/gravitational/teleport/lib/cryptosuites" - "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" ) @@ -59,66 +55,42 @@ func getWebAddrAndKubeSNI(proxyAddr string) (string, string, error) { if ip.IsUnspecified() { addr = string(teleport.PrincipalLocalhost) } - return sni, net.JoinHostPort(addr, port), nil + return sni, "https://" + net.JoinHostPort(addr, port), nil } -// requestCertificate requests a short-lived certificate for the user using the -// Kubernetes CA. -func (s *Server) requestCertificate(ctx context.Context, username string, cluster string, identity tlsca.Identity) (*rest.Config, error) { - s.cfg.Log.Debugf("Requesting K8s cert for %v.", username) - key, err := cryptosuites.GenerateKey(ctx, - cryptosuites.GetCurrentSuiteFromAuthPreference(s.cfg.AccessPoint), - cryptosuites.ProxyKubeClient) - if err != nil { - return nil, trace.Wrap(err) - } - keyPEM, err := keys.MarshalPrivateKey(key) - if err != nil { - return nil, trace.Wrap(err) - } +// buildKubeClient creates a new Kubernetes client that is used to communicate +// with the Kubernetes API server. +func (s *Server) buildKubeClient() (kubernetes.Interface, error) { + const idleConnsPerHost = 25 - subject, err := identity.Subject() - if err != nil { - return nil, trace.Wrap(err) - } - csr := &x509.CertificateRequest{ - Subject: subject, + tlsConfig := utils.TLSConfig(s.cfg.ConnTLSCipherSuites) + tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + tlsCert, err := s.cfg.GetConnTLSCertificate() + if err != nil { + return nil, trace.Wrap(err) + } + return tlsCert, nil } - csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csr, key) - if err != nil { - return nil, trace.Wrap(err) - } - csrPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csrBytes}) + tlsConfig.InsecureSkipVerify = true + tlsConfig.VerifyConnection = utils.VerifyConnectionWithRoots(s.cfg.GetConnTLSRoots) + tlsConfig.ServerName = s.kubeProxySNI - response, err := s.cfg.Signer.ProcessKubeCSR(authclient.KubeCSR{ - Username: username, - ClusterName: cluster, - CSR: csrPEM, + transport := utilnet.SetTransportDefaults(&http.Transport{ + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tlsConfig, + MaxIdleConnsPerHost: idleConnsPerHost, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, }) - if err != nil { - return nil, trace.Wrap(err) - } - return &rest.Config{ - Host: s.proxyAddress, - TLSClientConfig: rest.TLSClientConfig{ - CertData: response.Cert, - KeyData: keyPEM, - CAData: bytes.Join(response.CertAuthorities, []byte("\n")), - ServerName: s.kubeProxySNI, - }, - }, nil -} -// newKubernetesClient creates a new Kubernetes client with short-lived user -// certificates that include in the roles field the available search_as_role -// roles. -func (s *Server) newKubernetesClient(ctx context.Context, cluster string, identity tlsca.Identity) (kubernetes.Interface, error) { - cfg, err := s.requestCertificate(ctx, identity.Username, cluster, identity) - if err != nil { - return nil, trace.Wrap(err) + cfg := &rest.Config{ + Host: s.proxyAddress, + Transport: auth.NewImpersonatorRoundTripper(transport), } - client, err := kubernetes.NewForConfig(cfg) - return client, trace.Wrap(err) + kubeClient, err := kubernetes.NewForConfig(cfg) + return kubeClient, trace.Wrap(err) } // decideLimit returns the number of items we should request for. If respectLimit diff --git a/lib/service/service.go b/lib/service/service.go index 9a7899a7b4294..7acbcfe74751e 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -5158,14 +5158,20 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { if err != nil { return trace.Wrap(err) } + grpcServerMTLS, err = process.initSecureGRPCServer( initSecureGRPCServerCfg{ - limiter: proxyLimiter, - conn: conn, - listener: listeners.grpcMTLS, - accessPoint: accessPoint, - lockWatcher: lockWatcher, - emitter: asyncEmitter, + limiter: proxyLimiter, + conn: conn, + listener: listeners.grpcMTLS, + kubeProxyAddr: kubeDialAddr( + cfg.Proxy, + clusterNetworkConfig.GetProxyListenerMode(), + ), + accessPoint: accessPoint, + lockWatcher: lockWatcher, + emitter: asyncEmitter, + tlsCipherSuites: cfg.CipherSuites, }, ) if err != nil { @@ -6593,15 +6599,15 @@ func (process *TeleportProcess) initSecureGRPCServer(cfg initSecureGRPCServerCfg ) kubeServer, err := kubegrpc.New(kubegrpc.Config{ - Signer: cfg.conn.Client, - AccessPoint: cfg.accessPoint, - Authz: authorizer, - Log: process.log, - Emitter: cfg.emitter, - // listener is using the underlying web listener, so we can just use its address. - // since tls routing is enabled. - KubeProxyAddr: cfg.listener.Addr().String(), - ClusterName: clusterName, + AccessPoint: cfg.accessPoint, + Authz: authorizer, + Log: process.log, + Emitter: cfg.emitter, + KubeProxyAddr: cfg.kubeProxyAddr.String(), + ClusterName: clusterName, + GetConnTLSCertificate: cfg.conn.ClientGetCertificate, + GetConnTLSRoots: cfg.conn.ClientGetPool, + ConnTLSCipherSuites: cfg.tlsCipherSuites, }) if err != nil { return nil, trace.Wrap(err) @@ -6617,12 +6623,14 @@ func (process *TeleportProcess) initSecureGRPCServer(cfg initSecureGRPCServerCfg // initSecureGRPCServerCfg is a configuration for initSecureGRPCServer function. type initSecureGRPCServerCfg struct { - conn *Connector - limiter *limiter.Limiter - listener net.Listener - accessPoint authclient.ProxyAccessPoint - lockWatcher *services.LockWatcher - emitter apievents.Emitter + conn *Connector + limiter *limiter.Limiter + listener net.Listener + kubeProxyAddr utils.NetAddr + accessPoint authclient.ProxyAccessPoint + lockWatcher *services.LockWatcher + emitter apievents.Emitter + tlsCipherSuites []uint16 } // copyAndConfigureTLS can be used to copy and modify an existing *tls.Config diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 6e3ab2554977b..bd0155a6db774 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -1302,12 +1302,13 @@ func TestProxyGRPCServers(t *testing.T) { }) // Secure gRPC server. secureGRPC, err := process.initSecureGRPCServer(initSecureGRPCServerCfg{ - limiter: limiter, - conn: testConnector, - listener: secureListener, - accessPoint: testConnector.Client, - lockWatcher: proxyLockWatcher, - emitter: testConnector.Client, + limiter: limiter, + conn: testConnector, + listener: secureListener, + kubeProxyAddr: *utils.MustParseAddr(secureListener.Addr().String()), + accessPoint: testConnector.Client, + lockWatcher: proxyLockWatcher, + emitter: testConnector.Client, }) require.NoError(t, err) t.Cleanup(secureGRPC.GracefulStop)