Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable outbound peer queue sizes #230

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 51 additions & 34 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pubsub
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -45,6 +46,9 @@ type PubSub struct {

disc *discover

// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int

// incoming messages from other peers
incoming chan *RPC

Expand Down Expand Up @@ -169,38 +173,39 @@ type Option func(*PubSub) error
// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
ps := &PubSub{
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
}

for _, opt := range opts {
Expand Down Expand Up @@ -232,6 +237,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
return ps, nil
}

// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
// We start dropping messages to a peer if the outbound queue if full
func WithPeerOutboundQueueSize(size int) Option {
return func(p *PubSub) error {
if size <= 0 {
return errors.New("outbound queue size must always be positive")
}
p.peerOutboundQueueSize = size
return nil
}
}

// WithMessageSigning enables or disables message signing (enabled by default).
func WithMessageSigning(enabled bool) Option {
return func(p *PubSub) error {
Expand Down Expand Up @@ -327,7 +344,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
continue
}

messages := make(chan *RPC, 32)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
Expand Down Expand Up @@ -366,7 +383,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
messages := make(chan *RPC, 32)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
Expand Down