Skip to content

Commit

Permalink
Merge changes from topic 'FAB-2963'
Browse files Browse the repository at this point in the history
* changes:
  [FAB-2963] Gossip inter-org confidentiality - P4
  [FAB-2061] Gossip inter-org confidentiality - P3
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Apr 23, 2017
2 parents ffbf604 + 43bcc9a commit 7ea1de5
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 53 deletions.
40 changes: 30 additions & 10 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ type Adapter interface {
// GetMembership returns the known alive peers and their information
GetMembership() []discovery.NetworkMember

// Lookup returns a network member, or nil if not found
Lookup(PKIID common.PKIidType) *discovery.NetworkMember

// Send sends a message to a list of peers
Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer)

Expand All @@ -121,6 +124,7 @@ type gossipChannel struct {
shouldGossipStateInfo int32
mcs api.MessageCryptoService
pkiID common.PKIidType
selfOrg api.OrgIdentityType
stopChan chan struct{}
stateInfoMsg *proto.SignedGossipMessage
orgs []api.OrgIdentityType
Expand Down Expand Up @@ -153,8 +157,10 @@ func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
}

// NewGossipChannel creates a new GossipChannel
func NewGossipChannel(pkiID common.PKIidType, mcs api.MessageCryptoService, chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
gc := &gossipChannel{
selfOrg: org,
pkiID: pkiID,
mcs: mcs,
Adapter: adapter,
Expand Down Expand Up @@ -382,8 +388,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
return
}
orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID)
if orgID == nil {
gc.logger.Warning("Couldn't find org identity of peer", msg.GetConnectionInfo().ID)
if len(orgID) == 0 {
gc.logger.Debug("Couldn't find org identity of peer", msg.GetConnectionInfo().ID)
return
}
if !gc.IsOrgInChannel(orgID) {
Expand All @@ -392,7 +398,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
}

if m.IsStateInfoPullRequestMsg() {
msg.Respond(gc.createStateInfoSnapshot())
msg.Respond(gc.createStateInfoSnapshot(orgID))
return
}

Expand Down Expand Up @@ -480,8 +486,8 @@ func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender c
si := stateInf.GetStateInfo()
orgID := gc.GetOrgOfPeer(si.PkiId)
if orgID == nil {
gc.logger.Warning("Channel", chanName, ": Couldn't find org identity of peer",
si.PkiId, "message sent from", sender)
gc.logger.Debug("Channel", chanName, ": Couldn't find org identity of peer",
string(si.PkiId), "message sent from", string(sender))
return
}

Expand Down Expand Up @@ -524,11 +530,25 @@ func (gc *gossipChannel) verifyBlock(msg *proto.GossipMessage, sender common.PKI
return true
}

func (gc *gossipChannel) createStateInfoSnapshot() *proto.GossipMessage {
func (gc *gossipChannel) createStateInfoSnapshot(requestersOrg api.OrgIdentityType) *proto.GossipMessage {
sameOrg := bytes.Equal(gc.selfOrg, requestersOrg)
rawElements := gc.stateInfoMsgStore.Get()
elements := make([]*proto.Envelope, len(rawElements))
for i, rawEl := range rawElements {
elements[i] = rawEl.(*proto.SignedGossipMessage).Envelope
elements := []*proto.Envelope{}
for _, rawEl := range rawElements {
msg := rawEl.(*proto.SignedGossipMessage)
orgOfCurrentMsg := gc.GetOrgOfPeer(msg.GetStateInfo().PkiId)
// If we're in the same org as the requester, or the message belongs to a foreign org
// don't do any filtering
if sameOrg || !bytes.Equal(orgOfCurrentMsg, gc.selfOrg) {
elements = append(elements, msg.Envelope)
continue
}
// Else, the requester is in a different org, so disclose only StateInfo messages that their
// corresponding AliveMessages have external endpoints
if netMember := gc.Lookup(msg.GetStateInfo().PkiId); netMember == nil || netMember.Endpoint == "" {
continue
}
elements = append(elements, msg.Envelope)
}

return &proto.GossipMessage{
Expand Down
144 changes: 123 additions & 21 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ func (ga *gossipAdapterMock) GetMembership() []discovery.NetworkMember {
return members
}

// Lookup returns a network member, or nil if not found
func (ga *gossipAdapterMock) Lookup(PKIID common.PKIidType) *discovery.NetworkMember {
args := ga.Called(PKIID)
if args.Get(0) == nil {
return nil
}
return args.Get(0).(*discovery.NetworkMember)
}

func (ga *gossipAdapterMock) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
// Ensure we have configured Send prior
foundSend := false
Expand Down Expand Up @@ -244,7 +253,7 @@ func TestBadInput(t *testing.T) {
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{}).(*gossipChannel)
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}).(*gossipChannel)
assert.False(t, gc.verifyMsg(nil))
assert.False(t, gc.verifyMsg(&receivedMsg{msg: nil, PKIID: nil}))

Expand Down Expand Up @@ -276,7 +285,7 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
stateInfoReceptionChan <- msg
})

gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
stateInfoMsg := createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA)
gc.UpdateStateInfo(stateInfoMsg)

Expand Down Expand Up @@ -319,7 +328,7 @@ func TestChannelPull(t *testing.T) {
assert.True(t, msg.IsDataMsg())
receivedBlocksChan <- msg
})
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
go gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})

var wg sync.WaitGroup
Expand Down Expand Up @@ -348,7 +357,7 @@ func TestChannelPeerNotInChannel(t *testing.T) {
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})

// First thing, we test that blocks can only be received from peers that are in an org that's in the channel
// Empty PKI-ID, should drop the block
Expand Down Expand Up @@ -455,7 +464,7 @@ func TestChannelIsInChannel(t *testing.T) {
cs.On("VerifyBlock", mock.Anything).Return(nil)
adapter := new(gossipAdapterMock)
configureAdapter(adapter)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
Expand All @@ -474,7 +483,7 @@ func TestChannelIsSubscribed(t *testing.T) {
cs.On("VerifyBlock", mock.Anything).Return(nil)
adapter := new(gossipAdapterMock)
configureAdapter(adapter)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
Expand All @@ -490,7 +499,7 @@ func TestChannelAddToMessageStore(t *testing.T) {
demuxedMsgs := make(chan *proto.SignedGossipMessage, 1)
adapter := new(gossipAdapterMock)
configureAdapter(adapter)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
Expand Down Expand Up @@ -542,7 +551,7 @@ func TestChannelAddToMessageStoreExpire(t *testing.T) {
demuxedMsgs := make(chan *proto.SignedGossipMessage, 1)
adapter := new(gossipAdapterMock)
configureAdapter(adapter)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
Expand Down Expand Up @@ -635,7 +644,7 @@ func TestChannelBadBlocks(t *testing.T) {
adapter := new(gossipAdapterMock)
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
adapter.On("Gossip", mock.Anything)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})

adapter.On("DeMultiplex", mock.Anything).Run(func(args mock.Arguments) {
receivedMessages <- args.Get(0).(*proto.SignedGossipMessage)
Expand Down Expand Up @@ -673,7 +682,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
adapter.On("DeMultiplex", mock.Anything)
adapter.On("Gossip", mock.Anything)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})

var wg sync.WaitGroup
Expand All @@ -699,7 +708,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
adapter.On("Gossip", mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
gc = NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})

var wg2 sync.WaitGroup
Expand All @@ -719,7 +728,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
adapter.On("Gossip", mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
gc = NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})

var wg3 sync.WaitGroup
Expand All @@ -742,7 +751,7 @@ func TestChannelPulledBadBlocks(t *testing.T) {
adapter.On("Gossip", mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
gc = NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})

var wg4 sync.WaitGroup
Expand All @@ -764,8 +773,9 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
cs := &cryptoService{}
cs.On("VerifyBlock", mock.Anything).Return(nil)
adapter := new(gossipAdapterMock)
adapter.On("Lookup", mock.Anything).Return(&discovery.NetworkMember{Endpoint: "localhost:5000"})
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
adapter.On("Gossip", mock.Anything)
sentMessages := make(chan *proto.GossipMessage, 10)
adapter.On("Send", mock.Anything, mock.Anything)
Expand Down Expand Up @@ -861,7 +871,6 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
// Ensure we don't crash if we got a stateInfoMessage from a peer that its org isn't known
invalidStateInfoSnapshot = stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, common.PKIidType("unknown"), channelA))
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: invalidStateInfoSnapshot})

// Lets expire msg in store
time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second)

Expand Down Expand Up @@ -896,7 +905,100 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []byte("4"), sMsg.GetStateInfo().Metadata)
}
}

func TestInterOrgExternalEndpointDisclosure(t *testing.T) {
t.Parallel()

cs := &cryptoService{}
adapter := new(gossipAdapterMock)
pkiID1 := common.PKIidType("withExternalEndpoint")
pkiID2 := common.PKIidType("noExternalEndpoint")
pkiID3 := common.PKIidType("pkiIDinOrg2")
adapter.On("Lookup", pkiID1).Return(&discovery.NetworkMember{Endpoint: "localhost:5000"})
adapter.On("Lookup", pkiID2).Return(&discovery.NetworkMember{})
adapter.On("GetOrgOfPeer", pkiID1).Return(orgInChannelA)
adapter.On("GetOrgOfPeer", pkiID2).Return(orgInChannelA)
adapter.On("GetOrgOfPeer", pkiID3).Return(api.OrgIdentityType("ORG2"))
adapter.On("Gossip", mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
configureAdapter(adapter)
jcm := &joinChanMsg{
members2AnchorPeers: map[string][]api.AnchorPeer{
string(orgInChannelA): {},
"ORG2": {},
},
}
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, jcm)
gc.HandleMessage(&receivedMsg{PKIID: pkiID1, msg: createStateInfoMsg(0, pkiID1, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiID2, msg: createStateInfoMsg(0, pkiID2, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiID2, msg: createStateInfoMsg(0, pkiID3, channelA)})

sentMessages := make(chan *proto.GossipMessage, 10)

// Check that we only return StateInfo messages of peers with external endpoints
// to peers of other orgs
snapshotReq := &receivedMsg{
PKIID: pkiID3,
msg: (&proto.GossipMessage{
Tag: proto.GossipMessage_CHAN_OR_ORG,
Content: &proto.GossipMessage_StateInfoPullReq{
StateInfoPullReq: &proto.StateInfoPullRequest{
ChannelMAC: GenerateMAC(pkiID3, channelA),
},
},
}).NoopSign(),
}
snapshotReq.On("Respond", mock.Anything).Run(func(args mock.Arguments) {
sentMessages <- args.Get(0).(*proto.GossipMessage)
})

go gc.HandleMessage(snapshotReq)
select {
case <-time.After(time.Second):
assert.Fail(t, "Should have responded to this StateInfoSnapshot, but didn't")
case msg := <-sentMessages:
elements := msg.GetStateSnapshot().Elements
assert.Len(t, elements, 2)
m1, _ := elements[0].ToGossipMessage()
m2, _ := elements[1].ToGossipMessage()
pkiIDs := [][]byte{m1.GetStateInfo().PkiId, m2.GetStateInfo().PkiId}
assert.Contains(t, pkiIDs, []byte(pkiID1))
assert.Contains(t, pkiIDs, []byte(pkiID3))
}

// Check that we return all StateInfo messages to peers in our organization, regardless
// if the peers from foreign organizations have external endpoints or not
snapshotReq = &receivedMsg{
PKIID: pkiID2,
msg: (&proto.GossipMessage{
Tag: proto.GossipMessage_CHAN_OR_ORG,
Content: &proto.GossipMessage_StateInfoPullReq{
StateInfoPullReq: &proto.StateInfoPullRequest{
ChannelMAC: GenerateMAC(pkiID2, channelA),
},
},
}).NoopSign(),
}
snapshotReq.On("Respond", mock.Anything).Run(func(args mock.Arguments) {
sentMessages <- args.Get(0).(*proto.GossipMessage)
})

go gc.HandleMessage(snapshotReq)
select {
case <-time.After(time.Second):
assert.Fail(t, "Should have responded to this StateInfoSnapshot, but didn't")
case msg := <-sentMessages:
elements := msg.GetStateSnapshot().Elements
assert.Len(t, elements, 3)
m1, _ := elements[0].ToGossipMessage()
m2, _ := elements[1].ToGossipMessage()
m3, _ := elements[2].ToGossipMessage()
pkiIDs := [][]byte{m1.GetStateInfo().PkiId, m2.GetStateInfo().PkiId, m3.GetStateInfo().PkiId}
assert.Contains(t, pkiIDs, []byte(pkiID1))
assert.Contains(t, pkiIDs, []byte(pkiID2))
assert.Contains(t, pkiIDs, []byte(pkiID3))
}
}

func TestChannelStop(t *testing.T) {
Expand All @@ -910,7 +1012,7 @@ func TestChannelStop(t *testing.T) {
adapter.On("Send", mock.Anything, mock.Anything).Run(func(mock.Arguments) {
atomic.AddInt32(&sendCount, int32(1))
})
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
time.Sleep(time.Second)
gc.Stop()
oldCount := atomic.LoadInt32(&sendCount)
Expand Down Expand Up @@ -976,7 +1078,7 @@ func TestChannelReconfigureChannel(t *testing.T) {
},
}

gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, api.JoinChannelMessage(newJoinChanMsg))
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, api.JoinChannelMessage(newJoinChanMsg))

// Just call it again, to make sure stuff don't crash
gc.ConfigureChannel(api.JoinChannelMessage(newJoinChanMsg))
Expand Down Expand Up @@ -1044,7 +1146,7 @@ func TestChannelNoAnchorPeers(t *testing.T) {
},
}

gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, api.JoinChannelMessage(jcm))
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, api.JoinChannelMessage(jcm))
assert.True(t, gc.IsOrgInChannel(orgInChannelA))
}

Expand All @@ -1068,7 +1170,7 @@ func TestChannelGetPeers(t *testing.T) {
{PKIid: pkiIDinOrg2},
}
configureAdapter(adapter, members...)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
assert.Len(t, gc.GetPeers(), 1)
Expand All @@ -1081,7 +1183,7 @@ func TestChannelGetPeers(t *testing.T) {

// Now recreate gc and corrupt the MAC
// and ensure that the StateInfo message doesn't count
gc = NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
msg := &receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}
msg.GetGossipMessage().GetStateInfo().ChannelMAC = GenerateMAC(pkiIDinOrg2, channelA)
gc.HandleMessage(msg)
Expand All @@ -1107,7 +1209,7 @@ func TestOnDemandGossip(t *testing.T) {
adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) {
gossipedEvents <- struct{}{}
})
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, api.JoinChannelMessage(&joinChanMsg{}))
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, api.JoinChannelMessage(&joinChanMsg{}))
defer gc.Stop()
select {
case <-gossipedEvents:
Expand Down
Loading

0 comments on commit 7ea1de5

Please sign in to comment.