diff --git a/decision/bench_test.go b/decision/bench_test.go
index dc3aea06..46d40ce0 100644
--- a/decision/bench_test.go
+++ b/decision/bench_test.go
@@ -25,6 +25,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
-		q.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32}, peers[i%len(peers)])
+		q.Push(peers[i%len(peers)], &wantlist.Entry{Cid: c, Priority: math.MaxInt32})
 	}
 }
 
diff --git a/decision/engine.go b/decision/engine.go
index 736e5d46..8126555f 100644
--- a/decision/engine.go
+++ b/decision/engine.go
@@ -52,6 +52,8 @@ var log = logging.Logger("engine")
 const (
 	// outboxChanBuffer must be 0 to prevent stale messages from being sent
 	outboxChanBuffer = 0
+	// maxMessageSize is the maximum size of the batched payload
+	maxMessageSize = 512 * 1024
 )
 
 // Envelope contains a message for a Peer
@@ -59,8 +61,8 @@ type Envelope struct {
 	// Peer is the intended recipient
 	Peer peer.ID
 
-	// Block is the payload
-	Block blocks.Block
+	// Message is the payload
+	Message bsmsg.BitSwapMessage
 
 	// A callback to notify the decision queue that the task is complete
 	Sent func()
@@ -166,21 +168,28 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 		}
 
 		// with a task in hand, we're ready to prepare the envelope...
+		msg := bsmsg.New(true)
+		for _, entry := range nextTask.Entries {
+			block, err := e.bs.Get(entry.Cid)
+			if err != nil {
+				log.Errorf("tried to execute a task and errored fetching block: %s", err)
+				continue
+			}
+			msg.AddBlock(block)
+		}
 
-		block, err := e.bs.Get(nextTask.Entry.Cid)
-		if err != nil {
-			log.Errorf("tried to execute a task and errored fetching block: %s", err)
+		if msg.Empty() {
 			// If we don't have the block, don't hold that against the peer
 			// make sure to update that the task has been 'completed'
-			nextTask.Done()
+			nextTask.Done(nextTask.Entries)
 			continue
 		}
 
 		return &Envelope{
-			Peer:  nextTask.Target,
-			Block: block,
+			Peer:    nextTask.Target,
+			Message: msg,
 			Sent: func() {
-				nextTask.Done()
+				nextTask.Done(nextTask.Entries)
 				select {
 				case e.workSignal <- struct{}{}:
 					// work completing may mean that our queue will provide new
@@ -231,6 +240,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 		l.wantList = wl.New()
 	}
 
+	var blockSize, msgSize int
+	var activeEntries []*wl.Entry
 	for _, entry := range m.Wantlist() {
 		if entry.Cancel {
 			log.Debugf("%s cancel %s", p, entry.Cid)
@@ -240,12 +251,24 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 			log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
 			l.Wants(entry.Cid, entry.Priority)
 			if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
-				e.peerRequestQueue.Push(entry.Entry, p)
 				newWorkExists = true
+				if blockSize, err = e.bs.GetSize(entry.Cid); err != nil {
+					log.Error(err)
+					continue
+				}
+				if msgSize+blockSize > maxMessageSize {
+					e.peerRequestQueue.Push(p, activeEntries...)
+					activeEntries = []*wl.Entry{}
+					msgSize = 0
+				}
+				activeEntries = append(activeEntries, entry.Entry)
+				msgSize += blockSize
 			}
 		}
 	}
-
+	if len(activeEntries) > 0 {
+		e.peerRequestQueue.Push(p, activeEntries...)
+	}
 	for _, block := range m.Blocks() {
 		log.Debugf("got block %s %d bytes", block, len(block.RawData()))
 		l.ReceivedBytes(len(block.RawData()))
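The batching rule introduced in `MessageReceived` above is: accumulate entries while the running total stays under `maxMessageSize`, and flush to the queue the moment the next block would overflow it. Here is a minimal, self-contained sketch of that pattern; the names `splitBySize` and `maxBatchSize` are illustrative, not part of the bitswap code:

```go
package main

import "fmt"

// maxBatchSize mirrors maxMessageSize above; all names here are illustrative.
const maxBatchSize = 512 * 1024

// splitBySize groups sizes into batches so no batch exceeds limit, flushing
// the current batch whenever the next item would overflow it -- the same
// pattern MessageReceived uses before calling peerRequestQueue.Push.
func splitBySize(sizes []int, limit int) [][]int {
	var batches [][]int
	var current []int
	total := 0
	for _, s := range sizes {
		if total+s > limit && len(current) > 0 {
			batches = append(batches, current)
			current, total = nil, 0
		}
		current = append(current, s)
		total += s
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	kib := 1 << 10
	// Three 200 KiB blocks under a 512 KiB cap: two fit, the third flushes.
	fmt.Println(splitBySize([]int{200 * kib, 200 * kib, 200 * kib}, maxBatchSize))
	// [[204800 204800] [204800]]
}
```

One wrinkle the engine version tolerates: a single block larger than the cap triggers a flush of a possibly empty `activeEntries` slice, which is harmless because `Push` bails out when no new entries survive filtering.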
@@ -259,7 +282,7 @@ func (e *Engine) addBlock(block blocks.Block) {
 	for _, l := range e.ledgerMap {
 		l.lk.Lock()
 		if entry, ok := l.WantListContains(block.Cid()); ok {
-			e.peerRequestQueue.Push(entry, l.Partner)
+			e.peerRequestQueue.Push(l.Partner, entry)
 			work = true
 		}
 		l.lk.Unlock()
diff --git a/decision/engine_test.go b/decision/engine_test.go
index ed7d1055..54dd935c 100644
--- a/decision/engine_test.go
+++ b/decision/engine_test.go
@@ -191,7 +191,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
 	for _, k := range keys {
 		next := <-e.Outbox()
 		envelope := <-next
-		received := envelope.Block
+		received := envelope.Message.Blocks()[0]
 		expected := blocks.NewBlock([]byte(k))
 		if !received.Cid().Equals(expected.Cid()) {
 			return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
diff --git a/decision/peer_request_queue.go b/decision/peer_request_queue.go
index b9e34763..2b81e610 100644
--- a/decision/peer_request_queue.go
+++ b/decision/peer_request_queue.go
@@ -14,7 +14,7 @@ import (
 type peerRequestQueue interface {
 	// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
 	Pop() *peerRequestTask
-	Push(entry *wantlist.Entry, to peer.ID)
+	Push(to peer.ID, entries ...*wantlist.Entry)
 	Remove(k *cid.Cid, p peer.ID)
 
 	// NB: cannot simply expose taskQueue.Len because trashed elements
@@ -46,7 +46,7 @@ type prq struct {
 }
 
 // Push currently adds a new peerRequestTask to the end of the list
-func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
+func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
 	tl.lock.Lock()
 	defer tl.lock.Unlock()
 	partner, ok := tl.partners[to]
@@ -58,31 +58,50 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
 	partner.activelk.Lock()
 	defer partner.activelk.Unlock()
-	if partner.activeBlocks.Has(entry.Cid) {
-		return
+
+	var priority int
+	newEntries := make([]*wantlist.Entry, 0, len(entries))
+	for _, entry := range entries {
+		if partner.activeBlocks.Has(entry.Cid) {
+			continue
+		}
+		if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
+			if entry.Priority > task.Priority {
+				task.Priority = entry.Priority
+				partner.taskQueue.Update(task.index)
+			}
+			continue
+		}
+		if entry.Priority > priority {
+			priority = entry.Priority
+		}
+		newEntries = append(newEntries, entry)
 	}
 
-	if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
-		task.Entry.Priority = entry.Priority
-		partner.taskQueue.Update(task.index)
+	if len(newEntries) == 0 {
 		return
 	}
 
 	task := &peerRequestTask{
-		Entry:   entry,
+		Entries: newEntries,
 		Target:  to,
 		created: time.Now(),
-		Done: func() {
+		Done: func(e []*wantlist.Entry) {
 			tl.lock.Lock()
-			partner.TaskDone(entry.Cid)
+			for _, entry := range e {
+				partner.TaskDone(entry.Cid)
+			}
 			tl.pQueue.Update(partner.Index())
 			tl.lock.Unlock()
 		},
 	}
+	task.Priority = priority
 
 	partner.taskQueue.Push(task)
-	tl.taskMap[task.Key()] = task
-	partner.requests++
+	for _, entry := range newEntries {
+		tl.taskMap[taskEntryKey(to, entry.Cid)] = task
+	}
+	partner.requests += len(newEntries)
 	tl.pQueue.Update(partner.Index())
 }
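Since `Push` now accepts a batch, its merge semantics carry over the old single-entry behavior: entries already being worked on are skipped, and a re-pushed entry can only raise the priority of its existing task, never lower it, while the new task records the highest priority among its surviving entries (V1 compares whole tasks, not individual entries). A toy model of that keep-the-maximum rule; the `entry` type and `merge` helper are stand-ins, not the real wantlist API:

```go
package main

import "fmt"

// entry is a stand-in for wantlist.Entry: just a CID string and a priority.
type entry struct {
	cid      string
	priority int
}

// merge folds incoming entries into an index keyed by CID, keeping the
// highest priority seen -- the rule Push applies when taskMap already
// holds a task for the entry.
func merge(index map[string]*entry, incoming ...*entry) {
	for _, e := range incoming {
		if existing, ok := index[e.cid]; ok {
			if e.priority > existing.priority {
				existing.priority = e.priority
			}
			continue
		}
		index[e.cid] = e
	}
}

func main() {
	index := map[string]*entry{}
	merge(index, &entry{"Qm1", 5})
	merge(index, &entry{"Qm1", 9}, &entry{"Qm2", 1}) // re-pushing raises, never lowers
	fmt.Println(index["Qm1"].priority, index["Qm2"].priority) // 9 1
}
```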
@@ -98,14 +112,23 @@ func (tl *prq) Pop() *peerRequestTask {
 	var out *peerRequestTask
 	for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
 		out = partner.taskQueue.Pop().(*peerRequestTask)
-		delete(tl.taskMap, out.Key())
-		if out.trash {
-			out = nil
-			continue // discarding tasks that have been removed
-		}
+		newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
+		for _, entry := range out.Entries {
+			delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
+			if entry.Trash {
+				continue
+			}
+			partner.requests--
+			partner.StartTask(entry.Cid)
+			newEntries = append(newEntries, entry)
+		}
 
-		partner.StartTask(out.Entry.Cid)
-		partner.requests--
+		if len(newEntries) > 0 {
+			out.Entries = newEntries
+		} else {
+			out = nil // discarding tasks that have been removed
+			continue
+		}
 		break // and return |out|
 	}
 
@@ -116,12 +139,17 @@ func (tl *prq) Pop() *peerRequestTask {
 // Remove removes a task from the queue
 func (tl *prq) Remove(k *cid.Cid, p peer.ID) {
 	tl.lock.Lock()
-	t, ok := tl.taskMap[taskKey(p, k)]
+	t, ok := tl.taskMap[taskEntryKey(p, k)]
 	if ok {
-		// remove the task "lazily"
-		// simply mark it as trash, so it'll be dropped when popped off the
-		// queue.
-		t.trash = true
+		for _, entry := range t.Entries {
+			if entry.Cid.Equals(k) {
+				// remove the entry "lazily"
+				// simply mark it as trash, so it'll be dropped when popped off the
+				// queue.
+				entry.Trash = true
+				break
+			}
+		}
 
 		// having canceled a block, we now account for that in the given partner
 		partner := tl.partners[p]
@@ -166,14 +194,13 @@ func (tl *prq) thawRound() {
 }
 
 type peerRequestTask struct {
-	Entry   *wantlist.Entry
-	Target  peer.ID
+	Entries  []*wantlist.Entry
+	Priority int
+	Target   peer.ID
 
 	// A callback to signal that this task has been completed
-	Done func()
+	Done func([]*wantlist.Entry)
 
-	// trash in a book-keeping field
-	trash bool
 	// created marks the time that the task was added to the queue
 	created time.Time
 	index   int // book-keeping field used by the pq container
@@ -181,7 +208,7 @@ type peerRequestTask struct {
 
 // Key uniquely identifies a task.
 func (t *peerRequestTask) Key() string {
-	return taskKey(t.Target, t.Entry.Cid)
+	return taskKey(t.Target, t.Entries)
 }
 
 // Index implements pq.Elem
@@ -195,7 +222,16 @@ func (t *peerRequestTask) SetIndex(i int) {
 }
 
 // taskKey returns a key that uniquely identifies a task.
-func taskKey(p peer.ID, k *cid.Cid) string {
+func taskKey(p peer.ID, entries []*wantlist.Entry) string {
+	key := string(p)
+	for _, entry := range entries {
+		key += entry.Cid.KeyString()
+	}
+	return key
+}
+
+// taskEntryKey returns a key that uniquely identifies a single entry of a task.
+func taskEntryKey(p peer.ID, k *cid.Cid) string {
 	return string(p) + k.KeyString()
 }
 
@@ -208,7 +244,7 @@ var FIFO = func(a, b *peerRequestTask) bool {
 // different peers, the oldest task is prioritized.
 var V1 = func(a, b *peerRequestTask) bool {
 	if a.Target == b.Target {
-		return a.Entry.Priority > b.Entry.Priority
+		return a.Priority > b.Priority
 	}
 	return FIFO(a, b)
 }
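Cancellation stays lazy across this rewrite: `Remove` only flags the matching entry as `Trash`, and `Pop` filters flagged entries out when the task finally surfaces, so the heap never has to be rebuilt mid-queue. A minimal model of that filter, with stand-in types rather than the real queue:

```go
package main

import "fmt"

// entry is a stand-in for wantlist.Entry with its new Trash flag.
type entry struct {
	cid   string
	trash bool
}

// popLive drops trashed entries from a task's entry list at Pop time,
// modeling how cancellations are applied lazily instead of reheaping.
func popLive(entries []*entry) []*entry {
	live := make([]*entry, 0, len(entries))
	for _, e := range entries {
		if e.trash {
			continue
		}
		live = append(live, e)
	}
	return live
}

func main() {
	task := []*entry{{cid: "Qm1"}, {cid: "Qm2", trash: true}, {cid: "Qm3"}}
	for _, e := range popLive(task) {
		fmt.Println(e.cid) // Qm1, Qm3 -- the canceled Qm2 never surfaces
	}
}
```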
diff --git a/decision/peer_request_queue_test.go b/decision/peer_request_queue_test.go
index 32e93a27..d6ad8989 100644
--- a/decision/peer_request_queue_test.go
+++ b/decision/peer_request_queue_test.go
@@ -45,7 +45,7 @@ func TestPushPop(t *testing.T) {
 		t.Log(partner.String())
 		c := cid.NewCidV0(u.Hash([]byte(letter)))
-		prq.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index}, partner)
+		prq.Push(partner, &wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index})
 	}
 	for _, consonant := range consonants {
 		c := cid.NewCidV0(u.Hash([]byte(consonant)))
@@ -61,7 +61,9 @@ func TestPushPop(t *testing.T) {
 			break
 		}
 
-		out = append(out, received.Entry.Cid.String())
+		for _, entry := range received.Entries {
+			out = append(out, entry.Cid.String())
+		}
 	}
 
 	// Entries popped should already be in correct order
@@ -85,10 +87,10 @@ func TestPeerRepeats(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		elcid := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
-		prq.Push(&wantlist.Entry{Cid: elcid}, a)
-		prq.Push(&wantlist.Entry{Cid: elcid}, b)
-		prq.Push(&wantlist.Entry{Cid: elcid}, c)
-		prq.Push(&wantlist.Entry{Cid: elcid}, d)
+		prq.Push(a, &wantlist.Entry{Cid: elcid})
+		prq.Push(b, &wantlist.Entry{Cid: elcid})
+		prq.Push(c, &wantlist.Entry{Cid: elcid})
+		prq.Push(d, &wantlist.Entry{Cid: elcid})
 	}
 
 	// now, pop off four entries, there should be one from each
@@ -117,7 +119,7 @@ func TestPeerRepeats(t *testing.T) {
 	for blockI := 0; blockI < 4; blockI++ {
 		for i := 0; i < 4; i++ {
 			// it's okay to mark the same task done multiple times here (JUST FOR TESTING)
-			tasks[i].Done()
+			tasks[i].Done(tasks[i].Entries)
 
 			ntask := prq.Pop()
 			if ntask.Target != tasks[i].Target {
diff --git a/wantlist/wantlist.go b/wantlist/wantlist.go
index beb4ac75..a7b02fcc 100644
--- a/wantlist/wantlist.go
+++ b/wantlist/wantlist.go
@@ -24,6 +24,8 @@ type Entry struct {
 	Priority int
 
 	SesTrk map[uint64]struct{}
+	// Trash is a book-keeping field
+	Trash bool
 }
 
 // NewRefEntry creates a new reference tracked wantlist entry
diff --git a/wantmanager.go b/wantmanager.go
index 380d8538..7db5aaf3 100644
--- a/wantmanager.go
+++ b/wantmanager.go
@@ -114,16 +114,20 @@ func (pm *WantManager) ConnectedPeers() []peer.ID {
 	return <-resp
 }
 
-func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
+func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
 	// Blocks need to be sent synchronously to maintain proper backpressure
 	// throughout the network stack
 	defer env.Sent()
 
-	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))
-
+	msgSize := 0
 	msg := bsmsg.New(false)
-	msg.AddBlock(env.Block)
-	log.Infof("Sending block %s to %s", env.Block, env.Peer)
+	for _, block := range env.Message.Blocks() {
+		msgSize += len(block.RawData())
+		msg.AddBlock(block)
+		log.Infof("Sending block %s to %s", block, env.Peer)
+	}
 
+	pm.sentHistogram.Observe(float64(msgSize))
 	err := pm.network.SendMessage(ctx, env.Peer, msg)
 	if err != nil {
 		log.Infof("sendblock error: %s", err)
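One behavioral detail worth flagging here: `SendBlock` observed one histogram sample per block, while `SendBlocks` observes a single sample for the whole batched payload, so the percentiles now describe message sizes rather than block sizes. A small sketch of the new accounting; `payloadSize` is a hypothetical helper, not in the codebase:

```go
package main

import "fmt"

// payloadSize sums the raw sizes of a message's blocks; SendBlocks observes
// this single total on the histogram, where SendBlock used to record one
// sample per block. blockData stands in for env.Message.Blocks().
func payloadSize(blockData [][]byte) int {
	total := 0
	for _, raw := range blockData {
		total += len(raw)
	}
	return total
}

func main() {
	blocks := [][]byte{make([]byte, 1024), make([]byte, 2048)}
	fmt.Println(payloadSize(blocks)) // 3072: one histogram sample per envelope
}
```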
diff --git a/workers.go b/workers.go
index 8f5e6edd..e1569288 100644
--- a/workers.go
+++ b/workers.go
@@ -59,24 +59,27 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
 			if !ok {
 				continue
 			}
-			log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
-				return logging.LoggableMap{
-					"ID":     id,
-					"Target": envelope.Peer.Pretty(),
-					"Block":  envelope.Block.Cid().String(),
-				}
-			}))
-
 			// update the BS ledger to reflect sent message
 			// TODO: Should only track *useful* messages in ledger
 			outgoing := bsmsg.New(false)
-			outgoing.AddBlock(envelope.Block)
+			for _, block := range envelope.Message.Blocks() {
+				log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
+					return logging.LoggableMap{
+						"ID":     id,
+						"Target": envelope.Peer.Pretty(),
+						"Block":  block.Cid().String(),
+					}
+				}))
+				outgoing.AddBlock(block)
+			}
 			bs.engine.MessageSent(envelope.Peer, outgoing)
 
-			bs.wm.SendBlock(ctx, envelope)
+			bs.wm.SendBlocks(ctx, envelope)
 			bs.counterLk.Lock()
-			bs.counters.blocksSent++
-			bs.counters.dataSent += uint64(len(envelope.Block.RawData()))
+			for _, block := range envelope.Message.Blocks() {
+				bs.counters.blocksSent++
+				bs.counters.dataSent += uint64(len(block.RawData()))
+			}
 			bs.counterLk.Unlock()
 		case <-ctx.Done():
 			return
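Finally, the counters now advance once per block inside a single `counterLk` critical section, rather than once per envelope. A self-contained sketch of that bookkeeping; the `counters` type below is a stand-in for Bitswap's, which is guarded by `counterLk` rather than an embedded mutex:

```go
package main

import (
	"fmt"
	"sync"
)

// counters models the bitswap send counters: the task worker bumps them
// once per block in the outgoing envelope, under one lock acquisition.
type counters struct {
	mu         sync.Mutex
	blocksSent uint64
	dataSent   uint64
}

// record accounts for every block in a sent envelope in a single
// critical section, mirroring the loop between Lock and Unlock above.
func (c *counters) record(blockSizes []int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, size := range blockSizes {
		c.blocksSent++
		c.dataSent += uint64(size)
	}
}

func main() {
	var c counters
	c.record([]int{1024, 2048, 4096})
	fmt.Println(c.blocksSent, c.dataSent) // 3 7168
}
```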