Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to handle newly subscribed peers #190

Merged
merged 12 commits into from
Aug 7, 2019
Merged
141 changes: 141 additions & 0 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,144 @@ func TestImproperlySignedMessageRejected(t *testing.T) {
)
}
}

// TestSubscriptionJoinNotification verifies that every subscriber receives a
// join event for every other peer on the topic, regardless of whether the
// peer subscribed before or after this subscriber did.
func TestSubscriptionJoinNotification(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const numLateSubscribers = 10
	const numHosts = 20
	hosts := getNetHosts(t, ctx, numHosts)

	psubs := getPubsubs(ctx, hosts)

	msgs := make([]*Subscription, numHosts)
	subPeersFound := make([]map[peer.ID]struct{}, numHosts)

	// Have some peers subscribe earlier than other peers.
	// This exercises whether we get subscription notifications from
	// existing peers.
	for i, ps := range psubs[numLateSubscribers:] {
		subch, err := ps.Subscribe("foobar")
		if err != nil {
			t.Fatal(err)
		}

		msgs[i] = subch
	}

	connectAll(t, hosts)

	// Give the early subscriptions time to propagate.
	time.Sleep(time.Millisecond * 100)

	// Have the rest subscribe
	for i, ps := range psubs[:numLateSubscribers] {
		subch, err := ps.Subscribe("foobar")
		if err != nil {
			t.Fatal(err)
		}

		msgs[i+numLateSubscribers] = subch
	}

	wg := sync.WaitGroup{}
	for i := 0; i < numHosts; i++ {
		peersFound := make(map[peer.ID]struct{})
		subPeersFound[i] = peersFound
		sub := msgs[i]
		wg.Add(1)
		go func(peersFound map[peer.ID]struct{}) {
			defer wg.Done()
			for i := 0; i < numHosts-1; i++ {
				pid, err := sub.NextPeerJoin(ctx)
				if err != nil {
					// t.Fatal must only be called from the goroutine
					// running the test function; use t.Error + return
					// from a spawned goroutine instead.
					t.Error(err)
					return
				}
				peersFound[pid] = struct{}{}
			}
		}(peersFound)
	}

	wg.Wait()
	// Each subscriber should have seen a join for every peer but itself.
	for _, peersFound := range subPeersFound {
		if len(peersFound) != numHosts-1 {
			t.Fatal("incorrect number of peers found")
		}
	}
}

// TestSubscriptionLeaveNotification verifies that leave events are emitted
// for all three ways a peer can leave a topic: cancelling its subscription,
// its host shutting down, and being blacklisted by the local node.
func TestSubscriptionLeaveNotification(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const numHosts = 20
	hosts := getNetHosts(t, ctx, numHosts)

	psubs := getPubsubs(ctx, hosts)

	msgs := make([]*Subscription, numHosts)
	subPeersFound := make([]map[peer.ID]struct{}, numHosts)

	// Subscribe all peers and wait until they've all been found
	for i, ps := range psubs {
		subch, err := ps.Subscribe("foobar")
		if err != nil {
			t.Fatal(err)
		}

		msgs[i] = subch
	}

	connectAll(t, hosts)

	// Give the subscriptions time to propagate.
	time.Sleep(time.Millisecond * 100)

	wg := sync.WaitGroup{}
	for i := 0; i < numHosts; i++ {
		peersFound := make(map[peer.ID]struct{})
		subPeersFound[i] = peersFound
		sub := msgs[i]
		wg.Add(1)
		go func(peersFound map[peer.ID]struct{}) {
			defer wg.Done()
			for i := 0; i < numHosts-1; i++ {
				pid, err := sub.NextPeerJoin(ctx)
				if err != nil {
					// t.Fatal must only be called from the goroutine
					// running the test function; use t.Error + return
					// from a spawned goroutine instead.
					t.Error(err)
					return
				}
				peersFound[pid] = struct{}{}
			}
		}(peersFound)
	}

	wg.Wait()
	for _, peersFound := range subPeersFound {
		if len(peersFound) != numHosts-1 {
			t.Fatal("incorrect number of peers found")
		}
	}

	// Test removing peers and verifying that they cause events
	msgs[1].Cancel()
	hosts[2].Close()
	psubs[0].BlacklistPeer(hosts[3].ID())

	leavingPeers := make(map[peer.ID]struct{})
	for i := 0; i < 3; i++ {
		pid, err := msgs[0].NextPeerLeave(ctx)
		if err != nil {
			t.Fatal(err)
		}
		leavingPeers[pid] = struct{}{}
	}

	if _, ok := leavingPeers[hosts[1].ID()]; !ok {
		t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event"))
	}
	if _, ok := leavingPeers[hosts[2].ID()]; !ok {
		t.Fatal(fmt.Errorf("closing host did not cause a leave event"))
	}
	if _, ok := leavingPeers[hosts[3].ID()]; !ok {
		t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
	}
}
54 changes: 49 additions & 5 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type PubSub struct {
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}

// a set of notification channels for newly subscribed peers
newSubs map[string]chan peer.ID
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

// sendMsg handles messages that have been validated
sendMsg chan *sendReq

Expand Down Expand Up @@ -333,8 +336,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
}

delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
for t, tmap := range p.topics {
delete(tmap, pid)
p.notifySubscriberLeft(t, pid)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}

p.rt.RemovePeer(pid)
Expand Down Expand Up @@ -392,8 +396,9 @@ func (p *PubSub) processLoop(ctx context.Context) {
if ok {
close(ch)
delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
for t, tmap := range p.topics {
delete(tmap, pid)
p.notifySubscriberLeft(t, pid)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
p.rt.RemovePeer(pid)
}
Expand All @@ -418,6 +423,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {

sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
close(sub.ch)
close(sub.inboundSubs)
close(sub.leavingSubs)
delete(subs, sub)

if len(subs) == 0 {
Expand Down Expand Up @@ -447,9 +454,21 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
subs = p.myTopics[sub.topic]
}

tmap := p.topics[sub.topic]
inboundBufSize := len(tmap)
if inboundBufSize < 32 {
inboundBufSize = 32
}

sub.ch = make(chan *Message, 32)
sub.inboundSubs = make(chan peer.ID, inboundBufSize)
sub.leavingSubs = make(chan peer.ID, 32)
sub.cancelCh = p.cancelCh

for pid := range tmap {
sub.inboundSubs <- pid
}

p.myTopics[sub.topic][sub] = struct{}{}

req.resp <- sub
Expand Down Expand Up @@ -560,6 +579,18 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
return false
}

func (p *PubSub) notifySubscriberLeft(topic string, pid peer.ID) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if subs, ok := p.myTopics[topic]; ok {
for s := range subs {
select {
case s.leavingSubs <- pid:
default:
log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic)
}
}
}
}

func (p *PubSub) handleIncomingRPC(rpc *RPC) {
for _, subopt := range rpc.GetSubscriptions() {
t := subopt.GetTopicid()
Expand All @@ -570,13 +601,26 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
p.topics[t] = tmap
}

tmap[rpc.from] = struct{}{}
if _, ok = tmap[rpc.from]; !ok {
tmap[rpc.from] = struct{}{}
if subs, ok := p.myTopics[t]; ok {
inboundPeer := rpc.from
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
for s := range subs {
select {
case s.inboundSubs <- inboundPeer:
default:
log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
}
}
}
}
} else {
tmap, ok := p.topics[t]
if !ok {
continue
}
delete(tmap, rpc.from)
p.notifySubscriberLeft(t, rpc.from)
}
}

Expand Down
37 changes: 33 additions & 4 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package pubsub

import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
)

// Subscription tracks the local state of a single topic subscription:
// the stream of delivered messages plus join/leave notifications for
// remote peers on the topic.
type Subscription struct {
	topic       string               // the topic this subscription is for
	ch          chan *Message        // validated messages delivered to Next
	cancelCh    chan<- *Subscription // signals the PubSub loop to tear down this subscription
	inboundSubs chan peer.ID         // peers newly seen subscribing to the topic
	leavingSubs chan peer.ID         // peers that unsubscribed, disconnected, or were blacklisted
	err         error                // set when the subscription is cancelled; reported after channels close
}

func (sub *Subscription) Topic() string {
Expand All @@ -31,3 +34,29 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
// Cancel closes down the subscription by handing it to the PubSub
// cancellation channel for processing on the event loop.
func (sub *Subscription) Cancel() {
	sub.cancelCh <- sub
}

// NextPeerJoin blocks until a peer joins the subscription's topic and
// returns its ID, or returns an error if ctx is done or the subscription
// has been cancelled (in which case sub.err is reported).
func (sub *Subscription) NextPeerJoin(ctx context.Context) (peer.ID, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case joined, ok := <-sub.inboundSubs:
		if ok {
			return joined, nil
		}
		// Channel closed: subscription torn down; surface its error.
		return joined, sub.err
	}
}

// NextPeerLeave blocks until a peer leaves the subscription's topic and
// returns its ID, or returns an error if ctx is done or the subscription
// has been cancelled (in which case sub.err is reported).
func (sub *Subscription) NextPeerLeave(ctx context.Context) (peer.ID, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case left, ok := <-sub.leavingSubs:
		if ok {
			return left, nil
		}
		// Channel closed: subscription torn down; surface its error.
		return left, sub.err
	}
}