Skip to content

Commit

Permalink
fix(bitswap/client/msgq): prevent duplicate requests
Browse files Browse the repository at this point in the history
Previously, in-progress requests could be re-requested again during periodic rebroadcast.
The queue requests, and while awaiting response, the rebroadcast event happens.
Rebroadcast event changes previosly sent WANTs to pending and sends them again in a new message.

The solution here is to ensure WANT was in sent status for long enough, before bringing it back to pending.
This utilizes existing `sendAt` map which tracks when every CID was sent.
  • Loading branch information
Wondertan committed Oct 19, 2024
1 parent 19bcc75 commit 9020b71
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,27 +476,43 @@ func (mq *MessageQueue) rebroadcastWantlist() {
mq.rebroadcastIntervalLk.Unlock()

// If some wants were transferred from the rebroadcast list
if mq.transferRebroadcastWants() {
if toRebroadcast := mq.transferRebroadcastWants(); toRebroadcast > 0 {
// Send them out
mq.sendMessage()
log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p)
}
}

// Transfer wants from the rebroadcast lists into the pending lists.
func (mq *MessageQueue) transferRebroadcastWants() bool {
func (mq *MessageQueue) transferRebroadcastWants() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()

// Check if there are any wants to rebroadcast
if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 {
return false
mq.rebroadcastIntervalLk.Lock()
rebroadcastInterval := mq.rebroadcastInterval
mq.rebroadcastIntervalLk.Unlock()

now := time.Now()
var toRebroadcast int
for _, want := range mq.bcstWants.sent.Entries() {
sentAt, ok := mq.bcstWants.sentAt[want.Cid]
if ok && now.Sub(sentAt) >= rebroadcastInterval {
mq.bcstWants.pending.Add(want.Cid, want.Priority, want.WantType)
mq.bcstWants.sent.Remove(want.Cid)
toRebroadcast++
}
}

// Copy sent wants into pending wants lists
mq.bcstWants.pending.Absorb(mq.bcstWants.sent)
mq.peerWants.pending.Absorb(mq.peerWants.sent)
for _, want := range mq.peerWants.sent.Entries() {
sentAt, ok := mq.peerWants.sentAt[want.Cid]
if ok && now.Sub(sentAt) >= rebroadcastInterval {
mq.peerWants.pending.Add(want.Cid, want.Priority, want.WantType)
mq.peerWants.sent.Remove(want.Cid)
toRebroadcast++
}
}

return true
return toRebroadcast
}

func (mq *MessageQueue) signalWorkReady() {
Expand Down

0 comments on commit 9020b71

Please sign in to comment.