
[FAB-3444] Gossip: pulled blocks aren't evicted
In Gossip, the component that handles pulling block messages
is the pullMediator.

A bug was discovered: blocks may enter the pull mediator's internal map
but are never evicted.
This is because the eviction logic only worked for blocks that were added
manually; blocks that were consumed from data update messages were never
removed by the dedicated callback, which only covered manually added blocks.

I fixed the bug by filtering the incoming data update through
the message store, so blocks that are too old never enter
the pull mediator.

This effectively makes the blocks held by the pull mediator a subset of the
blocks in the message store (which has a fixed upper bound).

New blocks cause eviction of old blocks from the pull mediator
via the callback in channel.go, line 185.

Change-Id: Idfcaeff68f941cc5ac7e437968ec8fa4d4a8fe31
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
yacovm committed Apr 30, 2017
1 parent 3a2a717 commit 05f811f
Showing 6 changed files with 152 additions and 46 deletions.
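
The essence of the fix is visible in the channel.go diff below: envelopes from an incoming data update are admitted through the bounded blockMsgStore first, and only the ones the store accepts are forwarded to the pull mediator. The following minimal Go sketch illustrates that gating idea; block, msgStore and filterDataUpdate are simplified stand-ins invented for illustration and are not the real fabric types.

package main

import "fmt"

// Simplified stand-ins for the real gossip types; the names and shapes here
// are illustrative only and do not match the fabric codebase.
type block struct{ seqNum uint64 }

// msgStore mimics a bounded message store: it keeps at most capacity of the
// newest blocks, rejects duplicates, and rejects blocks that are too old.
type msgStore struct {
	capacity int
	blocks   []block
}

// add returns true only if the block is new enough and not already stored.
func (s *msgStore) add(b block) bool {
	for _, x := range s.blocks {
		if x.seqNum == b.seqNum {
			return false // already stored
		}
	}
	if len(s.blocks) == s.capacity {
		oldest := 0
		for i, x := range s.blocks {
			if x.seqNum < s.blocks[oldest].seqNum {
				oldest = i
			}
		}
		if b.seqNum <= s.blocks[oldest].seqNum {
			return false // too far in the past
		}
		// evict the oldest block to make room for the newer one
		s.blocks = append(s.blocks[:oldest], s.blocks[oldest+1:]...)
	}
	s.blocks = append(s.blocks, b)
	return true
}

// filterDataUpdate mirrors the idea of the fix: only blocks accepted by the
// message store are forwarded, so whatever the pull mediator later holds is
// a subset of the (bounded) message store.
func filterDataUpdate(store *msgStore, incoming []block) []block {
	filtered := make([]block, 0, len(incoming))
	for _, b := range incoming {
		if !store.add(b) {
			continue // duplicate or too old; never reaches the puller
		}
		filtered = append(filtered, b)
	}
	return filtered
}

func main() {
	store := &msgStore{capacity: 3}
	update := []block{{3}, {4}, {5}, {6}, {2}}
	fmt.Println(filterDataUpdate(store, update)) // sequence 2 is dropped as stale
}

With a capacity of 3, the stale sequence 2 at the tail of the update is rejected, which mirrors how old blocks are now kept out of the pull mediator's map.
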
2 changes: 1 addition & 1 deletion gossip/gossip/certstore_test.go
@@ -384,7 +384,7 @@ func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons pro
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})

var certStore *certStore
adapter := pull.PullAdapter{
adapter := &pull.PullAdapter{
Sndr: sender,
MsgCons: func(msg *proto.SignedGossipMessage) {
certStore.idMapper.Put(msg.GetPeerIdentity().PkiId, msg.GetPeerIdentity().Cert)
33 changes: 17 additions & 16 deletions gossip/gossip/channel/channel.go
@@ -282,25 +282,13 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator {
}
return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
}
blockConsumer := func(msg *proto.SignedGossipMessage) {
dataMsg := msg.GetDataMsg()
if dataMsg == nil || dataMsg.Payload == nil {
gc.logger.Warning("Invalid DataMessage:", dataMsg)
return
}
added := gc.blockMsgStore.Add(msg)
// if we can't add the message to the msgStore,
// no point in disseminating it to others...
if !added {
return
}
gc.DeMultiplex(msg)
}
adapter := pull.PullAdapter{
adapter := &pull.PullAdapter{
Sndr: gc,
MemSvc: gc.memFilter,
IdExtractor: seqNumFromMsg,
MsgCons: blockConsumer,
MsgCons: func(msg *proto.SignedGossipMessage) {
gc.DeMultiplex(msg)
},
}
return pull.NewPullMediator(conf, adapter)
}
@@ -446,6 +434,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
return
}
if m.IsDataUpdate() {
// Iterate over the envelopes, and filter out blocks
// that we already have in the blockMsgStore, or blocks that
// are too far in the past.
filteredEnvelopes := []*proto.Envelope{}
for _, item := range m.GetDataUpdate().Data {
gMsg, err := item.ToGossipMessage()
if err != nil {
@@ -459,7 +451,16 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
return
}
added := gc.blockMsgStore.Add(gMsg)
if !added {
// If this block doesn't need to be added, it means it either already
// exists in memory or that it is too far in the past
continue
}
filteredEnvelopes = append(filteredEnvelopes, item)
}
// Replace the update message with just the blocks that should be processed
m.GetDataUpdate().Data = filteredEnvelopes
}
gc.blocksPuller.HandleMessage(msg)
}
133 changes: 122 additions & 11 deletions gossip/gossip/channel/channel_test.go
@@ -313,6 +313,103 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
gc.Stop()
}

func TestChannelMsgStoreEviction(t *testing.T) {
t.Parallel()
// Scenario: Create 4 phases in which the pull mediator of the channel would receive blocks
// via pull.
// The total amount of blocks should be restricted by the capacity of the message store.
// After the pull phases end, we ensure that only the latest blocks are preserved in the pull
// mediator, and the old blocks were evicted.
// We test this by sending a hello message to the pull mediator and inspecting the digest message
// returned as a response.

cs := &cryptoService{}
cs.On("VerifyBlock", mock.Anything).Return(nil)
adapter := new(gossipAdapterMock)
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
adapter.On("Gossip", mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
})

gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
defer gc.Stop()
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})

var wg sync.WaitGroup

msgsPerPhase := uint64(50)
lastPullPhase := make(chan uint64, msgsPerPhase)
totalPhases := uint64(4)
phaseNum := uint64(0)
wg.Add(int(totalPhases))

adapter.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
msg := args.Get(0).(*proto.SignedGossipMessage)
// Ignore all other messages sent like StateInfo messages
if !msg.IsPullMsg() {
return
}
// Stop the pull when we reach the final phase
if atomic.LoadUint64(&phaseNum) == totalPhases && msg.IsHelloMsg() {
return
}

start := atomic.LoadUint64(&phaseNum) * msgsPerPhase
end := start + msgsPerPhase
if msg.IsHelloMsg() {
// Advance phase
atomic.AddUint64(&phaseNum, uint64(1))
}

// Create and execute the current phase of pull
currSeq := sequence(start, end)
pullPhase := simulatePullPhase(gc, t, &wg, func(envelope *proto.Envelope) {}, currSeq...)
pullPhase(args)

// If we finished the last phase, save the sequence to be used later for inspection
if msg.IsDataReq() && atomic.LoadUint64(&phaseNum) == totalPhases {
for _, seq := range currSeq {
lastPullPhase <- seq
}
close(lastPullPhase)
}
})
// Wait for all pull phases to end
wg.Wait()

msgSentFromPullMediator := make(chan *proto.GossipMessage, 1)

helloMsg := createHelloMsg(pkiIDInOrg1)
helloMsg.On("Respond", mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.GossipMessage)
if !msg.IsDigestMsg() {
return
}
msgSentFromPullMediator <- msg
})
gc.HandleMessage(helloMsg)
select {
case msg := <-msgSentFromPullMediator:
// This is just to check that we responded with a digest on time.
// Put message back into the channel for further inspection
msgSentFromPullMediator <- msg
case <-time.After(time.Second * 5):
t.Fatal("Didn't reply with a digest on time")
}
// Only 1 digest sent
assert.Len(t, msgSentFromPullMediator, 1)
msg := <-msgSentFromPullMediator
// It's a digest and not anything else, like an update
assert.True(t, msg.IsDigestMsg())
assert.Len(t, msg.GetDataDig().Digests, adapter.GetConf().MaxBlockCountToStore+1)
// Check that the last sequences are kept.
// Since we checked the length, it proves that the old blocks were discarded, since we had much more
// total blocks overall than our capacity
for seq := range lastPullPhase {
assert.Contains(t, msg.GetDataDig().Digests, fmt.Sprintf("%d", seq))
}
}

func TestChannelPull(t *testing.T) {
t.Parallel()
cs := &cryptoService{}
@@ -334,7 +431,7 @@ func TestChannelPull(t *testing.T) {
go gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})

var wg sync.WaitGroup
pullPhase := simulatePullPhase(gc, t, &wg, func(envelope *proto.Envelope) {})
pullPhase := simulatePullPhase(gc, t, &wg, func(envelope *proto.Envelope) {}, 10, 11)
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase)

wg.Wait()
@@ -696,7 +793,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
env.Payload = sMsg.NoopSign().Payload
}

pullPhase1 := simulatePullPhase(gc, t, &wg, changeChan)
pullPhase1 := simulatePullPhase(gc, t, &wg, changeChan, 10, 11)
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase1)
adapter.On("DeMultiplex", mock.Anything)
wg.Wait()
Expand All @@ -718,7 +815,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
noop := func(env *proto.Envelope) {

}
pullPhase2 := simulatePullPhase(gc, t, &wg2, noop)
pullPhase2 := simulatePullPhase(gc, t, &wg2, noop, 10, 11)
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase2)
wg2.Wait()
assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size())
Expand All @@ -740,7 +837,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
sMsg.GossipMessage.GetDataMsg().Payload = nil
env.Payload = sMsg.NoopSign().Payload
}
pullPhase3 := simulatePullPhase(gc, t, &wg3, emptyBlock)
pullPhase3 := simulatePullPhase(gc, t, &wg3, emptyBlock, 10, 11)
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase3)
wg3.Wait()
assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size())
Expand All @@ -763,7 +860,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
sMsg.Content = createHelloMsg(pkiIDInOrg1).GetGossipMessage().Content
env.Payload = sMsg.NoopSign().Payload
}
pullPhase4 := simulatePullPhase(gc, t, &wg4, nonBlockMsg)
pullPhase4 := simulatePullPhase(gc, t, &wg4, nonBlockMsg, 10, 11)
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase4)
wg4.Wait()
assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size())
@@ -1253,19 +1350,23 @@ func TestOnDemandGossip(t *testing.T) {
}
}

func createDataUpdateMsg(nonce uint64) *proto.SignedGossipMessage {
return (&proto.GossipMessage{
func createDataUpdateMsg(nonce uint64, seqs ...uint64) *proto.SignedGossipMessage {
msg := &proto.GossipMessage{
Nonce: 0,
Channel: []byte(channelA),
Tag: proto.GossipMessage_CHAN_AND_ORG,
Content: &proto.GossipMessage_DataUpdate{
DataUpdate: &proto.DataUpdate{
MsgType: proto.PullMsgType_BLOCK_MSG,
Nonce: nonce,
Data: []*proto.Envelope{createDataMsg(10, channelA).Envelope, createDataMsg(11, channelA).Envelope},
Data: []*proto.Envelope{},
},
},
}).NoopSign()
}
for _, seq := range seqs {
msg.GetDataUpdate().Data = append(msg.GetDataUpdate().Data, createDataMsg(seq, channelA).Envelope)
}
return (msg).NoopSign()
}

func createHelloMsg(PKIID common.PKIidType) *receivedMsg {
@@ -1348,7 +1449,7 @@ func createDataMsg(seqnum uint64, channel common.ChainID) *proto.SignedGossipMes
}).NoopSign()
}

func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator) func(args mock.Arguments) {
func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator, seqs ...uint64) func(args mock.Arguments) {
var l sync.Mutex
var sentHello bool
var sentReq bool
@@ -1386,11 +1487,21 @@ func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutat
// from the imaginary peer that got the request
dataUpdateMsg := new(receivedMsg)
dataUpdateMsg.PKIID = pkiIDInOrg1
dataUpdateMsg.msg = createDataUpdateMsg(dataReq.Nonce)
dataUpdateMsg.msg = createDataUpdateMsg(dataReq.Nonce, seqs...)
mutator(dataUpdateMsg.msg.GetDataUpdate().Data[0])
gc.HandleMessage(dataUpdateMsg)
wg.Done()
}
}
}

func sequence(start uint64, end uint64) []uint64 {
sequence := make([]uint64, end-start+1)
i := 0
for n := start; n <= end; n++ {
sequence[i] = n
i++
}
return sequence

}
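
The new TestChannelMsgStoreEviction verifies the bound indirectly: after several pull phases that push far more blocks than the store can hold, a hello message must be answered with a digest that only lists the most recent sequence numbers. The sketch below reproduces that verification pattern against a toy bounded store; boundedStore, digestAfterHello and the capacity of 10 are assumptions chosen for illustration, not the channel's real API or configuration.

package main

import (
	"fmt"
	"sort"
)

// A toy bounded store: keeps only the newest `capacity` sequence numbers.
type boundedStore struct {
	capacity int
	seqs     []uint64
}

func (s *boundedStore) add(seq uint64) {
	s.seqs = append(s.seqs, seq)
	// Keep the slice ordered so the oldest entries sit at the front.
	sort.Slice(s.seqs, func(i, j int) bool { return s.seqs[i] < s.seqs[j] })
	if len(s.seqs) > s.capacity {
		// drop the oldest entries, mimicking eviction on overflow
		s.seqs = s.seqs[len(s.seqs)-s.capacity:]
	}
}

// digestAfterHello stands in for the digest a peer would send in response
// to a hello: the identifiers of everything it currently holds.
func (s *boundedStore) digestAfterHello() []string {
	digest := make([]string, 0, len(s.seqs))
	for _, seq := range s.seqs {
		digest = append(digest, fmt.Sprintf("%d", seq))
	}
	return digest
}

func main() {
	store := &boundedStore{capacity: 10}
	// Simulate 4 pull phases of 50 blocks each, far beyond the capacity.
	for seq := uint64(0); seq < 200; seq++ {
		store.add(seq)
	}
	digest := store.digestAfterHello()
	fmt.Println(len(digest), digest) // only the 10 most recent sequences remain
}
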
2 changes: 1 addition & 1 deletion gossip/gossip/gossip_impl.go
@@ -965,7 +965,7 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
}
g.logger.Info("Learned of a new certificate:", idMsg.Cert)
}
adapter := pull.PullAdapter{
adapter := &pull.PullAdapter{
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
26 changes: 10 additions & 16 deletions gossip/gossip/pull/pullstore.go
@@ -115,19 +115,16 @@ type Mediator interface {
// pullMediatorImpl is an implementation of Mediator
type pullMediatorImpl struct {
sync.RWMutex
Sender
*PullAdapter
msgType2Hook map[MsgType][]MessageHook
idExtractor proto.IdentifierExtractor
msgCons proto.MsgConsumer
config Config
logger *logging.Logger
itemID2Msg map[string]*proto.SignedGossipMessage
memBvc MembershipService
engine *algo.PullEngine
}

// NewPullMediator returns a new Mediator
func NewPullMediator(config Config, adapter PullAdapter) Mediator {
func NewPullMediator(config Config, adapter *PullAdapter) Mediator {
digFilter := adapter.DigFilter

acceptAllFilter := func(_ proto.ReceivedMessage) func(string) bool {
@@ -141,14 +138,11 @@ func NewPullMediator(config Config, adapter *PullAdapter) Mediator {
}

p := &pullMediatorImpl{
msgCons: adapter.MsgCons,
PullAdapter: adapter,
msgType2Hook: make(map[MsgType][]MessageHook),
idExtractor: adapter.IdExtractor,
config: config,
logger: util.GetLogger(util.LoggingPullModule, config.ID),
itemID2Msg: make(map[string]*proto.SignedGossipMessage),
memBvc: adapter.MemSvc,
Sender: adapter.Sndr,
}

p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, digFilter.byContext())
@@ -196,8 +190,8 @@ func (p *pullMediatorImpl) HandleMessage(m proto.ReceivedMessage) {
p.logger.Warning("Data update contains an invalid message:", err)
return
}
p.msgCons(msg)
itemIDs[i] = p.idExtractor(msg)
p.MsgCons(msg)
itemIDs[i] = p.IdExtractor(msg)
items[i] = msg
p.Lock()
p.itemID2Msg[itemIDs[i]] = msg
@@ -228,7 +222,7 @@ func (p *pullMediatorImpl) RegisterMsgHook(pullMsgType MsgType, hook MessageHook
func (p *pullMediatorImpl) Add(msg *proto.SignedGossipMessage) {
p.Lock()
defer p.Unlock()
itemID := p.idExtractor(msg)
itemID := p.IdExtractor(msg)
p.itemID2Msg[itemID] = msg
p.engine.Add(itemID)
}
@@ -244,7 +238,7 @@ func (p *pullMediatorImpl) Remove(digest string) {

// SelectPeers returns a slice of peers which the engine will initiate the protocol with
func (p *pullMediatorImpl) SelectPeers() []string {
remotePeers := SelectEndpoints(p.config.PeerCountToSelect, p.memBvc.GetMembership())
remotePeers := SelectEndpoints(p.config.PeerCountToSelect, p.MemSvc.GetMembership())
endpoints := make([]string, len(remotePeers))
for i, peer := range remotePeers {
endpoints[i] = peer.Endpoint
@@ -269,7 +263,7 @@ func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
}

p.logger.Debug("Sending hello to", dest)
p.Send(helloMsg.NoopSign(), p.peersWithEndpoints(dest)...)
p.Sndr.Send(helloMsg.NoopSign(), p.peersWithEndpoints(dest)...)
}

// SendDigest sends a digest to a remote PullEngine.
@@ -307,7 +301,7 @@ func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
},
}
p.logger.Debug("Sending", req, "to", dest)
p.Send(req.NoopSign(), p.peersWithEndpoints(dest)...)
p.Sndr.Send(req.NoopSign(), p.peersWithEndpoints(dest)...)
}

// SendRes sends an array of items to a remote PullEngine identified by a context.
@@ -339,7 +333,7 @@ func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce ui

func (p *pullMediatorImpl) peersWithEndpoints(endpoints ...string) []*comm.RemotePeer {
peers := []*comm.RemotePeer{}
for _, member := range p.memBvc.GetMembership() {
for _, member := range p.MemSvc.GetMembership() {
for _, endpoint := range endpoints {
if member.PreferredEndpoint() == endpoint {
peers = append(peers, &comm.RemotePeer{Endpoint: member.PreferredEndpoint(), PKIID: member.PKIid})
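
The pullstore.go change also replaces the individually copied adapter fields with an embedded *PullAdapter, so MsgCons, IdExtractor and MemSvc are reached as promoted fields. Send is not promoted because Sndr is an ordinary field of the adapter rather than an embedded type, hence the explicit p.Sndr.Send calls above. A small self-contained sketch of that embedding pattern follows; Adapter, Sender and mediator are made-up names used only to illustrate field promotion, not the real pull package types.

package main

import "fmt"

// Sender plays the role of the pull package's sender interface.
type Sender interface {
	Send(msg string)
}

type consoleSender struct{}

func (consoleSender) Send(msg string) { fmt.Println("sent:", msg) }

// Adapter plays the role of PullAdapter: plain configuration fields.
type Adapter struct {
	Sndr        Sender
	IdExtractor func(msg string) string
}

// mediator embeds *Adapter, so mediator values can use m.Sndr and
// m.IdExtractor directly instead of copying each field at construction time.
type mediator struct {
	*Adapter
	items map[string]string
}

func newMediator(a *Adapter) *mediator {
	return &mediator{Adapter: a, items: make(map[string]string)}
}

func (m *mediator) add(msg string) {
	id := m.IdExtractor(msg) // promoted field from the embedded adapter
	m.items[id] = msg
	// Sndr is an ordinary field, so its Send method is called explicitly,
	// just like p.Sndr.Send in the real change.
	m.Sndr.Send(msg)
}

func main() {
	a := &Adapter{
		Sndr:        consoleSender{},
		IdExtractor: func(msg string) string { return msg[:1] },
	}
	m := newMediator(a)
	m.add("hello")
	fmt.Println(m.items) // map[h:hello]
}

Running it prints the sent message and a map entry keyed by the extracted identifier, the same shape of bookkeeping that itemID2Msg performs in the mediator.
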
2 changes: 1 addition & 1 deletion gossip/gossip/pull/pullstore_test.go
@@ -143,7 +143,7 @@ func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pu
blockConsumer := func(msg *proto.SignedGossipMessage) {
inst.items.Add(msg.GetDataMsg().Payload.SeqNum)
}
adapter := PullAdapter{
adapter := &PullAdapter{
Sndr: inst,
MemSvc: inst,
IdExtractor: seqNumFromMsg,
