diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index edea20b9c..b885814f1 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -476,27 +476,43 @@ func (mq *MessageQueue) rebroadcastWantlist() { mq.rebroadcastIntervalLk.Unlock() // If some wants were transferred from the rebroadcast list - if mq.transferRebroadcastWants() { + if toRebroadcast := mq.transferRebroadcastWants(); toRebroadcast > 0 { // Send them out mq.sendMessage() + log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p) } } // Transfer wants from the rebroadcast lists into the pending lists. -func (mq *MessageQueue) transferRebroadcastWants() bool { +func (mq *MessageQueue) transferRebroadcastWants() int { mq.wllock.Lock() defer mq.wllock.Unlock() - // Check if there are any wants to rebroadcast - if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 { - return false + mq.rebroadcastIntervalLk.Lock() + rebroadcastInterval := mq.rebroadcastInterval + mq.rebroadcastIntervalLk.Unlock() + + now := time.Now() + var toRebroadcast int + for _, want := range mq.bcstWants.sent.Entries() { + sentAt, ok := mq.bcstWants.sentAt[want.Cid] + if ok && now.Sub(sentAt) >= rebroadcastInterval { + mq.bcstWants.pending.Add(want.Cid, want.Priority, want.WantType) + mq.bcstWants.sent.Remove(want.Cid) + toRebroadcast++ + } } - // Copy sent wants into pending wants lists - mq.bcstWants.pending.Absorb(mq.bcstWants.sent) - mq.peerWants.pending.Absorb(mq.peerWants.sent) + for _, want := range mq.peerWants.sent.Entries() { + sentAt, ok := mq.peerWants.sentAt[want.Cid] + if ok && now.Sub(sentAt) >= rebroadcastInterval { + mq.peerWants.pending.Add(want.Cid, want.Priority, want.WantType) + mq.peerWants.sent.Remove(want.Cid) + toRebroadcast++ + } + } - return true + return toRebroadcast } func (mq *MessageQueue) signalWorkReady() {