Skip to content

Commit

Permalink
Merge "[FAB-2779] Clear data in discovery"
Browse files Browse the repository at this point in the history
  • Loading branch information
binhn authored and Gerrit Code Review committed Apr 12, 2017
2 parents 6bfb758 + 5dbb05a commit 224ed0a
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 7 deletions.
45 changes: 38 additions & 7 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
)

const defaultHelloInterval = time.Duration(5) * time.Second
const msgExpirationFactor = 20

var aliveExpirationCheckInterval time.Duration
var maxConnectionAttempts = 120
Expand Down Expand Up @@ -105,7 +106,6 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
id2Member: make(map[string]*NetworkMember),
aliveMembership: util.NewMembershipStore(),
deadMembership: util.NewMembershipStore(),
msgStore: msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
Expand All @@ -115,6 +115,26 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
disclosurePolicy: disPol,
}

policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}

d.msgStore = msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback)

go d.periodicalSendAlive()
go d.periodicalCheckAlive()
go d.handleMessages()
Expand Down Expand Up @@ -308,7 +328,9 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

d.handleAliveMessage(selfInfoGossipMsg)
if d.msgStore.CheckValid(m) {
d.handleAliveMessage(selfInfoGossipMsg)
}

var internalEndpoint string
if m.Envelope.SecretEnvelope != nil {
Expand All @@ -323,13 +345,13 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
}

if m.IsAliveMsg() {
added := d.msgStore.Add(m)
if !added {

if !d.msgStore.Add(m) {
return
}
d.comm.Gossip(m)

d.handleAliveMessage(m)

d.comm.Gossip(m)
return
}

Expand All @@ -345,7 +367,10 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

d.handleAliveMessage(am)
if d.msgStore.CheckValid(m) {
d.handleAliveMessage(am)
}

}

for _, env := range memResp.Dead {
Expand All @@ -359,6 +384,11 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
continue
}

if !d.msgStore.CheckValid(m) {
//Newer alive message exist
return
}

newDeadMembers := []*proto.SignedGossipMessage{}
d.lock.RLock()
if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
Expand Down Expand Up @@ -902,6 +932,7 @@ func (d *gossipDiscoveryImpl) Stop() {
defer d.logger.Info("Stopped")
d.logger.Info("Stopping")
atomic.StoreInt32(&d.toDieFlag, int32(1))
d.msgStore.Stop()
d.toDieChan <- struct{}{}
}

Expand Down
89 changes: 89 additions & 0 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,96 @@ func TestFilterOutLocalhost(t *testing.T) {
assert.NotEqual(t, endpoints[2], endpoints[0])
}

func TestMsgStoreExpiration(t *testing.T) {
t.Parallel()
nodeNum := 4
bootPeers := []string{bootPeer(12611), bootPeer(12612)}
instances := []*gossipInstance{}

inst := createDiscoveryInstance(12611, "d1", bootPeers)
instances = append(instances, inst)

inst = createDiscoveryInstance(12612, "d2", bootPeers)
instances = append(instances, inst)

for i := 3; i <= nodeNum; i++ {
id := fmt.Sprintf("d%d", i)
inst = createDiscoveryInstance(12610+i, id, bootPeers)
instances = append(instances, inst)
}

assertMembership(t, instances, nodeNum-1)

waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)

assertMembership(t, instances, nodeNum-3)

checkMessages := func() bool {
for _, inst := range instances[:len(instances)-2] {
for _, downInst := range instances[len(instances)-2:] {
downCastInst := inst.Discovery.(*gossipDiscoveryImpl)
downCastInst.lock.RLock()
if _, exist := downCastInst.aliveLastTS[string(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid)]; exist {
downCastInst.lock.RUnlock()
return false
}
if _, exist := downCastInst.deadLastTS[string(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid)]; exist {
downCastInst.lock.RUnlock()
return false
}
if _, exist := downCastInst.id2Member[string(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid)]; exist {
downCastInst.lock.RUnlock()
return false
}
if downCastInst.aliveMembership.MsgByID(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid) != nil {
downCastInst.lock.RUnlock()
return false
}
if downCastInst.deadMembership.MsgByID(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid) != nil {
downCastInst.lock.RUnlock()
return false
}
downCastInst.lock.RUnlock()
}
}
return true
}

waitUntilTimeoutOrFail(t, checkMessages, timeout*2)

assertMembership(t, instances[:len(instances)-2], nodeNum-3)

peerToResponse := &NetworkMember{
Metadata: []byte{},
PKIid: []byte(fmt.Sprintf("localhost:%d", 12612)),
Endpoint: fmt.Sprintf("localhost:%d", 12612),
InternalEndpoint: fmt.Sprintf("localhost:%d", 12612),
}

downCastInstance := instances[0].Discovery.(*gossipDiscoveryImpl)
memResp := downCastInstance.createMembershipResponse(peerToResponse)

downCastInstance.comm.SendToPeer(peerToResponse, (&proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(0),
Content: &proto.GossipMessage_MemRes{
MemRes: memResp,
},
}).NoopSign())

time.Sleep(getAliveExpirationTimeout())

assert.True(t, checkMessages(), "Validating lost message with already dead and expired nodes failed")

stopInstances(t, instances[:len(instances)-2])
}

func waitUntilOrFail(t *testing.T, pred func() bool) {
waitUntilTimeoutOrFail(t, pred, timeout)
}

func waitUntilTimeoutOrFail(t *testing.T, pred func() bool, timeout time.Duration) {
start := time.Now()
limit := start.UnixNano() + timeout.Nanoseconds()
for time.Now().UnixNano() < limit {
Expand Down

0 comments on commit 224ed0a

Please sign in to comment.