Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make bitswap better #2798

Merged
merged 1 commit into from
Jun 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions exchange/bitswap/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package decision

import (
"sync"
"time"

blocks "github.com/ipfs/go-ipfs/blocks"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
// outbox.
peerRequestQueue peerRequestQueue
peerRequestQueue *prq

// FIXME it's a bit odd for the client and the worker to both share memory
// (both modify the peerRequestQueue) and also to communicate over the
Expand All @@ -86,6 +87,8 @@ type Engine struct {
lock sync.Mutex // protects the fields immediatly below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger

ticker *time.Ticker
}

func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
Expand All @@ -95,6 +98,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
peerRequestQueue: newPRQ(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
}
go e.taskWorker(ctx)
return e
Expand Down Expand Up @@ -142,6 +146,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return nil, ctx.Err()
case <-e.workSignal:
nextTask = e.peerRequestQueue.Pop()
case <-e.ticker.C:
e.peerRequestQueue.thawRound()
nextTask = e.peerRequestQueue.Pop()
}
}

Expand Down Expand Up @@ -191,9 +198,6 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()

if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
log.Debugf("received empty message from %s", p)
}
Expand All @@ -206,6 +210,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}()

l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
if m.Full() {
l.wantList = wl.New()
}
Expand Down Expand Up @@ -236,10 +242,12 @@ func (e *Engine) addBlock(block blocks.Block) {
work := false

for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
l.lk.Unlock()
}

if work {
Expand All @@ -261,9 +269,6 @@ func (e *Engine) AddBlock(block blocks.Block) {
// send happen atomically

func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()

l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data()))
Expand All @@ -290,11 +295,13 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {

// ledger lazily instantiates a ledger
func (e *Engine) findOrCreate(p peer.ID) *ledger {
e.lock.Lock()
l, ok := e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p] = l
}
e.lock.Unlock()
return l
}

Expand Down
3 changes: 3 additions & 0 deletions exchange/bitswap/decision/ledger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package decision

import (
"sync"
"time"

key "github.com/ipfs/go-ipfs/blocks/key"
Expand Down Expand Up @@ -44,6 +45,8 @@ type ledger struct {
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[key.Key]time.Time

lk sync.Mutex
}

type debtRatio struct {
Expand Down
57 changes: 54 additions & 3 deletions exchange/bitswap/decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ type peerRequestQueue interface {
Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)

// NB: cannot expose simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
}

func newPRQ() peerRequestQueue {
func newPRQ() *prq {
return &prq{
taskMap: make(map[string]*peerRequestTask),
partners: make(map[peer.ID]*activePartner),
frozen: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
}
}
Expand All @@ -38,6 +40,8 @@ type prq struct {
pQueue pq.PQ
taskMap map[string]*peerRequestTask
partners map[peer.ID]*activePartner

frozen map[peer.ID]*activePartner
}

// Push currently adds a new peerRequestTask to the end of the list
Expand Down Expand Up @@ -92,7 +96,7 @@ func (tl *prq) Pop() *peerRequestTask {
partner := tl.pQueue.Pop().(*activePartner)

var out *peerRequestTask
for partner.taskQueue.Len() > 0 {
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
Expand Down Expand Up @@ -120,11 +124,47 @@ func (tl *prq) Remove(k key.Key, p peer.ID) {
t.trash = true

// having canceled a block, we now account for that in the given partner
tl.partners[p].requests--
partner := tl.partners[p]
partner.requests--

// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they already potentially have
if partner.freezeVal == 0 {
tl.frozen[p] = partner
}

partner.freezeVal++
tl.pQueue.Update(partner.index)
}
tl.lock.Unlock()
}

func (tl *prq) fullThaw() {
tl.lock.Lock()
defer tl.lock.Unlock()

for id, partner := range tl.frozen {
partner.freezeVal = 0
delete(tl.frozen, id)
tl.pQueue.Update(partner.index)
}
}

func (tl *prq) thawRound() {
tl.lock.Lock()
defer tl.lock.Unlock()

for id, partner := range tl.frozen {
partner.freezeVal -= (partner.freezeVal + 1) / 2
if partner.freezeVal <= 0 {
delete(tl.frozen, id)
}
tl.pQueue.Update(partner.index)
}
}

type peerRequestTask struct {
Entry wantlist.Entry
Target peer.ID
Expand Down Expand Up @@ -196,6 +236,8 @@ type activePartner struct {
// for the PQ interface
index int

freezeVal int

// priority queue of tasks belonging to this peer
taskQueue pq.PQ
}
Expand All @@ -208,6 +250,7 @@ func newActivePartner() *activePartner {
}

// partnerCompare implements pq.ElemComparator
// returns true if peer 'a' has higher priority than peer 'b'
func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner)
pb := b.(*activePartner)
Expand All @@ -220,6 +263,14 @@ func partnerCompare(a, b pq.Elem) bool {
if pb.requests == 0 {
return true
}

if pa.freezeVal > pb.freezeVal {
return false
}
if pa.freezeVal < pb.freezeVal {
return true
}

if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
Expand Down
2 changes: 2 additions & 0 deletions exchange/bitswap/decision/peer_request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func TestPushPop(t *testing.T) {
prq.Remove(key.Key(consonant), partner)
}

prq.fullThaw()

var out []string
for {
received := prq.Pop()
Expand Down
7 changes: 7 additions & 0 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ type BitSwapNetwork interface {

ConnectTo(context.Context, peer.ID) error

NewMessageSender(context.Context, peer.ID) (MessageSender, error)

Routing
}

type MessageSender interface {
SendMsg(bsmsg.BitSwapMessage) error
Close() error
}

// Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface {
ReceiveMessage(
Expand Down
21 changes: 21 additions & 0 deletions exchange/bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ type impl struct {
receiver Receiver
}

type streamMessageSender struct {
s inet.Stream
}

func (s *streamMessageSender) Close() error {
return s.s.Close()
}

func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error {
return msg.ToNet(s.s)
}

func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
s, err := bsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}

return &streamMessageSender{s: s}, nil
}

func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {

// first, make sure we're connected.
Expand Down
24 changes: 24 additions & 0 deletions exchange/bitswap/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,30 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max
return out
}

type messagePasser struct {
net *network
target peer.ID
local peer.ID
ctx context.Context
}

func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m)
}

func (mp *messagePasser) Close() error {
return nil
}

func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
return &messagePasser{
net: n.network,
target: p,
local: n.local,
ctx: ctx,
}, nil
}

// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
return nc.routing.Provide(ctx, k)
Expand Down
Loading