clientv3: revert the client side change in 14547
In order to fix etcd-io#12385,
PR etcd-io#14322 introduced a change
in which the client may retry a watch based on the error message
returned from the server.

This is not a good approach: it is too fragile, and it also changes
the protocol between client and server. Please see the discussion
in kubernetes/kubernetes#114403
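
For reference, a minimal sketch of the reverted pattern (the full removed
code appears in the diff below): the retry decision hinges on exact string
equality with server-side error messages, so any rewording of those messages
on the server silently changes client behavior.

// Sketch only; mirrors the removed shouldRetryWatch shown in the diff below.
// v3rpc here is the rpctypes package already imported by watch.go.
func shouldRetryWatch(cancelReasonError error) bool {
	return cancelReasonError.Error() == v3rpc.ErrGRPCInvalidAuthToken.Error() ||
		cancelReasonError.Error() == v3rpc.ErrGRPCAuthOldRevision.Error()
}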

Note: the issue etcd-io#12385 only
happens when auth is enabled and the application reuses the same
client to watch.

So we decided to roll back the change on 3.5, for two reasons:
1. K8s doesn't enable auth at all, so the issue has no impact on K8s.
2. It's very easy for a client application to work around the issue:
   the client just needs to create a new client each time before
   watching (see the sketch below).
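
A minimal sketch of that workaround (endpoints, credentials, and the key
are illustrative, not taken from this commit):

// Workaround sketch: build a fresh client, and therefore a fresh auth
// token, each time before watching, instead of reusing a long-lived
// client whose token may have expired.
import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func watchWithFreshClient(ctx context.Context) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379"},
		Username:  "root",
		Password:  "123",
	})
	if err != nil {
		return err
	}
	defer cli.Close()

	for resp := range cli.Watch(ctx, "key", clientv3.WithRev(1)) {
		if err := resp.Err(); err != nil {
			return err
		}
		// handle resp.Events ...
	}
	return nil
}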

Signed-off-by: Benjamin Wang <wachao@vmware.com>
ahrtr authored and tjungblu committed Jul 26, 2023
1 parent 532e5e9 commit 89a9ad2
Showing 2 changed files with 0 additions and 59 deletions.
client/v3/watch.go: 0 additions & 26 deletions
@@ -18,7 +18,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strings"
 	"sync"
 	"time"

@@ -589,26 +588,6 @@ func (w *watchGrpcStream) run() {

 			switch {
 			case pbresp.Created:
-				cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason))
-				if shouldRetryWatch(cancelReasonError) {
-					var newErr error
-					if wc, newErr = w.newWatchClient(); newErr != nil {
-						w.lg.Error("failed to create a new watch client", zap.Error(newErr))
-						return
-					}
-
-					if len(w.resuming) != 0 {
-						if ws := w.resuming[0]; ws != nil {
-							if err := wc.Send(ws.initReq.toPB()); err != nil {
-								w.lg.Debug("error when sending request", zap.Error(err))
-							}
-						}
-					}
-
-					cur = nil
-					continue
-				}
-
 				// response to head of queue creation
 				if len(w.resuming) != 0 {
 					if ws := w.resuming[0]; ws != nil {
@@ -718,11 +697,6 @@
 	}
 }

-func shouldRetryWatch(cancelReasonError error) bool {
-	return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) ||
-		(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0)
-}
-
 // nextResume chooses the next resuming to register with the grpc stream. Abandoned
 // streams are marked as nil in the queue since the head must wait for its inflight registration.
 func (w *watchGrpcStream) nextResume() *watcherStream {
tests/integration/v3_auth_test.go: 0 additions & 33 deletions
@@ -499,39 +499,6 @@ func TestV3AuthRestartMember(t *testing.T) {
 	testutil.AssertNil(t, err)
 }

-func TestV3AuthWatchAndTokenExpire(t *testing.T) {
-	BeforeTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{Size: 1, AuthTokenTTL: 3})
-	defer clus.Terminate(t)
-
-	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
-	defer cancel()
-
-	authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
-
-	c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
-	if cerr != nil {
-		t.Fatal(cerr)
-	}
-	defer c.Close()
-
-	_, err := c.Put(ctx, "key", "val")
-	if err != nil {
-		t.Fatalf("Unexpected error from Put: %v", err)
-	}
-
-	// The first watch gets a valid auth token through watcher.newWatcherGrpcStream()
-	// We should discard the first one by waiting TTL after the first watch.
-	wChan := c.Watch(ctx, "key", clientv3.WithRev(1))
-	watchResponse := <-wChan
-
-	time.Sleep(5 * time.Second)
-
-	wChan = c.Watch(ctx, "key", clientv3.WithRev(1))
-	watchResponse = <-wChan
-	testutil.AssertNil(t, watchResponse.Err())
-}
-
 func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
 	BeforeTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
