bitswap: Bitswap now sends multiple blocks per message
Updated PeerRequestTask to hold multiple wantlist.Entry(s). This allows Bitswap to send multiple blocks in bulk per peer request. Also added a maximum message size that caps how much block data is batched into a single message: currently 512 * 1024 bytes.

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
taylormike committed Aug 13, 2018
1 parent 4297480 commit d501754
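
As a rough illustration of the batching strategy described in the commit message, the sketch below groups wanted entries so that each outgoing batch stays at or below the 512 * 1024 byte cap. It is a minimal, self-contained example with illustrative stand-in types, not the code in this commit; the actual implementation is in decision/engine.go below.

package main

import "fmt"

// maxMessageSize mirrors the cap introduced by this commit: the maximum
// total size of the blocks batched into one outgoing message.
const maxMessageSize = 512 * 1024

// entry is an illustrative stand-in for a wanted block: its CID (a plain
// string here) and the block's size in bytes.
type entry struct {
	cid  string
	size int
}

// batchBySize groups entries so the summed block size of each batch stays
// within maxMessageSize, starting a new batch whenever the next entry
// would overflow the current one.
func batchBySize(entries []entry) [][]entry {
	var batches [][]entry
	var active []entry
	msgSize := 0
	for _, e := range entries {
		if len(active) > 0 && msgSize+e.size > maxMessageSize {
			batches = append(batches, active)
			active = nil
			msgSize = 0
		}
		active = append(active, e)
		msgSize += e.size
	}
	if len(active) > 0 {
		batches = append(batches, active)
	}
	return batches
}

func main() {
	entries := []entry{
		{cid: "QmA", size: 300 * 1024},
		{cid: "QmB", size: 300 * 1024}, // would overflow the first batch, so a new one starts
		{cid: "QmC", size: 100 * 1024},
	}
	for i, batch := range batchBySize(entries) {
		fmt.Printf("batch %d: %d entries\n", i, len(batch))
	}
}
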
Showing 5 changed files with 131 additions and 61 deletions.
49 changes: 37 additions & 12 deletions decision/engine.go
@@ -52,15 +52,17 @@ var log = logging.Logger("engine")
const (
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
)

// Envelope contains a message for a Peer
type Envelope struct {
// Peer is the intended recipient
Peer peer.ID

// Block is the payload
Block blocks.Block
// Message is the payload
Message bsmsg.BitSwapMessage

// A callback to notify the decision queue that the task is complete
Sent func()
@@ -166,21 +168,28 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
}

// with a task in hand, we're ready to prepare the envelope...
msg := bsmsg.New(true)
for _, entry := range nextTask.Entries {
block, err := e.bs.Get(entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
continue
}
msg.AddBlock(block)
}

block, err := e.bs.Get(nextTask.Entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
if msg.Empty() {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
nextTask.Done(nextTask.Entries)
continue
}

return &Envelope{
Peer: nextTask.Target,
Block: block,
Peer: nextTask.Target,
Message: msg,
Sent: func() {
nextTask.Done()
nextTask.Done(nextTask.Entries)
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
@@ -231,6 +240,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
l.wantList = wl.New()
}

blockSize, msgSize := 0, 0
var activeEntries []*wl.Entry
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Cid)
@@ -240,12 +251,26 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
l.Wants(entry.Cid, entry.Priority)
if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
newWorkExists = true
if blockSize, _ = e.bs.GetSize(entry.Cid); blockSize == -1 {
// Force Update Size Cache
e.bs.Get(entry.Cid)
// Retry
blockSize, _ = e.bs.GetSize(entry.Cid)
}
if msgSize+blockSize > maxMessageSize {
e.peerRequestQueue.Push(activeEntries, p)
activeEntries = []*wl.Entry{}
msgSize = 0
}
activeEntries = append(activeEntries, entry.Entry)
msgSize += blockSize
}
}
}

if len(activeEntries) > 0 {
e.peerRequestQueue.Push(activeEntries, p)
}
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
l.ReceivedBytes(len(block.RawData()))
@@ -259,7 +284,7 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Cid()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
e.peerRequestQueue.Push([]*wl.Entry{entry}, l.Partner)
work = true
}
l.lk.Unlock()
100 changes: 68 additions & 32 deletions decision/peer_request_queue.go
@@ -14,7 +14,7 @@ import (
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry *wantlist.Entry, to peer.ID)
Push(entry []*wantlist.Entry, to peer.ID)
Remove(k *cid.Cid, p peer.ID)

// NB: cannot simply expose taskQueue.Len because trashed elements
@@ -46,7 +46,7 @@ type prq struct {
}

// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
func (tl *prq) Push(entries []*wantlist.Entry, to peer.ID) {
tl.lock.Lock()
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
@@ -58,31 +58,45 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {

partner.activelk.Lock()
defer partner.activelk.Unlock()
if partner.activeBlocks.Has(entry.Cid) {
return

newEntries := make([]*wantlist.Entry, 0, len(entries))
for _, entry := range entries {
if partner.activeBlocks.Has(entry.Cid) {
continue
}
if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
if task.Priority > entry.Priority {
task.Priority = entry.Priority
partner.taskQueue.Update(task.index)
}
continue
}
newEntries = append(newEntries, entry)
}

if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
if len(newEntries) == 0 {
return
}

task := &peerRequestTask{
Entry: entry,
Entries: newEntries,
Target: to,
created: time.Now(),
Done: func() {
Done: func(e []*wantlist.Entry) {
tl.lock.Lock()
partner.TaskDone(entry.Cid)
for _, entry := range e {
partner.TaskDone(entry.Cid)
}
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
}

partner.taskQueue.Push(task)
tl.taskMap[task.Key()] = task
partner.requests++
for _, entry := range newEntries {
tl.taskMap[taskEntryKey(to, entry.Cid)] = task
}
partner.requests += len(newEntries)
tl.pQueue.Update(partner.Index())
}

@@ -98,14 +112,23 @@ func (tl *prq) Pop() *peerRequestTask {
var out *peerRequestTask
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
out = nil
continue // discarding tasks that have been removed
}

partner.StartTask(out.Entry.Cid)
partner.requests--
newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
if entry.Trash {
continue
}
partner.requests--
partner.StartTask(entry.Cid)
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
out.Entries = newEntries
} else {
out = nil // discarding tasks that have been removed
continue
}
break // and return |out|
}

@@ -116,12 +139,17 @@ func (tl *prq) Pop() *peerRequestTask {
// Remove removes a task from the queue
func (tl *prq) Remove(k *cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
t, ok := tl.taskMap[taskEntryKey(p, k)]
if ok {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
t.trash = true
for _, entry := range t.Entries {
if entry.Cid == k {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
entry.Trash = true
break
}
}

// having canceled a block, we now account for that in the given partner
partner := tl.partners[p]
@@ -166,22 +194,21 @@ func (tl *prq) thawRound() {
}

type peerRequestTask struct {
Entry *wantlist.Entry
Target peer.ID
Entries []*wantlist.Entry
Priority int
Target peer.ID

// A callback to signal that this task has been completed
Done func()
Done func([]*wantlist.Entry)

// trash is a book-keeping field
trash bool
// created marks the time that the task was added to the queue
created time.Time
index int // book-keeping field used by the pq container
}

// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Cid)
return taskKey(t.Target, t.Entries)
}

// Index implements pq.Elem
@@ -195,7 +222,16 @@ func (t *peerRequestTask) SetIndex(i int) {
}

// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k *cid.Cid) string {
func taskKey(p peer.ID, entries []*wantlist.Entry) string {
key := string(p)
for _, entry := range entries {
key += entry.Cid.KeyString()
}
return key
}

// taskEntryKey returns a key that uniquely identifies a task.
func taskEntryKey(p peer.ID, k *cid.Cid) string {
return string(p) + k.KeyString()
}

@@ -208,7 +244,7 @@ var FIFO = func(a, b *peerRequestTask) bool {
// different peers, the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
if a.Target == b.Target {
return a.Entry.Priority > b.Entry.Priority
return a.Priority > b.Priority
}
return FIFO(a, b)
}
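
The Remove and Pop changes above implement lazy deletion: Remove only flags the matching entry as Trash, and Pop filters flagged entries out when the task is finally dequeued, discarding the task entirely if nothing survives. The sketch below shows that pattern in isolation with illustrative stand-in types; it is not the decision package's own code.

package main

import "fmt"

// entry is an illustrative stand-in for a wantlist entry carrying the Trash
// flag added in this commit.
type entry struct {
	cid   string
	trash bool
}

// task is an illustrative stand-in for a queued per-peer request holding
// several entries.
type task struct {
	entries []*entry
}

// remove cancels a single CID by marking its entry as trash; the queue is
// not rewritten, which keeps cancellation cheap.
func remove(t *task, cid string) {
	for _, e := range t.entries {
		if e.cid == cid {
			e.trash = true
			return
		}
	}
}

// pop drops trashed entries at dequeue time and returns nil when every
// entry in the task has been cancelled.
func pop(t *task) *task {
	var kept []*entry
	for _, e := range t.entries {
		if !e.trash {
			kept = append(kept, e)
		}
	}
	if len(kept) == 0 {
		return nil
	}
	t.entries = kept
	return t
}

func main() {
	t := &task{entries: []*entry{{cid: "QmA"}, {cid: "QmB"}}}
	remove(t, "QmA")
	if out := pop(t); out != nil {
		for _, e := range out.entries {
			fmt.Println("still queued:", e.cid)
		}
	}
}
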
2 changes: 2 additions & 0 deletions wantlist/wantlist.go
@@ -24,6 +24,8 @@ type Entry struct {
Priority int

SesTrk map[uint64]struct{}
// Trash is a book-keeping field
Trash bool
}

// NewRefEntry creates a new reference tracked wantlist entry
14 changes: 9 additions & 5 deletions wantmanager.go
@@ -114,16 +114,20 @@ func (pm *WantManager) ConnectedPeers() []peer.ID {
return <-resp
}

func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()

pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

msgSize := 0
msg := bsmsg.New(false)
msg.AddBlock(env.Block)
log.Infof("Sending block %s to %s", env.Block, env.Peer)
for _, block := range env.Message.Blocks() {
msgSize += len(block.RawData())
msg.AddBlock(block)
log.Infof("Sending block %s to %s", block, env.Peer)
}

pm.sentHistogram.Observe(float64(msgSize))
err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Infof("sendblock error: %s", err)
27 changes: 15 additions & 12 deletions workers.go
@@ -59,24 +59,27 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
if !ok {
continue
}
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
return logging.LoggableMap{
"ID": id,
"Target": envelope.Peer.Pretty(),
"Block": envelope.Block.Cid().String(),
}
}))

// update the BS ledger to reflect sent message
// TODO: Should only track *useful* messages in ledger
outgoing := bsmsg.New(false)
outgoing.AddBlock(envelope.Block)
for _, block := range envelope.Message.Blocks() {
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
return logging.LoggableMap{
"ID": id,
"Target": envelope.Peer.Pretty(),
"Block": block.Cid().String(),
}
}))
outgoing.AddBlock(block)
}
bs.engine.MessageSent(envelope.Peer, outgoing)

bs.wm.SendBlock(ctx, envelope)
bs.wm.SendBlocks(ctx, envelope)
bs.counterLk.Lock()
bs.counters.blocksSent++
bs.counters.dataSent += uint64(len(envelope.Block.RawData()))
for _, block := range envelope.Message.Blocks() {
bs.counters.blocksSent++
bs.counters.dataSent += uint64(len(block.RawData()))
}
bs.counterLk.Unlock()
case <-ctx.Done():
return
