Skip to content

Commit

Permalink
[kube] Drop usage of ProcessKubeCSR
Browse files Browse the repository at this point in the history
This PR drops the usage of `ProcessKubeCSR` to ask Auth to sign a certificate on behalf of a user when running `tsh request search --kind=pod,...`.
Previously, we used `ProcessKubeCSR` to sign a new certificate but given that Kubernetes Access supports impersonation, this PR levarages it.

This also allows reusing the same kubernetes client for multiple calls.
  • Loading branch information
tigrato committed Sep 19, 2024
1 parent 2e091ff commit 189b11e
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 124 deletions.
3 changes: 0 additions & 3 deletions lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5076,9 +5076,6 @@ func parseMFAMode(in string) (wancli.AuthenticatorAttachment, error) {
// NewKubernetesServiceClient connects to the proxy and returns an authenticated gRPC
// client to the Kubernetes service.
func (tc *TeleportClient) NewKubernetesServiceClient(ctx context.Context, clusterName string) (kubeproto.KubeServiceClient, error) {
if !tc.TLSRoutingEnabled {
return nil, trace.BadParameter("kube service is not supported if TLS routing is not enabled")
}
// get tlsConfig to dial to proxy.
tlsConfig, err := tc.LoadTLSConfig()
if err != nil {
Expand Down
70 changes: 39 additions & 31 deletions lib/kube/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@ import (
"github.com/gravitational/trace/trail"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/defaults"
proto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/cryptosuites"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
)

// errDone indicates that resource iteration is complete
Expand All @@ -51,25 +50,31 @@ type Server struct {
cfg Config
proxyAddress string
kubeProxySNI string
kubeClient kubernetes.Interface
}

// New creates a new instance of Kube gRPC handler.
func New(cfg Config) (*Server, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

sni, addr, err := getWebAddrAndKubeSNI(cfg.KubeProxyAddr)
if err != nil {
return nil, trace.Wrap(err)
}

return &Server{cfg: cfg, proxyAddress: addr, kubeProxySNI: sni}, nil
s := &Server{cfg: cfg, proxyAddress: addr, kubeProxySNI: sni}

if s.kubeClient, err = s.buildKubeClient(); err != nil {
return nil, trace.Wrap(err, "unable to create kubeClient")
}

return s, nil
}

// Config specifies configuration for Kube gRPC server.
type Config struct {
// Signer is a auth server client to sign Kubernetes Certificates.
Signer CertificateSigner
// AccessPoint is caching access point to retrieve roles and the cluster
// auth preference.
AccessPoint AccessPoint
Expand All @@ -85,26 +90,33 @@ type Config struct {
KubeProxyAddr string
// ClusterName is the name of the cluster that this server is running in.
ClusterName string
}

// CertificateSigner is an interface for signing Kubernetes certificates.
type CertificateSigner interface {
// ProcessKubeCSR processes CSR request against Kubernetes CA, returns
// signed certificate if successful.
ProcessKubeCSR(req authclient.KubeCSR) (*authclient.KubeCSRResponse, error)
// GetConnTLSCertificate returns the TLS kubeClient certificate to use when
// connecting to the upstream Teleport proxy or Kubernetes service when
// forwarding requests using the forward identity (i.e. proxy impersonating
// a user) method. Paired with GetConnTLSRoots and ConnTLSCipherSuites to
// generate the correct [*tls.Config] on demand.
GetConnTLSCertificate utils.GetCertificateFunc
// GetConnTLSRoots returns the [*x509.CertPool] used to validate TLS
// connections to the upstream Teleport proxy or Kubernetes service.
GetConnTLSRoots utils.GetRootsFunc
// ConnTLSCipherSuites optionally contains a list of TLS ciphersuites to use
// when connecting to the upstream Teleport Proxy or Kubernetes service.
ConnTLSCipherSuites []uint16
}

// AccessPoint is caching access point to retrieve roles and the cluster
// auth preference.
type AccessPoint interface {
services.RoleGetter
cryptosuites.AuthPreferenceGetter
}

// CheckAndSetDefaults checks and sets default values.
func (c *Config) CheckAndSetDefaults() error {
if c.Signer == nil {
return trace.BadParameter("missing parameter Signer")
if c.GetConnTLSCertificate == nil {
return trace.BadParameter("missing parameter GetConnTLSCertificate")
}
if c.GetConnTLSRoots == nil {
return trace.BadParameter("missing parameter GetConnTLSRoots")
}
if c.AccessPoint == nil {
return trace.BadParameter("missing parameter AccessPoint")
Expand Down Expand Up @@ -166,12 +178,14 @@ func (s *Server) ListKubernetesResources(ctx context.Context, req *proto.ListKub
identity.KubernetesCluster = req.KubernetesCluster
identity.Groups = userContext.Checker.RoleNames()
identity.RouteToCluster = req.TeleportCluster
ctx = authz.ContextWithUser(ctx, authz.WrapIdentity(identity)) // wrap the identity in the context

switch {
case requiresFakePagination(req):
rsp, err := s.listResourcesUsingFakePagination(ctx, identity, req)
rsp, err := s.listResourcesUsingFakePagination(ctx, req)
return rsp, trail.ToGRPC(err)
case slices.Contains(types.KubernetesResourcesKinds, req.ResourceType):
rsp, err := s.listKubernetesResources(ctx, identity, true, req)
rsp, err := s.listKubernetesResources(ctx, true, req)
return rsp, trail.ToGRPC(err)
default:
return nil, trail.ToGRPC(trace.BadParameter("unsupported resource type %q", req.ResourceType))
Expand Down Expand Up @@ -216,7 +230,6 @@ func (s *Server) emitAuditEvent(ctx context.Context, userContext *authz.Context,
// those that match the search parameters.
func (s *Server) listKubernetesResources(
ctx context.Context,
identity tlsca.Identity,
respectLimit bool,
req *proto.ListKubernetesResourcesRequest,
) (*proto.ListKubernetesResourcesResponse, error) {
Expand Down Expand Up @@ -244,7 +257,7 @@ func (s *Server) listKubernetesResources(

rsp := &proto.ListKubernetesResourcesResponse{}
err := s.iterateKubernetesResources(
ctx, identity, req, respectLimit,
ctx, req, respectLimit,
func(r *types.KubernetesResourceV1, continueKey string) (int, error) {
switch match, err := services.MatchResourceByFilters(r, filter, nil /* ignore dup matches */); {
case err != nil:
Expand All @@ -268,24 +281,19 @@ func (s *Server) listKubernetesResources(
// For each resources discovered, the fn function is called to decide the action.
// Kubernetes continue key is a base64 encoded json payload with the resource
// version of the request. In order to resume the operation when using the paginated
// mode, Teleport respects the Kubernetes Continue Key and will return it to the client
// mode, Teleport respects the Kubernetes Continue Key and will return it to the kubeClient
// as a NextKey.
// In order to have the expected behavior Teleport must respect the ContinueKey and
// cannot manipulate it. It means that Teleport needs to manipulate the number of
// requested items from the Kubernetes Cluster in order to have the expected behavior.
func (s *Server) iterateKubernetesResources(
ctx context.Context,
identity tlsca.Identity,
req *proto.ListKubernetesResourcesRequest,
respectLimit bool,
fn func(*types.KubernetesResourceV1, string) (int, error),
) error {
kubeClient, err := s.newKubernetesClient(ctx, s.cfg.ClusterName, identity)
if err != nil {
s.cfg.Log.WithError(err).Warnf("unable to create a Kubernetes client for user %q", identity.Username)
// Hide the root cause of the error from the client.
return trace.Errorf("unable to create a Kubernetes client for user %q", identity.Username)
}
kubeClient := s.kubeClient

continueKey := req.StartKey
itemsAppended := 0
for {
Expand Down Expand Up @@ -519,10 +527,10 @@ func itemListToKObjectList[T kObject](items []T) []kObject {
}

// listResourcesUsingFakePagination is a helper function that lists Kubernetes
// resources using fake pagination. It is used when the client requires
// resources using fake pagination. It is used when the kubeClient requires
// the total count or sorting.
func (s *Server) listResourcesUsingFakePagination(
ctx context.Context, identity tlsca.Identity,
ctx context.Context,
req *proto.ListKubernetesResourcesRequest,
) (*proto.ListKubernetesResourcesResponse, error) {
var (
Expand All @@ -531,7 +539,7 @@ func (s *Server) listResourcesUsingFakePagination(
)
switch {
case slices.Contains(types.KubernetesResourcesKinds, req.ResourceType):
rsp, err = s.listKubernetesResources(ctx, identity, false /* do not respect the limit value */, req)
rsp, err = s.listKubernetesResources(ctx, false /* do not respect the limit value */, req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
15 changes: 13 additions & 2 deletions lib/kube/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package kubev1
import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"testing"

Expand Down Expand Up @@ -539,10 +540,20 @@ func initGRPCServer(t *testing.T, testCtx *kubeproxy.TestContext, listener net.L
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, proxyAuthClient.Close()) })

proxyTLSConfig, err := serverIdentity.TLSConfig(nil)
require.NoError(t, err)
require.Len(t, proxyTLSConfig.Certificates, 1)
require.NotNil(t, proxyTLSConfig.RootCAs)

server, err := New(
Config{
ClusterName: testCtx.ClusterName,
Signer: proxyAuthClient,
ClusterName: testCtx.ClusterName,
GetConnTLSCertificate: func() (*tls.Certificate, error) {
return &proxyTLSConfig.Certificates[0], nil
},
GetConnTLSRoots: func() (*x509.CertPool, error) {
return proxyTLSConfig.RootCAs, nil
},
AccessPoint: proxyAuthClient,
Emitter: testCtx.Emitter,
KubeProxyAddr: testCtx.KubeProxyAddress(),
Expand Down
94 changes: 33 additions & 61 deletions lib/kube/grpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@
package kubev1

import (
"bytes"
"context"
"crypto/rand"
"crypto/x509"
"encoding/pem"
"crypto/tls"
"net"
"net/http"
"time"

"github.com/gravitational/trace"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/utils/keys"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/cryptosuites"
"github.com/gravitational/teleport/lib/tlsca"
"github.com/gravitational/teleport/lib/utils"
)

Expand All @@ -59,66 +55,42 @@ func getWebAddrAndKubeSNI(proxyAddr string) (string, string, error) {
if ip.IsUnspecified() {
addr = string(teleport.PrincipalLocalhost)
}
return sni, net.JoinHostPort(addr, port), nil
return sni, "https://" + net.JoinHostPort(addr, port), nil
}

// requestCertificate requests a short-lived certificate for the user using the
// Kubernetes CA.
func (s *Server) requestCertificate(ctx context.Context, username string, cluster string, identity tlsca.Identity) (*rest.Config, error) {
s.cfg.Log.Debugf("Requesting K8s cert for %v.", username)
key, err := cryptosuites.GenerateKey(ctx,
cryptosuites.GetCurrentSuiteFromAuthPreference(s.cfg.AccessPoint),
cryptosuites.ProxyKubeClient)
if err != nil {
return nil, trace.Wrap(err)
}
keyPEM, err := keys.MarshalPrivateKey(key)
if err != nil {
return nil, trace.Wrap(err)
}
// buildKubeClient creates a new Kubernetes client that is used to communicate
// with the Kubernetes API server.
func (s *Server) buildKubeClient() (kubernetes.Interface, error) {
const idleConnsPerHost = 25

subject, err := identity.Subject()
if err != nil {
return nil, trace.Wrap(err)
}
csr := &x509.CertificateRequest{
Subject: subject,
tlsConfig := utils.TLSConfig(s.cfg.ConnTLSCipherSuites)
tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
tlsCert, err := s.cfg.GetConnTLSCertificate()
if err != nil {
return nil, trace.Wrap(err)
}
return tlsCert, nil
}
csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csr, key)
if err != nil {
return nil, trace.Wrap(err)
}
csrPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csrBytes})
tlsConfig.InsecureSkipVerify = true
tlsConfig.VerifyConnection = utils.VerifyConnectionWithRoots(s.cfg.GetConnTLSRoots)
tlsConfig.ServerName = s.kubeProxySNI

response, err := s.cfg.Signer.ProcessKubeCSR(authclient.KubeCSR{
Username: username,
ClusterName: cluster,
CSR: csrPEM,
transport := utilnet.SetTransportDefaults(&http.Transport{
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
MaxIdleConnsPerHost: idleConnsPerHost,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
})
if err != nil {
return nil, trace.Wrap(err)
}
return &rest.Config{
Host: s.proxyAddress,
TLSClientConfig: rest.TLSClientConfig{
CertData: response.Cert,
KeyData: keyPEM,
CAData: bytes.Join(response.CertAuthorities, []byte("\n")),
ServerName: s.kubeProxySNI,
},
}, nil
}

// newKubernetesClient creates a new Kubernetes client with short-lived user
// certificates that include in the roles field the available search_as_role
// roles.
func (s *Server) newKubernetesClient(ctx context.Context, cluster string, identity tlsca.Identity) (kubernetes.Interface, error) {
cfg, err := s.requestCertificate(ctx, identity.Username, cluster, identity)
if err != nil {
return nil, trace.Wrap(err)
cfg := &rest.Config{
Host: s.proxyAddress,
Transport: auth.NewImpersonatorRoundTripper(transport),
}
client, err := kubernetes.NewForConfig(cfg)
return client, trace.Wrap(err)
kubeClient, err := kubernetes.NewForConfig(cfg)
return kubeClient, trace.Wrap(err)
}

// decideLimit returns the number of items we should request for. If respectLimit
Expand Down
Loading

0 comments on commit 189b11e

Please sign in to comment.