gossipsub v1.1: prune peer exchange #234

Merged: 28 commits, Mar 24, 2020

Commits
be049a5
gomod: use go-libp2p-core@peer-records and go-libp2p-peerstore@certif…
vyzo Nov 21, 2019
9d280a2
extend ControlPrune with peer exchange information
vyzo Nov 21, 2019
f68a02b
protocol ID for gossipsub v1.1
vyzo Nov 21, 2019
e2ebf99
peer exchange on prune
vyzo Nov 21, 2019
60f6b2f
backoff grafting to peers that have pruned us
vyzo Nov 22, 2019
152ebc5
connect to peers obtained through px
vyzo Nov 22, 2019
a9fdc41
test prune px with a star topology
vyzo Nov 23, 2019
9519116
trace peer exchange
vyzo Nov 23, 2019
47b221d
extend star topology test to assert that no peer is left with a singl…
vyzo Nov 23, 2019
f42ce48
make connection timeout a variable, set for 30s (instead of 10s)
vyzo Nov 25, 2019
0e5a1ed
add limit to the number of peers to connect to from px
vyzo Dec 5, 2019
1a261fe
shuffle peers when limiting px set
vyzo Dec 5, 2019
7c03aa0
don't spawn a goroutine for scheduling connections
vyzo Dec 6, 2019
895bcf6
track changes to peer records in -core
yusefnapora Jan 22, 2020
8186906
update PR branch dependencies
yusefnapora Jan 23, 2020
22faa75
fix import & var naming
yusefnapora Jan 23, 2020
c1831f0
add missing continue to error case
yusefnapora Jan 27, 2020
6a9e6d8
renaming in error messages & local var
yusefnapora Jan 27, 2020
082b8b9
gomod: use go-libp2p-core@peer-records and go-libp2p-peerstore@certif…
vyzo Nov 21, 2019
7a80d74
protocol ID for gossipsub v1.1
vyzo Nov 21, 2019
c65bd30
peer exchange on prune
vyzo Nov 21, 2019
e3bd9fa
backoff grafting to peers that have pruned us
vyzo Nov 22, 2019
1e152d2
connect to peers obtained through px
vyzo Nov 22, 2019
a198f5f
add limit to the number of peers to connect to from px
vyzo Dec 5, 2019
3d943c9
shuffle peers when limiting px set
vyzo Dec 5, 2019
2066fcd
don't spawn a goroutine for scheduling connections
vyzo Dec 6, 2019
480e48f
fix rebase artifacts
vyzo Mar 24, 2020
e86314f
gomod tidy
vyzo Mar 24, 2020
9 changes: 5 additions & 4 deletions go.mod
@@ -2,15 +2,16 @@ module github.com/libp2p/go-libp2p-pubsub

require (
github.com/gogo/protobuf v1.3.1
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/libp2p/go-libp2p-blankhost v0.1.4
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/multiformats/go-multiaddr v0.2.0
github.com/libp2p/go-libp2p-transport-upgrader v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multistream v0.1.1
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect
)

go 1.13
63 changes: 51 additions & 12 deletions go.sum

Large diffs are not rendered by default.

220 changes: 204 additions & 16 deletions gossipsub.go
@@ -9,12 +9,16 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
)

const (
GossipSubID = protocol.ID("/meshsub/1.0.0")
GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")
GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)

var (
@@ -33,6 +37,21 @@ var (

// fanout ttl
GossipSubFanoutTTL = 60 * time.Second

// number of peers to include in prune Peer eXchange
GossipSubPrunePeers = 16

// backoff time for pruned peers
GossipSubPruneBackoff = time.Minute

// number of active connection attempts for peers obtained through px
GossipSubConnectors = 16
Member:
Isn't this too much? 16 connection attempts at once, i.e. we'll try to connect to every peer recommended by the pruning peer (since this value equals GossipSubPrunePeers).

I'd prefer if connect were a buffered channel and we had fewer concurrent goroutines. Right now it's a bit hit or miss: if we get pruned from two topics at once, the connector goroutines will be saturated with the first batch, and all peers from the second batch will be dropped.

Collaborator (author):
It is a buffered channel! But yes, we can certainly lower from 16, how about 8 or 4?

Member:
The max inflight dials FD limit is 160 by default. Assuming each peer has an average of 3 addresses, 16 connectors could use about 30% of our FD allowance (16 × 3 = 48 of 160). I'd scale this down to 8 by default, but I'd add an option so the app can increase/decrease it (it may also touch the swarm limits!).

Also, we will need some fairness heuristic here. If we're pruned from two or three topics at once, the first topic will get all the slots, and the other two will be queued. Instead, we might want to balance peers from topics 1, 2, and 3 for quicker healing. Some form of priority queue could work well.

Collaborator (author):
The hardening branch reduces this to 8.


// maximum number of pending connections for peers attempted through px
GossipSubMaxPendingConnections = 1024

// timeout for connection attempts
GossipSubConnectionTimeout = 30 * time.Second
)
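
Since the knobs above are exported package-level variables (this PR does not add a per-router option for them), an application that wants the lower connector count discussed in the thread above can set them before constructing the router. A minimal, hedged sketch; the function name is illustrative and not part of the library:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// newTunedGossipSub builds a gossipsub instance with fewer px dial workers
// and a smaller pending-connection queue than the defaults in this PR.
// These are hypothetical values chosen for illustration.
func newTunedGossipSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
	pubsub.GossipSubConnectors = 8              // concurrent px dial workers
	pubsub.GossipSubMaxPendingConnections = 128 // bounded px connection queue
	return pubsub.NewGossipSub(ctx, h)
}

Note that these variables are read when the router is constructed and attached, so they must be set before NewGossipSub is called.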

// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
@@ -44,6 +63,8 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
}
return NewPubSub(ctx, h, rt, opts...)
@@ -58,18 +79,25 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID // peer protocols
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
peers map[peer.ID]protocol.ID // peer protocols
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
mcache *MessageCache
tracer *pubsubTracer
}

type connectInfo struct {
p peer.ID
spr *record.Envelope
}

func (gs *GossipSubRouter) Protocols() []protocol.ID {
return []protocol.ID{GossipSubID, FloodSubID}
return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
}

func (gs *GossipSubRouter) Attach(p *PubSub) {
@@ -78,6 +106,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
// start using the same msg ID function as PubSub for caching messages.
gs.mcache.SetMsgIdFn(p.msgID)
go gs.heartbeatTimer()
for i := 0; i < GossipSubConnectors; i++ {
go gs.connector()
}
}

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
@@ -226,7 +257,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.

cprune := make([]*pb.ControlPrune, 0, len(prune))
for _, topic := range prune {
cprune = append(cprune, &pb.ControlPrune{TopicID: &topic})
cprune = append(cprune, gs.makePrune(p, topic))
}

return cprune
@@ -241,6 +272,103 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
gs.addBackoff(p, topic)
Contributor:
👍 great!

However, we're not currently doing anything about a peer continuously chasing away other peers by sending a GRAFT. If during a GRAFT we could check whether we're already over the limit and send back a PRUNE, that would largely resolve this case.

vyzo (collaborator, author), Nov 25, 2019:
We don't want to do that because we will reject all new peers and they may not be able to form a connected mesh.
That's why it accepts the peer and resolves (with randomization) during the heartbeat.

vyzo (collaborator, author), Nov 25, 2019:
Just to be clear, sending a PRUNE if we are over the limit is the wrong thing to do, because then the mesh can become full and fail to accept new peers.
(I considered it when designing the algorithm)

Contributor:
I get that it used to be that way, but is that still true now that we have peer exchange?

If I send back a PRUNE only when I have >Dhi peers and I prune (Dhi-D) peers and tell them about each other won't they always be able to join the mesh?

Collaborator (author):
They probably will be able to do that, but I'd rather reshuffle the mesh in its entirety instead of rejecting new GRAFTs.

Collaborator (author):
Think about a fully connected mesh where all peers are at D_hi -- (unlikely as it is) it won't accept new peers then, while GRAFTing and reshuffling will resolve the situation.

Contributor:
Ah, so IIUC this is really a timing problem right? Because if B is connected to C and then A tries to connect to B it's possible that when B PRUNEs A and C that A will send a GRAFT to C before C receives the PRUNE from B.

My concern with the current approach is that if A wants to GRAFT to B it can always keep spamming B and eventually it will get through, even though B has given A plenty of other peers to connect to. This is annoying since if a peer is "better" in some way (e.g. they are the primary publisher) then nodes might be selfish; however, it's certainly not a deal-breaker and wasn't even resolvable until the Peer Exchange changes.

Since fixing this would likely require some protocol thought and is less important than the rest of the episub work, seems reasonable to push this further down the road. Would you like me to create an issue?

Collaborator (author):
It could be an attack -- I don't think it can happen naturally.

Collaborator (author):
Sure, we can create an issue to discuss this further instead of it being lost in oblivion after the PR merges.

Member:
There's a recent win here! The decisions of which peers to retain and which to eject are now made by evaluating the peer's score! 🎉

px := prune.GetPeers()
if len(px) > 0 {
gs.pxConnect(px)
Member:
I have two concerns:

  1. What if a single peer sends a bunch of prunes? Could they cause us to launch a ton of goroutines (and crash)?
  2. Can't a peer use this to trick us into connecting to a ton of (potentially non-useful) peers? We should only try connecting to the number of peers we actually need (one?).

Ideally, we'd stick these peers in a set of known peers for the topic, connecting to them as-needed whenever we're pruned or we disconnect from a peer.

vyzo (collaborator, author), Dec 5, 2019:
  1. We must already be grafted; we ignore prunes that don't correspond to a peer in our mesh. So a peer can't make us launch an arbitrary number of goroutines by sending us prunes; at most it can make us launch a single goroutine for each topic whose mesh we belong to. Not much of a vector, I think.

Collaborator (author):
  1. We could have a (legitimate) prune listing an arbitrary number of useless peers; we could limit the number of peers we connect to.

vyzo (collaborator, author), Dec 5, 2019:
For 2 I added a check that limits the number of connections to at most GossipSubPrunePeers, so this should address the concern.
We do want more than 1 peer in general, to expand our potential peers as much as possible.

Member:
Got it. SGTM.

Collaborator (author):
OK, I slept on it and there is a DoS vector: a malicious peer could send us GRAFT/PRUNE in sequence and cause us to spawn a goroutine; it could simply sit there sending GRAFT/PRUNE ad infinitum, causing us to spawn a goroutine for each pair.
Granted, the effect is limited by how many goroutines it can fit inside the 30s window for the connection timeout, but it's still nasty.
I will rewrite the connection logic to use a limited set of pending connections and goroutine-free scheduling.

Collaborator (author):
Implemented a connection scheduler, which also limits the maximum number of pending connections.

}
}
}
}

func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
backoff, ok := gs.backoff[topic]
if !ok {
backoff = make(map[peer.ID]time.Time)
gs.backoff[topic] = backoff
}
backoff[p] = time.Now().Add(GossipSubPruneBackoff)
}

func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
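// pxConnect validates the peer infos received in a PRUNE and queues up to
// GossipSubPrunePeers of them for the connector workers; excess requests are
// dropped when the connect queue is full.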
if len(peers) > GossipSubPrunePeers {
shufflePeerInfo(peers)
peers = peers[:GossipSubPrunePeers]
}

toconnect := make([]connectInfo, 0, len(peers))

for _, pi := range peers {
p := peer.ID(pi.PeerID)

_, connected := gs.peers[p]
if connected {
Member:
So we'll connect to PEX peers but wait until the next heartbeat to rebalance the mesh. That's why we can safely skip topic members we're already connected to: we'll consider them in the next heartbeat anyway (as long as we remain connected to them). I think that's correct.

Collaborator (author):
yup, do you want a comment here?

Member:
Ideally ;-)

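// we are already connected to this peer; the next heartbeat will consider it for the mesh anyway, so skip it here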
continue
}

var spr *record.Envelope
if pi.SignedPeerRecord != nil {
// the peer sent us a signed record; ensure that it is valid
envelope, r, err := record.ConsumeEnvelope(pi.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
Member:
We recently added https://godoc.org/github.com/libp2p/go-libp2p-core/record#ConsumeTypedEnvelope, which is much nicer on the allocator (can re-use a single PeerRecord instance). May want to move to that at some point.

if err != nil {
log.Warnf("error unmarshalling peer record obtained through px: %s", err)
continue
}
rec, ok := r.(*peer.PeerRecord)
if !ok {
log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord")
continue
}
if rec.PeerID != p {
log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", rec.PeerID, p)
continue
}
spr = envelope
}

toconnect = append(toconnect, connectInfo{p, spr})
}

if len(toconnect) == 0 {
return
}

for _, ci := range toconnect {
select {
case gs.connect <- ci:
default:
log.Debugf("ignoring peer connection attempt; too many pending connections")
break
}
}
}
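
As an aside to the ConsumeTypedEnvelope note in the review thread above, the allocation-friendlier variant could look roughly like the helper below. This is a hedged sketch only: the helper name is made up, the PR itself keeps record.ConsumeEnvelope, and it assumes this package's existing imports plus fmt.

// consumeSignedRecord is a hypothetical helper: ConsumeTypedEnvelope verifies
// the envelope signature and unmarshals the payload into the supplied
// PeerRecord, and we then check that the record belongs to the expected peer.
// A caller wanting to avoid allocations could pass in a reused PeerRecord.
func consumeSignedRecord(raw []byte, expected peer.ID) (*record.Envelope, error) {
	rec := &peer.PeerRecord{}
	envelope, err := record.ConsumeTypedEnvelope(raw, rec)
	if err != nil {
		return nil, err
	}
	if rec.PeerID != expected {
		return nil, fmt.Errorf("peer record for %s does not match expected peer %s", rec.PeerID, expected)
	}
	return envelope, nil
}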

func (gs *GossipSubRouter) connector() {
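// connector is one of the GossipSubConnectors worker goroutines spawned in
// Attach; it dials peers queued by pxConnect, adding their signed address
// records to the peerstore first when one was provided.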
for {
select {
case ci := <-gs.connect:
if gs.p.host.Network().Connectedness(ci.p) == network.Connected {
continue
}

log.Debugf("connecting to %s", ci.p)
cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
Member:
Use a sync.Once to only set this once per goroutine?

Collaborator (author):
Not sure it is all that expensive; it's just a cast, isn't it?

if ok && ci.spr != nil {
_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL)
Member:
Note: we had to roll back the change in the peerstore that clears all addresses, and blocks subsequent uncertified address additions, once a peer record is seen. It caused downstream regressions (IPFS, and others).

For now we're just accumulating all addresses.

As a definitive solution, I'm enhancing the peerstore with "address labels" such that certified addresses will be tagged as such, and in the host.Connect() below, you'll be able to set dial constraints, i.e. "only dial certified addresses".

With this solution, the peerstore tracks all addresses, and it's up to the application to filter which ones it wants to actually use.

if err != nil {
log.Debugf("error processing peer record: %s", err)
}
}

ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout)
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
cancel()
if err != nil {
log.Debugf("error connecting to %s: %s", ci.p, err)
}

case <-gs.p.ctx.Done():
return
}
}
}
@@ -360,7 +488,7 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
}

func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}}
prune := []*pb.ControlPrune{gs.makePrune(p, topic)}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendRPC(p, out)
}
@@ -443,16 +571,21 @@ func (gs *GossipSubRouter) heartbeat() {
tograft := make(map[peer.ID][]string)
toprune := make(map[peer.ID][]string)

// clean up expired backoffs
gs.clearBackoff()
Member:
This is probably fine, but we should monitor this.

Member:
That is, walking through each of these every heartbeat should be fine, but could get expensive in some edge cases.

Collaborator (author):
We could do it every few heartbeats instead of every heartbeat.

Member:
Every heartbeat is probably fine, I'm just calling it out so we keep it in mind.

Member:
A much more efficient approach would be a ring buffer of peer slices, with nslots = BackoffPeriod/HeartbeatInterval. You track a tick count that each heartbeat increments by 1, and when clearing the backoff you compute currentTick mod nslots and clear just that slot. That's pretty efficient. The backoff data would turn into map[string][][]peer.ID. (A sketch of this idea appears after clearBackoff below.)


// maintain the mesh for topics we have joined
for topic, peers := range gs.mesh {

// do we have enough peers?
if len(peers) < GossipSubDlo {
backoff := gs.backoff[topic]
ineed := GossipSubD - len(peers)
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current peers
_, ok := peers[p]
return !ok
// filter our current peers and peers we are backing off
_, inMesh := peers[p]
_, doBackoff := backoff[p]
return !inMesh && !doBackoff
})

for _, p := range plst {
@@ -531,6 +664,20 @@ func (gs *GossipSubRouter) heartbeat() {
gs.mcache.Shift()
}

func (gs *GossipSubRouter) clearBackoff() {
now := time.Now()
for topic, backoff := range gs.backoff {
for p, expire := range backoff {
if expire.Before(now) {
delete(backoff, p)
}
}
if len(backoff) == 0 {
delete(gs.backoff, topic)
}
}
}
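
For reference, the ring-buffer alternative suggested in the review thread on the heartbeat above could look roughly like the sketch below. Hedged: the type and method names are illustrative and not part of this PR; one slot is expired per heartbeat, so an entry lives for nslots ticks (nslots = GossipSubPruneBackoff / heartbeat interval), and repeated backoffs for the same peer are not handled.

// backoffRing is an illustrative per-topic ring buffer: nslots peer slices,
// exactly one of which is expired on each heartbeat tick.
type backoffRing struct {
	slots [][]peer.ID          // slot i holds peers whose backoff ends when the tick wraps back to i
	index map[peer.ID]struct{} // membership check used when filtering mesh/GRAFT candidates
	tick  uint64
}

func newBackoffRing(nslots int) *backoffRing {
	return &backoffRing{
		slots: make([][]peer.ID, nslots),
		index: make(map[peer.ID]struct{}),
	}
}

// add records a backoff that expires roughly nslots heartbeats from now.
func (r *backoffRing) add(p peer.ID) {
	slot := r.tick % uint64(len(r.slots))
	r.slots[slot] = append(r.slots[slot], p)
	r.index[p] = struct{}{}
}

// heartbeat advances the clock and clears exactly one slot, instead of
// walking every backoff entry the way clearBackoff does above.
func (r *backoffRing) heartbeat() {
	r.tick++
	slot := r.tick % uint64(len(r.slots))
	for _, p := range r.slots[slot] {
		delete(r.index, p)
	}
	r.slots[slot] = nil
}

// backingOff reports whether p is still within its prune backoff window.
func (r *backoffRing) backingOff(p peer.ID) bool {
	_, ok := r.index[p]
	return ok
}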

func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) {
for p, topics := range tograft {
graft := make([]*pb.ControlGraft, 0, len(topics))
@@ -544,7 +691,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string)
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, &pb.ControlPrune{TopicID: &topic})
prune = append(prune, gs.makePrune(p, topic))
}
}

@@ -555,7 +702,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string)
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, &pb.ControlPrune{TopicID: &topic})
prune = append(prune, gs.makePrune(p, topic))
}

out := rpcWithControl(nil, nil, nil, nil, prune)
@@ -673,6 +820,40 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
}
}

func (gs *GossipSubRouter) makePrune(p peer.ID, topic string) *pb.ControlPrune {
if gs.peers[p] == GossipSubID_v10 {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
}

// select peers for Peer eXchange
peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
return p != xp
})

cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
px := make([]*pb.PeerInfo, 0, len(peers))
for _, p := range peers {
// see if we have a signed peer record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through px anyway.
var recordBytes []byte
if ok {
spr := cab.GetPeerRecord(p)
var err error
if spr != nil {
recordBytes, err = spr.Marshal()
if err != nil {
log.Warnf("error marshaling signed peer record for %s: %s", p, err)
}
}
}
px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
}

return &pb.ControlPrune{TopicID: &topic, Peers: px}
}

func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
tmap, ok := gs.p.topics[topic]
if !ok {
@@ -681,7 +862,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID

peers := make([]peer.ID, 0, len(tmap))
for p := range tmap {
if gs.peers[p] == GossipSubID && filter(p) {
if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
peers = append(peers, p)
}
}
@@ -731,3 +912,10 @@ func shufflePeers(peers []peer.ID) {
peers[i], peers[j] = peers[j], peers[i]
}
}

func shufflePeerInfo(peers []*pb.PeerInfo) {
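// in-place Fisher-Yates shuffle, mirroring shufflePeers above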
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
}
}