Merge pull request #7757 from heyitsanthony/fix-speedy-close
etcdserver: initialize raftNode with constructor
Anthony Romano committed Apr 18, 2017
2 parents 30552e2 + 714b48a commit cb408ac
Showing 4 changed files with 161 additions and 150 deletions.
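The gist of the change: raftNode used to be built field by field with a struct literal at each call site; it now embeds a raftNodeConfig of caller-supplied settings, and newRaftNode derives the rest (ticker, contention detector, channels) in one place. Below is a minimal, self-contained sketch of that constructor pattern — the types and fields are simplified stand-ins for illustration, not the actual etcd code.

```go
package main

import (
	"fmt"
	"time"
)

// nodeConfig mirrors the idea of raftNodeConfig: only caller-supplied settings.
type nodeConfig struct {
	heartbeat time.Duration // 0 means "no ticker", as in the etcd tests
}

// node mirrors raftNode: it embeds the config and adds derived fields.
type node struct {
	nodeConfig
	ticker  *time.Ticker
	stopped chan struct{}
	done    chan struct{}
}

// newNode plays the role of newRaftNode: every derived field is initialized
// in one place, so callers can no longer forget one when building the struct.
func newNode(cfg nodeConfig) *node {
	n := &node{
		nodeConfig: cfg,
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
	if cfg.heartbeat == 0 {
		n.ticker = &time.Ticker{} // zero-value ticker: its C channel is nil, so it never ticks
	} else {
		n.ticker = time.NewTicker(cfg.heartbeat)
	}
	return n
}

func main() {
	n := newNode(nodeConfig{heartbeat: 100 * time.Millisecond})
	fmt.Println(n.heartbeat, n.ticker.C != nil)
}
```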
61 changes: 35 additions & 26 deletions etcdserver/raft.go
@@ -94,14 +94,7 @@ type raftNode struct {
term uint64
lead uint64

mu sync.Mutex
// last lead elected time
lt time.Time

// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool

raft.Node
raftNodeConfig

// a chan to send/receive snapshot
msgSnapC chan raftpb.Message
@@ -115,26 +108,49 @@ type raftNode struct {
// utility
ticker *time.Ticker
// contention detectors for raft heartbeat message
td *contention.TimeoutDetector
heartbeat time.Duration // for logging
td *contention.TimeoutDetector

stopped chan struct{}
done chan struct{}
}

type raftNodeConfig struct {
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
raft.Node
raftStorage *raft.MemoryStorage
storage Storage
heartbeat time.Duration // for logging
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
}

stopped chan struct{}
done chan struct{}
func newRaftNode(cfg raftNodeConfig) *raftNode {
r := &raftNode{
raftNodeConfig: cfg,
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
readStateC: make(chan raft.ReadState, 1),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyc: make(chan apply),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
if r.heartbeat == 0 {
r.ticker = &time.Ticker{}
} else {
r.ticker = time.NewTicker(r.heartbeat)
}
return r
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
r.applyc = make(chan apply)
r.stopped = make(chan struct{})
r.done = make(chan struct{})
internalTimeout := time.Second

go func() {
@@ -147,10 +163,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.Tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
if newLeader {
leaderChanges.Inc()
}

@@ -162,7 +176,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {

atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
rh.updateLeadership()
rh.updateLeadership(newLeader)
r.td.Reset()
}

if len(rd.ReadStates) != 0 {
@@ -316,12 +331,6 @@ func (r *raftNode) apply() chan apply {
return r.applyc
}

func (r *raftNode) leadElectedTime() time.Time {
r.mu.Lock()
defer r.mu.Unlock()
return r.lt
}

func (r *raftNode) stop() {
r.stopped <- struct{}{}
<-r.done
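A side note on the heartbeat == 0 branch in newRaftNode above: the zero-value time.Ticker has a nil C channel, and a receive from a nil channel blocks forever, so a node built without a heartbeat (as in the tests below) simply never sees tick events. A small standalone illustration of that Go behavior, not etcd code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := &time.Ticker{} // zero value: ticker.C is nil

	select {
	case <-ticker.C:
		// Never reached: a receive from a nil channel blocks forever,
		// so this case can never be selected.
		fmt.Println("tick")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("the zero-value ticker produced no ticks")
	}
}
```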
14 changes: 7 additions & 7 deletions etcdserver/raft_test.go
@@ -153,13 +153,13 @@ func TestCreateConfigChangeEnts(t *testing.T) {

func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
n := newNopReadyNode()
srv := &EtcdServer{r: raftNode{
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
}}
})
srv := &EtcdServer{r: *r}
srv.r.start(nil)
n.readyc <- raft.Ready{}
select {
@@ -182,16 +182,16 @@ func TestConfgChangeBlocksApply(t *testing.T) {

waitApplyc := make(chan struct{})

srv := &EtcdServer{r: raftNode{
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
ticker: &time.Ticker{},
}}
})
srv := &EtcdServer{r: *r}

rh := &raftReadyHandler{
updateLeadership: func() {},
updateLeadership: func(bool) {},
waitForApply: func() {
<-waitApplyc
},
43 changes: 23 additions & 20 deletions etcdserver/server.go
@@ -41,7 +41,6 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/pbutil"
@@ -243,6 +242,9 @@ type EtcdServer struct {
// on etcd server shutdown.
ctx context.Context
cancel context.CancelFunc

leadTimeMu sync.RWMutex
leadElectedTime time.Time
}

// NewServer creates a new EtcdServer from the supplied configuration. The
@@ -419,19 +421,15 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
store: st,
r: raftNode{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
ticker: time.NewTicker(heartbeat),
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * heartbeat),
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
readStateC: make(chan raft.ReadState, 1),
},
r: *newRaftNode(
raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
},
),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
@@ -614,7 +612,7 @@ type etcdProgress struct {
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
updateLeadership func()
updateLeadership func(newLeader bool)
updateCommittedIndex func(uint64)
waitForApply func()
}
@@ -644,7 +642,7 @@ func (s *EtcdServer) run() {
return
}
rh := &raftReadyHandler{
updateLeadership: func() {
updateLeadership: func(newLeader bool) {
if !s.isLeader() {
if s.lessor != nil {
s.lessor.Demote()
@@ -654,6 +652,12 @@ func (s *EtcdServer) run() {
}
setSyncC(nil)
} else {
if newLeader {
t := time.Now()
s.leadTimeMu.Lock()
s.leadElectedTime = t
s.leadTimeMu.Unlock()
}
setSyncC(s.SyncTicker.C)
if s.compactor != nil {
s.compactor.Resume()
@@ -665,9 +669,6 @@ func (s *EtcdServer) run() {
if s.stats != nil {
s.stats.BecomeLeader()
}
if s.r.td != nil {
s.r.td.Reset()
}
},
updateCommittedIndex: func(ci uint64) {
cci := s.getCommittedIndex()
@@ -1580,7 +1581,9 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
case context.Canceled:
return ErrCanceled
case context.DeadlineExceeded:
curLeadElected := s.r.leadElectedTime()
s.leadTimeMu.RLock()
curLeadElected := s.leadElectedTime
s.leadTimeMu.RUnlock()
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderFail
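The parseProposeCtxErr hunk above keeps the same window check as before, just reading leadElectedTime from the server under its new RWMutex: a proposal whose deadline expired is blamed on a leader failure if it started after the previous leader was presumed lost (two election timeouts before the current election) and before the current leader was elected. A rough sketch of that window arithmetic with made-up numbers (ElectionTicks=10, TickMs=100, both hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical configuration, not taken from the commit:
	// 10 ticks of 100ms give an election timeout of 1s.
	electionTicks := 10
	tickMs := 100

	curLeadElected := time.Now()
	// Same formula as parseProposeCtxErr: the previous leader is assumed lost
	// two election timeouts before the current leader was elected.
	prevLeadLost := curLeadElected.Add(-2 * time.Duration(electionTicks) * time.Duration(tickMs) * time.Millisecond)

	start := curLeadElected.Add(-500 * time.Millisecond) // proposal began during the leaderless window
	if start.After(prevLeadLost) && start.Before(curLeadElected) {
		fmt.Println("timeout attributed to leader failure")
	} else {
		fmt.Println("ordinary deadline exceeded")
	}
}
```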