Skip to content

Commit

Permalink
[FAB-3446] Bug - Alive msgs in MemReq and MemResp
Browse files Browse the repository at this point in the history
Alive messages inside MemReq and MemResponce messages should be
valideted using alive message store inside discovery

Current implementation validates MemReq and MemResp msgs instead
Alive messages stored inside membership messages.

Change-Id: I0098a8d032ea9630fe94f0a5be0c0a804e8a09a7
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed May 8, 2017
1 parent 4342105 commit c01a433
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 53 deletions.
72 changes: 48 additions & 24 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type gossipDiscoveryImpl struct {
aliveMembership *util.MembershipStore
deadMembership *util.MembershipStore

msgStore msgstore.MessageStore
msgStore *aliveMsgStore

bootstrapPeers []string

Expand Down Expand Up @@ -114,25 +114,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
disclosurePolicy: disPol,
}

policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}

d.msgStore = msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback)
d.msgStore = newAliveMsgStore(d)

go d.periodicalSendAlive()
go d.periodicalCheckAlive()
Expand Down Expand Up @@ -325,7 +307,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

if d.msgStore.CheckValid(m) {
if d.msgStore.CheckValid(selfInfoGossipMsg) {
d.handleAliveMessage(selfInfoGossipMsg)
}

Expand Down Expand Up @@ -364,10 +346,9 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

if d.msgStore.CheckValid(m) {
if d.msgStore.CheckValid(am) {
d.handleAliveMessage(am)
}

}

for _, env := range memResp.Dead {
Expand All @@ -381,7 +362,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
continue
}

if !d.msgStore.CheckValid(m) {
if !d.msgStore.CheckValid(dm) {
//Newer alive message exist
return
}
Expand Down Expand Up @@ -965,3 +946,46 @@ func filterOutLocalhost(endpoints []string, port int) []string {
}
return returnedEndpoints
}

type aliveMsgStore struct {
msgstore.MessageStore
}

func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}

s := &aliveMsgStore{
MessageStore: msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback),
}
return s
}

func (s *aliveMsgStore) Add(msg interface{}) bool {
if !msg.(*proto.SignedGossipMessage).IsAliveMsg() {
panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
return s.MessageStore.Add(msg)
}

func (s *aliveMsgStore) CheckValid(msg interface{}) bool {
if !msg.(*proto.SignedGossipMessage).IsAliveMsg() {
panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
return s.MessageStore.CheckValid(msg)
}
Loading

0 comments on commit c01a433

Please sign in to comment.