Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #5 from taylormike/feat/bitswap/sendmultiple
Browse files Browse the repository at this point in the history
bitswap: Bitswap now sends multiple blocks per message
  • Loading branch information
Stebalien authored Oct 4, 2018
2 parents e21d842 + 409a3ec commit cbd7eb7
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 91 deletions.
2 changes: 1 addition & 1 deletion decision/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
for i := 0; i < b.N; i++ {
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))

q.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32}, peers[i%len(peers)])
q.Push(peers[i%len(peers)], &wantlist.Entry{Cid: c, Priority: math.MaxInt32})
}
}
52 changes: 39 additions & 13 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ var log = logging.Logger("engine")
const (
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
)

// Envelope contains a message for a Peer
type Envelope struct {
// Peer is the intended recipient
Peer peer.ID

// Block is the payload
Block blocks.Block
// Message is the payload
Message bsmsg.BitSwapMessage

// A callback to notify the decision queue that the task is complete
Sent func()
Expand Down Expand Up @@ -166,21 +168,28 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
}

// with a task in hand, we're ready to prepare the envelope...
msg := bsmsg.New(true)
for _, entry := range nextTask.Entries {
block, err := e.bs.Get(entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
continue
}
msg.AddBlock(block)
}

block, err := e.bs.Get(nextTask.Entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
if msg.Empty() {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
nextTask.Done(nextTask.Entries)
continue
}

return &Envelope{
Peer: nextTask.Target,
Block: block,
Peer: nextTask.Target,
Message: msg,
Sent: func() {
nextTask.Done()
nextTask.Done(nextTask.Entries)
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
Expand Down Expand Up @@ -231,6 +240,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
l.wantList = wl.New()
}

var msgSize int
var activeEntries []*wl.Entry
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Cid)
Expand All @@ -239,13 +250,28 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
} else {
log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
l.Wants(entry.Cid, entry.Priority)
if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
blockSize, err := e.bs.GetSize(entry.Cid)
if err != nil {
if err == bstore.ErrNotFound {
continue
}
log.Error(err)
} else {
// we have the block
newWorkExists = true
if msgSize + blockSize > maxMessageSize {
e.peerRequestQueue.Push(p, activeEntries...)
activeEntries = []*wl.Entry{}
msgSize = 0
}
activeEntries = append(activeEntries, entry.Entry)
msgSize += blockSize
}
}
}

if len(activeEntries) > 0 {
e.peerRequestQueue.Push(p, activeEntries...)
}
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
l.ReceivedBytes(len(block.RawData()))
Expand All @@ -259,7 +285,7 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Cid()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
e.peerRequestQueue.Push(l.Partner, entry)
work = true
}
l.lk.Unlock()
Expand Down
54 changes: 41 additions & 13 deletions decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -139,6 +138,19 @@ func TestPartnerWantsThenCancels(t *testing.T) {
},
{
alphabet, stringsComplement(alphabet, vowels),
alphabet[1:25], stringsComplement(alphabet[1:25], vowels), alphabet[2:25], stringsComplement(alphabet[2:25], vowels),
alphabet[3:25], stringsComplement(alphabet[3:25], vowels), alphabet[4:25], stringsComplement(alphabet[4:25], vowels),
alphabet[5:25], stringsComplement(alphabet[5:25], vowels), alphabet[6:25], stringsComplement(alphabet[6:25], vowels),
alphabet[7:25], stringsComplement(alphabet[7:25], vowels), alphabet[8:25], stringsComplement(alphabet[8:25], vowels),
alphabet[9:25], stringsComplement(alphabet[9:25], vowels), alphabet[10:25], stringsComplement(alphabet[10:25], vowels),
alphabet[11:25], stringsComplement(alphabet[11:25], vowels), alphabet[12:25], stringsComplement(alphabet[12:25], vowels),
alphabet[13:25], stringsComplement(alphabet[13:25], vowels), alphabet[14:25], stringsComplement(alphabet[14:25], vowels),
alphabet[15:25], stringsComplement(alphabet[15:25], vowels), alphabet[16:25], stringsComplement(alphabet[16:25], vowels),
alphabet[17:25], stringsComplement(alphabet[17:25], vowels), alphabet[18:25], stringsComplement(alphabet[18:25], vowels),
alphabet[19:25], stringsComplement(alphabet[19:25], vowels), alphabet[20:25], stringsComplement(alphabet[20:25], vowels),
alphabet[21:25], stringsComplement(alphabet[21:25], vowels), alphabet[22:25], stringsComplement(alphabet[22:25], vowels),
alphabet[23:25], stringsComplement(alphabet[23:25], vowels), alphabet[24:25], stringsComplement(alphabet[24:25], vowels),
alphabet[25:25], stringsComplement(alphabet[25:25], vowels),
},
}

Expand All @@ -151,20 +163,22 @@ func TestPartnerWantsThenCancels(t *testing.T) {
}

for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := NewEngine(context.Background(), bs)
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
expected = append(expected, keeps)

e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)

partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
if err := checkHandledInOrder(t, e, keeps); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
if err := checkHandledInOrder(t, e, expected); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
}
Expand All @@ -173,7 +187,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New(false)
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Cid(), math.MaxInt32-i)
add.AddEntry(block.Cid(), len(keys)-i)
}
e.MessageReceived(partner, add)
}
Expand All @@ -187,14 +201,28 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
e.MessageReceived(partner, cancels)
}

func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {
for _, keys := range expected {
next := <-e.Outbox()
envelope := <-next
received := envelope.Block
expected := blocks.NewBlock([]byte(k))
if !received.Cid().Equals(expected.Cid()) {
return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
received := envelope.Message.Blocks()
// Verify payload message length
if len(received) != len(keys) {
return errors.New(fmt.Sprintln("# blocks received", len(received), "# blocks expected", len(keys)))
}
// Verify payload message contents
for _, k := range keys {
found := false
expected := blocks.NewBlock([]byte(k))
for _, block := range received {
if block.Cid().Equals(expected.Cid()) {
found = true
break
}
}
if !found {
return errors.New(fmt.Sprintln("received", received, "expected", string(expected.RawData())))
}
}
}
return nil
Expand Down
102 changes: 64 additions & 38 deletions decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry *wantlist.Entry, to peer.ID)
Push(to peer.ID, entries ...*wantlist.Entry)
Remove(k cid.Cid, p peer.ID)

// NB: cannot expose simply expose taskQueue.Len because trashed elements
Expand Down Expand Up @@ -46,7 +46,7 @@ type prq struct {
}

// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
tl.lock.Lock()
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
Expand All @@ -58,31 +58,49 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {

partner.activelk.Lock()
defer partner.activelk.Unlock()
if partner.activeBlocks.Has(entry.Cid) {
return

var priority int
newEntries := make([]*wantlist.Entry, 0, len(entries))
for _, entry := range entries {
if partner.activeBlocks.Has(entry.Cid) {
continue
}
if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
if entry.Priority > task.Priority {
task.Priority = entry.Priority
partner.taskQueue.Update(task.index)
}
continue
}
if entry.Priority > priority {
priority = entry.Priority
}
newEntries = append(newEntries, entry)
}

if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
if len(newEntries) == 0 {
return
}

task := &peerRequestTask{
Entry: entry,
Entries: newEntries,
Target: to,
created: time.Now(),
Done: func() {
Done: func(e []*wantlist.Entry) {
tl.lock.Lock()
partner.TaskDone(entry.Cid)
for _, entry := range e {
partner.TaskDone(entry.Cid)
}
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
}

task.Priority = priority
partner.taskQueue.Push(task)
tl.taskMap[task.Key()] = task
partner.requests++
for _, entry := range newEntries {
tl.taskMap[taskEntryKey(to, entry.Cid)] = task
}
partner.requests += len(newEntries)
tl.pQueue.Update(partner.Index())
}

Expand All @@ -98,14 +116,23 @@ func (tl *prq) Pop() *peerRequestTask {
var out *peerRequestTask
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
out = nil
continue // discarding tasks that have been removed
}

partner.StartTask(out.Entry.Cid)
partner.requests--
newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
if entry.Trash {
continue
}
partner.requests--
partner.StartTask(entry.Cid)
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
out.Entries = newEntries
} else {
out = nil // discarding tasks that have been removed
continue
}
break // and return |out|
}

Expand All @@ -116,12 +143,17 @@ func (tl *prq) Pop() *peerRequestTask {
// Remove removes a task from the queue
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
t, ok := tl.taskMap[taskEntryKey(p, k)]
if ok {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
t.trash = true
for _, entry := range t.Entries {
if entry.Cid.Equals(k) {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
entry.Trash = true
break
}
}

// having canceled a block, we now account for that in the given partner
partner := tl.partners[p]
Expand Down Expand Up @@ -166,24 +198,18 @@ func (tl *prq) thawRound() {
}

type peerRequestTask struct {
Entry *wantlist.Entry
Target peer.ID
Entries []*wantlist.Entry
Priority int
Target peer.ID

// A callback to signal that this task has been completed
Done func()
Done func([]*wantlist.Entry)

// trash in a book-keeping field
trash bool
// created marks the time that the task was added to the queue
created time.Time
index int // book-keeping field used by the pq container
}

// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Cid)
}

// Index implements pq.Elem
func (t *peerRequestTask) Index() int {
return t.index
Expand All @@ -194,8 +220,8 @@ func (t *peerRequestTask) SetIndex(i int) {
t.index = i
}

// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k cid.Cid) string {
// taskEntryKey returns a key that uniquely identifies a task.
func taskEntryKey(p peer.ID, k cid.Cid) string {
return string(p) + k.KeyString()
}

Expand All @@ -208,7 +234,7 @@ var FIFO = func(a, b *peerRequestTask) bool {
// different peers, the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
if a.Target == b.Target {
return a.Entry.Priority > b.Entry.Priority
return a.Priority > b.Priority
}
return FIFO(a, b)
}
Expand Down
Loading

0 comments on commit cbd7eb7

Please sign in to comment.