Skip to content

Commit

Permalink
*: LeaseTimeToLive returns error if leader changed
Browse files Browse the repository at this point in the history
The old leader demotes lessor and all the leases' expire time will be
updated. Instead of returning incorrect remaining TTL, we should return
errors to force client retry.

Cherry-pick: d3bb6f6

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Apr 4, 2024
1 parent b78b214 commit 94a1d0c
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ GOFAIL_VERSION=$(cd tools/mod && go list -m -f '{{.Version}}' go.etcd.io/gofail)
toggle_failpoints() {
mode="$1"
if command -v gofail >/dev/null 2>&1; then
run gofail "$mode" server/etcdserver/ server/mvcc/ server/wal/ server/mvcc/backend/
run gofail "$mode" server/etcdserver/ server/lease/leasehttp server/mvcc/ server/wal/ server/mvcc/backend/
if [[ "$mode" == "enable" ]]; then
go get go.etcd.io/gofail@"${GOFAIL_VERSION}"
cd ./server && go get go.etcd.io/gofail@"${GOFAIL_VERSION}"
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
Expand Down
12 changes: 12 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}

// gofail: var beforeLookupWhenLeaseTimeToLive struct{}

// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
Expand All @@ -392,6 +395,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
}
resp.Keys = kbs
}

// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if le.Demoted() {
// NOTE: lease.ErrNotPrimary is not retryable error for
// client. Instead, uses ErrLeaderChanged.
return nil, ErrLeaderChanged
}
return resp, nil
}

Expand Down
11 changes: 11 additions & 0 deletions server/lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}

// gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{}

l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand All @@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.LeaseTimeToLiveResponse.Keys = kbs
}

// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if l.Demoted() {
http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError)
return
}

v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
7 changes: 7 additions & 0 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,13 @@ func (l *Lease) forever() {
l.expiry = forever
}

// Demoted returns true if the lease's expiry has been reset to forever.
func (l *Lease) Demoted() bool {
l.expiryMu.Lock()
defer l.expiryMu.Unlock()
return l.expiry == forever
}

// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()
Expand Down
1 change: 1 addition & 0 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
go.etcd.io/etcd/pkg/v3 v3.5.13
go.etcd.io/etcd/raft/v3 v3.5.13
go.etcd.io/etcd/server/v3 v3.5.13
go.etcd.io/gofail v0.1.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
go.opentelemetry.io/otel v1.20.0
go.opentelemetry.io/otel/sdk v1.20.0
Expand Down
2 changes: 2 additions & 0 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
Expand Down
76 changes: 76 additions & 0 deletions tests/integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
gofail "go.etcd.io/gofail/runtime"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -1056,6 +1060,78 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
}
}

func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) {
t.Run("normal", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive")
})

t.Run("forward", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive")
})
}

func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) {
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}

BeforeTest(t)

clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

oldLeadIdx := clus.WaitLeader(t)
followerIdx := (oldLeadIdx + 1) % 3

followerMemberID := clus.Members[followerIdx].ID()

oldLeadC := clus.Client(oldLeadIdx)

leaseResp, err := oldLeadC.Grant(ctx, 100)
require.NoError(t, err)

require.NoError(t, gofail.Enable(fpName, `sleep("3s")`))
t.Cleanup(func() {
terr := gofail.Disable(fpName)
if terr != nil && terr != gofail.ErrDisabled {
t.Fatalf("failed to disable %s: %v", fpName, terr)
}
})

readyCh := make(chan struct{})
errCh := make(chan error, 1)

var targetC *clientv3.Client
switch fpName {
case "beforeLookupWhenLeaseTimeToLive":
targetC = oldLeadC
case "beforeLookupWhenForwardLeaseTimeToLive":
targetC = clus.Client((oldLeadIdx + 2) % 3)
default:
t.Fatalf("unsupported %s failpoint", fpName)
}

go func() {
<-readyCh
time.Sleep(1 * time.Second)

_, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID))
assert.NoError(t, gofail.Disable(fpName))
errCh <- merr
}()

close(readyCh)

ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID)
require.NoError(t, err)
require.GreaterOrEqual(t, int64(100), ttlResp.TTL)

require.NoError(t, <-errCh)
}

// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
// create lease
Expand Down

0 comments on commit 94a1d0c

Please sign in to comment.