Skip to content

Commit

Permalink
Merge pull request #7110 from mitake/reauth
Browse files Browse the repository at this point in the history
etcdserver, clientv3: handle a case of expired auth token
  • Loading branch information
mitake committed Jan 13, 2017
2 parents 432bda4 + d431b64 commit c89eae7
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 27 deletions.
62 changes: 49 additions & 13 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/url"
"strings"
"sync"
"time"

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
Expand All @@ -46,11 +47,12 @@ type Client struct {
Auth
Maintenance

conn *grpc.ClientConn
cfg Config
creds *credentials.TransportCredentials
balancer *simpleBalancer
retryWrapper retryRpcFunc
conn *grpc.ClientConn
cfg Config
creds *credentials.TransportCredentials
balancer *simpleBalancer
retryWrapper retryRpcFunc
retryAuthWrapper retryRpcFunc

ctx context.Context
cancel context.CancelFunc
Expand All @@ -59,6 +61,8 @@ type Client struct {
Username string
// Password is a password for authentication
Password string
// tokenCred is an instance of WithPerRPCCredentials()'s argument
tokenCred *authTokenCredential
}

// New creates a new etcdv3 client from a given configuration.
Expand Down Expand Up @@ -144,14 +148,17 @@ func (c *Client) autoSync() {
}

type authTokenCredential struct {
token string
token string
tokenMu *sync.RWMutex
}

func (cred authTokenCredential) RequireTransportSecurity() bool {
return false
}

func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
cred.tokenMu.RLock()
defer cred.tokenMu.RUnlock()
return map[string]string{
"token": cred.token,
}, nil
Expand Down Expand Up @@ -236,22 +243,50 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
return c.dial(endpoint)
}

func (c *Client) getToken(ctx context.Context) error {
var err error // return last error in a case of fail
var auth *authenticator

for i := 0; i < len(c.cfg.Endpoints); i++ {
endpoint := c.cfg.Endpoints[i]
host := getHost(endpoint)
// use dial options without dopts to avoid reusing the client balancer
auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint))
if err != nil {
continue
}
defer auth.close()

var resp *AuthenticateResponse
resp, err = auth.authenticate(ctx, c.Username, c.Password)
if err != nil {
continue
}

c.tokenCred.tokenMu.Lock()
c.tokenCred.token = resp.Token
c.tokenCred.tokenMu.Unlock()

return nil
}

return err
}

func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts := c.dialSetupOpts(endpoint, dopts...)
host := getHost(endpoint)
if c.Username != "" && c.Password != "" {
// use dial options without dopts to avoid reusing the client balancer
auth, err := newAuthenticator(host, c.dialSetupOpts(endpoint))
if err != nil {
return nil, err
c.tokenCred = &authTokenCredential{
tokenMu: &sync.RWMutex{},
}
defer auth.close()

resp, err := auth.authenticate(c.ctx, c.Username, c.Password)
err := c.getToken(context.TODO())
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithPerRPCCredentials(authTokenCredential{token: resp.Token}))

opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
}

// add metrics options
Expand Down Expand Up @@ -303,6 +338,7 @@ func newClient(cfg *Config) (*Client, error) {
}
client.conn = conn
client.retryWrapper = client.newRetryWrapper()
client.retryAuthWrapper = client.newAuthRetryWrapper()

// wait for a connection
if cfg.DialTimeout > 0 {
Expand Down
65 changes: 51 additions & 14 deletions clientv3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
return nil
}

// only retry if unavailable
if grpc.Code(err) != codes.Unavailable {
return err
}
// always stop retry on etcd errors
eErr := rpctypes.Error(err)
// always stop retry on etcd errors
if _, ok := eErr.(rpctypes.EtcdError); ok {
return err
}

// only retry if unavailable
if grpc.Code(err) != codes.Unavailable {
return err
}

select {
case <-c.balancer.ConnectNotify():
case <-rpcCtx.Done():
Expand All @@ -54,41 +55,76 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
}
}

type retryKVClient struct {
pb.KVClient
retryf retryRpcFunc
func (c *Client) newAuthRetryWrapper() retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
err := f(rpcCtx)
if err == nil {
return nil
}

// always stop retry on etcd errors other than invalid auth token
if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
gterr := c.getToken(rpcCtx)
if gterr != nil {
return err // return the original error for simplicity
}
continue
}

return err
}
}
}

// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{pb.NewKVClient(c.conn), c.retryWrapper}
retryWrite := &retryWriteKVClient{pb.NewKVClient(c.conn), c.retryWrapper}
return &retryKVClient{&retryWriteKVClient{retryWrite, c.retryAuthWrapper}}
}

type retryKVClient struct {
*retryWriteKVClient
}

func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.retryWriteKVClient.Range(rctx, in, opts...)
return err
})
return resp, err
}

type retryWriteKVClient struct {
pb.KVClient
retryf retryRpcFunc
}

func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Put(rctx, in, opts...)
return err
})
return resp, err
}

func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...)
return err
})
return resp, err
}

func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Txn(rctx, in, opts...)
return err
})
return resp, err
}

func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Compact(rctx, in, opts...)
return err
Expand All @@ -103,7 +139,8 @@ type retryLeaseClient struct {

// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
func RetryLeaseClient(c *Client) pb.LeaseClient {
return &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper}
retry := &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper}
return &retryLeaseClient{retry, c.retryAuthWrapper}
}

func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
Expand Down
3 changes: 3 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
ErrGRPCRoleNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role is not granted to the user")
ErrGRPCPermissionNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission is not granted to the role")
ErrGRPCAuthNotEnabled = grpc.Errorf(codes.FailedPrecondition, "etcdserver: authentication is not enabled")
ErrGRPCInvalidAuthToken = grpc.Errorf(codes.Unauthenticated, "etcdserver: invalid auth token")

ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
Expand Down Expand Up @@ -93,6 +94,7 @@ var (
grpc.ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
grpc.ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
grpc.ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
grpc.ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,

grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
Expand Down Expand Up @@ -135,6 +137,7 @@ var (
ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)

ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCPermissionNotGranted
case auth.ErrAuthNotEnabled:
return rpctypes.ErrGRPCAuthNotEnabled
case etcdserver.ErrInvalidAuthToken:
return rpctypes.ErrGRPCInvalidAuthToken
default:
return grpc.Errorf(codes.Unknown, err.Error())
}
Expand Down

0 comments on commit c89eae7

Please sign in to comment.