Skip to content

Commit

Permalink
Merge pull request #16 from cockroachdb/erik/22.2-probe-state-heartbeat
Browse files Browse the repository at this point in the history
crdb-release-22.2: Move from StatePause->StateReplicate on heartbeat response when possible
  • Loading branch information
erikgrinaker committed Jul 13, 2023
2 parents 88bbb1d + db2ae17 commit f61509e
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 38 deletions.
10 changes: 9 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,15 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 h1:j6JEOq5QWFker+d7mFQYOhjTZonQ7YkLTHm56dbn+yM=
Expand Down Expand Up @@ -470,6 +475,9 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
20 changes: 18 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,15 @@ func stepLeader(r *raft, m pb.Message) error {
}
} else {
oldPaused := pr.IsPaused()
if pr.MaybeUpdate(m.Index) {
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
// for an example of the latter case).
// NB: the same does not make sense for StateSnapshot - if `m.Index`
// equals pr.Match we know we don't m.Index+1 in our log, so moving
// back to replicating state is not useful; besides pr.PendingSnapshot
// would prevent it.
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
Expand Down Expand Up @@ -1312,7 +1320,15 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
// If the follower is fully caught up but also in StateProbe (as can happen
// if ReportUnreachable was called), we also want to send an append (it will
// be empty) to allow the follower to transition back to StateReplicate once
// it responds.
//
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}

Expand Down
83 changes: 48 additions & 35 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/require"
pb "go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/raft/v3/tracker"
)
Expand Down Expand Up @@ -1225,10 +1226,10 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
}

// TestHandleMsgApp ensures:
// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
// 2. If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it; append any new entries not already in the log.
// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
// 2. If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it; append any new entries not already in the log.
// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
func TestHandleMsgApp(t *testing.T) {
tests := []struct {
m pb.Message
Expand Down Expand Up @@ -2427,43 +2428,55 @@ func TestLeaderAppResp(t *testing.T) {
{3, true, 0, 3, 0, 0, 0}, // stale resp; no replies
{2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg
{2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
// Follower is StateProbing at 0, it sends MsgAppResp for 0 (which
// matches the pr.Match) so it is moved to StateReplicate and as many
// entries as possible are sent to it (1, 2, and 3). Correspondingly the
// Next is then 4 (an Entry at 4 does not exist, indicating the follower
// will be up to date should it process the emitted MsgApp).
{0, false, 0, 4, 1, 0, 0},
}

for i, tt := range tests {
// sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed.
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
unstable: unstable{offset: 3},
}
sm.becomeCandidate()
sm.becomeLeader()
sm.readMessages()
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
for _, tt := range tests {
t.Run("", func(t *testing.T) {
// sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed.
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}},
unstable: unstable{offset: 3},
}
sm.becomeCandidate()
sm.becomeLeader()
sm.readMessages()
require.NoError(t, sm.Step(
pb.Message{
From: 2,
Type: pb.MsgAppResp,
Index: tt.index,
Term: sm.Term,
Reject: tt.reject,
RejectHint: tt.index,
},
))

p := sm.prs.Progress[2]
if p.Match != tt.wmatch {
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
}
if p.Next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
}
p := sm.prs.Progress[2]
require.EqualValues(t, tt.wmatch, p.Match)
require.EqualValues(t, tt.wnext, p.Next)

msgs := sm.readMessages()
msgs := sm.readMessages()

if len(msgs) != tt.wmsgNum {
t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
}
for j, msg := range msgs {
if msg.Index != tt.windex {
t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
if len(msgs) != tt.wmsgNum {
require.Equal(t, len(msgs), tt.wmsgNum)
}
if msg.Commit != tt.wcommitted {
t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
for _, msg := range msgs {
if msg.Index != tt.windex {
require.EqualValues(t, tt.windex, msg.Index, "%v", DescribeMessage(msg, nil))
}
if msg.Commit != tt.wcommitted {
require.EqualValues(t, tt.wcommitted, msg.Commit, "%v", DescribeMessage(msg, nil))
}
}
}
})
}
}

Expand Down Expand Up @@ -2806,7 +2819,7 @@ func TestRestoreWithLearner(t *testing.T) {
}
}

/// Tests if outgoing voter can receive and apply snapshot correctly.
// / Tests if outgoing voter can receive and apply snapshot correctly.
func TestRestoreWithVotersOutgoing(t *testing.T) {
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Expand Down
6 changes: 6 additions & 0 deletions raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// propose-conf-change v1=true
// v5
err = env.handleProposeConfChange(t, d)
case "report-unreachable":
// Calls <1st>.ReportUnreachable(<2nd>).
//
// Example:
// report-unreachable 1 2
err = env.handleReportUnreachable(t, d)
default:
err = fmt.Errorf("unknown command")
}
Expand Down
30 changes: 30 additions & 0 deletions raft/rafttest/interaction_env_handler_report_unreachable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest

import (
"errors"
"testing"

"github.com/cockroachdb/datadriven"
)

func (env *InteractionEnv) handleReportUnreachable(t *testing.T, d datadriven.TestData) error {
sl := nodeIdxs(t, d)
if len(sl) != 2 {
return errors.New("must specify exactly two node indexes: node on which to report, and reported node")
}
env.Nodes[sl[0]].ReportUnreachable(env.Nodes[sl[1]].Config.ID)
return nil
}
90 changes: 90 additions & 0 deletions raft/testdata/heartbeat_resp_recovers_from_probing.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# This test checks that if a fully caught-up follower transitions
# into StateProbe (for example due to a call to ReportUnreachable), the
# leader will react to a subsequent heartbeat response from the probing
# follower by sending an empty MsgApp, the response of which restores
# StateReplicate for the follower. In other words, we don't end up in
# a stable state with a fully caught up follower in StateProbe.

# Turn off output during the setup of the test.
log-level none
----
ok

add-nodes 3 voters=(1,2,3) index=10
----
ok

campaign 1
----
ok

stabilize
----
ok (quiet)

log-level debug
----
ok

status 1
----
1: StateReplicate match=11 next=12 inactive
2: StateReplicate match=11 next=12
3: StateReplicate match=11 next=12

# On the first replica, report the second one as not reachable.
report-unreachable 1 2
----
DEBUG 1 failed to send message to 2 because it is unreachable [StateProbe match=11 next=12]

status 1
----
1: StateReplicate match=11 next=12 inactive
2: StateProbe match=11 next=12
3: StateReplicate match=11 next=12

tick-heartbeat 1
----
ok

# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate.
stabilize
----
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgHeartbeatResp Term:1 Log:0/0
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 receiving messages
2->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/11 Commit:11
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/11 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/11
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/11

status 1
----
1: StateReplicate match=11 next=12 inactive
2: StateReplicate match=11 next=12
3: StateReplicate match=11 next=12

0 comments on commit f61509e

Please sign in to comment.