diff --git a/.words b/.words index 31fffef38f1..f41d120b058 100644 --- a/.words +++ b/.words @@ -41,4 +41,4 @@ too_many_pings uncontended unprefixed unlisting - +WithDialer diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go index 2f33594aebb..864b5df6426 100644 --- a/clientv3/balancer/resolver/endpoint/endpoint.go +++ b/clientv3/balancer/resolver/endpoint/endpoint.go @@ -16,7 +16,9 @@ package endpoint import ( + "context" "fmt" + "net" "net/url" "strings" "sync" @@ -227,3 +229,19 @@ func ParseTarget(target string) (string, string, error) { } return parts[0], parts[1], nil } + +// Dialer dials a endpoint using net.Dialer. +// Context cancelation and timeout are supported. +func Dialer(ctx context.Context, dialEp string) (net.Conn, error) { + proto, host, _ := ParseEndpoint(dialEp) + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + dialer := &net.Dialer{} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } + return dialer.DialContext(ctx, proto, host) +} diff --git a/clientv3/client.go b/clientv3/client.go index 548fb8372db..4c9df7a19a3 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -226,24 +226,17 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts } opts = append(opts, dopts...) - // Provide a net dialer that supports cancelation and timeout. - f := func(dialEp string, t time.Duration) (net.Conn, error) { - proto, host, _ := endpoint.ParseEndpoint(dialEp) - select { - case <-c.ctx.Done(): - return nil, c.ctx.Err() - default: - } - dialer := &net.Dialer{Timeout: t} - return dialer.DialContext(c.ctx, proto, host) - } - opts = append(opts, grpc.WithDialer(f)) - + dialer := endpoint.Dialer if creds != nil { opts = append(opts, grpc.WithTransportCredentials(creds)) + // gRPC load balancer workaround. See credentials.transportCredential for details. + if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok { + dialer = credsDialer.Dialer + } } else { opts = append(opts, grpc.WithInsecure()) } + opts = append(opts, grpc.WithContextDialer(dialer)) // Interceptor retry and backoff. // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with @@ -667,3 +660,9 @@ func IsConnCanceled(err error) bool { // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")' return strings.Contains(err.Error(), "grpc: the client connection is closing") } + +// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details. +type TransportCredentialsWithDialer interface { + grpccredentials.TransportCredentials + Dialer(ctx context.Context, dialEp string) (net.Conn, error) +} diff --git a/clientv3/credentials/credentials.go b/clientv3/credentials/credentials.go index e5a55667fe6..2dc2012924e 100644 --- a/clientv3/credentials/credentials.go +++ b/clientv3/credentials/credentials.go @@ -22,6 +22,7 @@ import ( "net" "sync" + "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" grpccredentials "google.golang.org/grpc/credentials" ) @@ -65,38 +66,37 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) { } // transportCredential implements "grpccredentials.TransportCredentials" interface. +// transportCredential wraps TransportCredentials to track which +// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the +// hostname or IP of the dialed endpoint. +// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when +// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name +// in their TLS certs. +// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer) +// when dialing. type transportCredential struct { gtc grpccredentials.TransportCredentials + mu sync.Mutex + // addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the + // endpoint provided to the dialer when dialing + addrToEndpoint map[string]string } func newTransportCredential(cfg *tls.Config) *transportCredential { return &transportCredential{ - gtc: grpccredentials.NewTLS(cfg), + gtc: grpccredentials.NewTLS(cfg), + addrToEndpoint: map[string]string{}, } } func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) { - // Only overwrite when authority is an IP address! - // Let's say, a server runs SRV records on "etcd.local" that resolves - // to "m1.etcd.local", and its SAN field also includes "m1.etcd.local". - // But what if SAN does not include its resolved IP address (e.g. 127.0.0.1)? - // Then, the server should only authenticate using its DNS hostname "m1.etcd.local", - // instead of overwriting it with its IP address. - // And we do not overwrite "localhost" either. Only overwrite IP addresses! - if isIP(authority) { - target := rawConn.RemoteAddr().String() - if authority != target { - // When user dials with "grpc.WithDialer", "grpc.DialContext" "cc.parsedTarget" - // update only happens once. This is problematic, because when TLS is enabled, - // retries happen through "grpc.WithDialer" with static "cc.parsedTarget" from - // the initial dial call. - // If the server authenticates by IP addresses, we want to set a new endpoint as - // a new authority. Otherwise - // "transport: authentication handshake failed: x509: certificate is valid for 127.0.0.1, 192.168.121.180, not 192.168.223.156" - // when the new dial target is "192.168.121.180" whose certificate host name is also "192.168.121.180" - // but client tries to authenticate with previously set "cc.parsedTarget" field "192.168.223.156" - authority = target - } + // Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint + tc.mu.Lock() + dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()] + tc.mu.Unlock() + if ok { + _, host, _ := endpoint.ParseEndpoint(dialEp) + authority = host } return tc.gtc.ClientHandshake(ctx, authority, rawConn) } @@ -115,8 +115,15 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo { } func (tc *transportCredential) Clone() grpccredentials.TransportCredentials { + copy := map[string]string{} + tc.mu.Lock() + for k, v := range tc.addrToEndpoint { + copy[k] = v + } + tc.mu.Unlock() return &transportCredential{ - gtc: tc.gtc.Clone(), + gtc: tc.gtc.Clone(), + addrToEndpoint: copy, } } @@ -124,6 +131,17 @@ func (tc *transportCredential) OverrideServerName(serverNameOverride string) err return tc.gtc.OverrideServerName(serverNameOverride) } +func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) { + // Keep track of which addresses are dialed for which endpoints + conn, err := endpoint.Dialer(ctx, dialEp) + if conn != nil { + tc.mu.Lock() + tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp + tc.mu.Unlock() + } + return conn, err +} + // perRPCCredential implements "grpccredentials.PerRPCCredentials" interface. type perRPCCredential struct { authToken string