Skip to content

Commit

Permalink
[FAB-2777] Alive msg handling
Browse files Browse the repository at this point in the history
Move alive message store from gossip impl to discovy impl
to keep all alive messages handling in one place

Change-Id: I98157f1c208375b95db497f8fe8d1fac07ba97f3
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman authored and yacovm committed Mar 15, 2017
1 parent 14055d7 commit 5b2baa0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
10 changes: 10 additions & 0 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip/msgstore"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
Expand Down Expand Up @@ -79,6 +80,8 @@ type gossipDiscoveryImpl struct {
aliveMembership *util.MembershipStore
deadMembership *util.MembershipStore

msgStore msgstore.MessageStore

bootstrapPeers []string

comm CommService
Expand All @@ -102,6 +105,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
id2Member: make(map[string]*NetworkMember),
aliveMembership: util.NewMembershipStore(),
deadMembership: util.NewMembershipStore(),
msgStore: msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
Expand Down Expand Up @@ -319,6 +323,12 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
}

if m.IsAliveMsg() {
added := d.msgStore.Add(m)
if !added {
return
}
d.comm.Gossip(m)

d.handleAliveMessage(m)
return
}
Expand Down
11 changes: 0 additions & 11 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type gossipServiceImpl struct {
chanState *channelState
disSecAdap *discoverySecurityAdapter
mcs api.MessageCryptoService
aliveMsgStore msgstore.MessageStore
stateInfoMsgStore msgstore.MessageStore
}

Expand Down Expand Up @@ -110,8 +109,6 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
includeIdentityPeriod: time.Now().Add(conf.PublishCertPeriod),
}

g.aliveMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})

g.chanState = newChannelState(g)
g.emitter = newBatchingEmitter(conf.PropagateIterations,
conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency,
Expand Down Expand Up @@ -298,14 +295,6 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
return
}

if msg.IsAliveMsg() {
added := g.aliveMsgStore.Add(msg)
if !added {
return
}
g.emitter.Add(msg)
}

if msg.IsChannelRestricted() {
if gc := g.chanState.getGossipChannelByChainID(msg.Channel); gc == nil {
// If we're not in the channel but we should forward to peers of our org
Expand Down

0 comments on commit 5b2baa0

Please sign in to comment.