From e06ac247eec2f6a98824a1fa3c27756ac86faa6c Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 13 Apr 2020 11:21:48 -0400 Subject: [PATCH 1/4] refactor: simplify messageQueue onSent --- internal/messagequeue/messagequeue.go | 34 ++++++++++----------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 4b3f090d..8b106b0d 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -113,8 +113,8 @@ func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantTyp r.pending.RemoveType(c, wtype) } -// Sent moves the want from the pending to the sent list -func (r *recallWantlist) Sent(e bsmsg.Entry) { +// MarkSent moves the want from the pending to the sent list +func (r *recallWantlist) MarkSent(e wantlist.Entry) { r.pending.RemoveType(e.Cid, e.WantType) r.sent.Add(e.Cid, e.Priority, e.WantType) } @@ -566,6 +566,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } // Add each regular want-have / want-block to the message + peerSentCount := 0 for i := 0; i < len(peerEntries) && msgSize < mq.maxMessageSize; i++ { e := peerEntries[i] // If the remote peer doesn't support HAVE / DONT_HAVE messages, @@ -575,9 +576,12 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } else { msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true) } + + peerSentCount++ } // Add each broadcast want-have to the message + bcstSentCount := 0 for i := 0; i < len(bcstEntries) && msgSize < mq.maxMessageSize; i++ { // Broadcast wants are sent as want-have wantType := pb.Message_Wantlist_Have @@ -590,39 +594,27 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap e := bcstEntries[i] msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false) + + bcstSentCount++ } // Called when the message has been successfully sent. onMessageSent := func(wantlist []bsmsg.Entry) { - bcst := keysToSet(bcstEntries) - prws := keysToSet(peerEntries) - mq.wllock.Lock() defer mq.wllock.Unlock() // Move the keys from pending to sent - for _, e := range wantlist { - if _, ok := bcst[e.Cid]; ok { - mq.bcstWants.Sent(e) - } - if _, ok := prws[e.Cid]; ok { - mq.peerWants.Sent(e) - } + for i := 0; i < bcstSentCount; i++ { + mq.bcstWants.MarkSent(bcstEntries[i]) + } + for i := 0; i < peerSentCount; i++ { + mq.peerWants.MarkSent(peerEntries[i]) } } return mq.msg, onMessageSent } -// Convert wantlist entries into a set of cids -func keysToSet(wl []wantlist.Entry) map[cid.Cid]struct{} { - set := make(map[cid.Cid]struct{}, len(wl)) - for _, e := range wl { - set[e.Cid] = struct{}{} - } - return set -} - func (mq *MessageQueue) initializeSender() error { if mq.sender != nil { return nil From efd006e9a458492a18bae131fb88dc7c4d8c9f1a Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 13 Apr 2020 11:23:44 -0400 Subject: [PATCH 2/4] refactor: save some vars --- internal/messagequeue/messagequeue.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 8b106b0d..4a16ee60 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -567,8 +567,8 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap // Add each regular want-have / want-block to the message peerSentCount := 0 - for i := 0; i < len(peerEntries) && msgSize < mq.maxMessageSize; i++ { - e := peerEntries[i] + for ; peerSentCount < len(peerEntries) && msgSize < mq.maxMessageSize; peerSentCount++ { + e := peerEntries[peerSentCount] // If the remote peer doesn't support HAVE / DONT_HAVE messages, // don't send want-haves (only send want-blocks) if !supportsHave && e.WantType == pb.Message_Wantlist_Have { @@ -576,13 +576,11 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } else { msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true) } - - peerSentCount++ } // Add each broadcast want-have to the message bcstSentCount := 0 - for i := 0; i < len(bcstEntries) && msgSize < mq.maxMessageSize; i++ { + for ; bcstSentCount < len(bcstEntries) && msgSize < mq.maxMessageSize; bcstSentCount++ { // Broadcast wants are sent as want-have wantType := pb.Message_Wantlist_Have @@ -592,10 +590,8 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap wantType = pb.Message_Wantlist_Block } - e := bcstEntries[i] + e := bcstEntries[bcstSentCount] msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false) - - bcstSentCount++ } // Called when the message has been successfully sent. From 6c4126051520a3c3fcf460896200342cf1b7b96c Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 13 Apr 2020 11:26:20 -0400 Subject: [PATCH 3/4] refactor: remove unnecessary func param --- internal/messagequeue/messagequeue.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 4a16ee60..ed43ec57 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -439,7 +439,7 @@ func (mq *MessageQueue) sendMessage() { for i := 0; i < maxRetries; i++ { if mq.attemptSendAndRecovery(message) { // We were able to send successfully. - onSent(wantlist) + onSent() mq.simulateDontHaveWithTimeout(wantlist) @@ -540,7 +540,7 @@ func (mq *MessageQueue) pendingWorkCount() int { } // Convert the lists of wants into a Bitswap message -func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func([]bsmsg.Entry)) { +func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) { mq.wllock.Lock() defer mq.wllock.Unlock() @@ -595,7 +595,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } // Called when the message has been successfully sent. - onMessageSent := func(wantlist []bsmsg.Entry) { + onMessageSent := func() { mq.wllock.Lock() defer mq.wllock.Unlock() From b6a8a73a29063bd23a3dac7727a3b9bad6d7fe81 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 13 Apr 2020 12:08:26 -0400 Subject: [PATCH 4/4] fix: only mark sent wants as sent --- internal/messagequeue/messagequeue.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index ed43ec57..1a8c2d5a 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -566,15 +566,16 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap } // Add each regular want-have / want-block to the message - peerSentCount := 0 - for ; peerSentCount < len(peerEntries) && msgSize < mq.maxMessageSize; peerSentCount++ { - e := peerEntries[peerSentCount] + peerSent := make([]wantlist.Entry, 0, len(peerEntries)) + for i := 0; i < len(peerEntries) && msgSize < mq.maxMessageSize; i++ { + e := peerEntries[i] // If the remote peer doesn't support HAVE / DONT_HAVE messages, // don't send want-haves (only send want-blocks) if !supportsHave && e.WantType == pb.Message_Wantlist_Have { mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have) } else { msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true) + peerSent = append(peerSent, e) } } @@ -603,8 +604,8 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap for i := 0; i < bcstSentCount; i++ { mq.bcstWants.MarkSent(bcstEntries[i]) } - for i := 0; i < peerSentCount; i++ { - mq.peerWants.MarkSent(peerEntries[i]) + for _, e := range peerSent { + mq.peerWants.MarkSent(e) } }