diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a62354e2..85176369f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ The following emojis are used to highlight certain changes: ### Changed +- `bitswap/server` minor memory use and performance improvements + ### Removed ### Fixed diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 4d361c5d5..3f6a2f622 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -14,12 +14,13 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" bsnet "github.com/ipfs/boxo/bitswap/network" - "github.com/ipfs/boxo/internal/test" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) +const collectTimeout = 100 * time.Millisecond + type fakeMessageNetwork struct { connectError error messageSenderError error @@ -172,7 +173,7 @@ func TestStartupAndShutdown(t *testing.T) { messageQueue.Startup() messageQueue.AddBroadcastWantHaves(bcstwh) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if len(messages) != 1 { t.Fatal("wrong number of messages were sent for broadcast want-haves") } @@ -212,7 +213,7 @@ func TestSendingMessagesDeduped(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks, wantHaves) messageQueue.AddWants(wantBlocks, wantHaves) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { t.Fatal("Messages were not deduped") @@ -220,8 +221,6 @@ func TestSendingMessagesDeduped(t *testing.T) { } func TestSendingMessagesPartialDupe(t *testing.T) { - test.Flaky(t) - ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) @@ -235,7 +234,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks[:8], wantHaves[:8]) messageQueue.AddWants(wantBlocks[3:], wantHaves[3:]) - messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { t.Fatal("messages were not correctly deduped") @@ -243,8 +242,6 @@ func TestSendingMessagesPartialDupe(t *testing.T) { } func TestSendingMessagesPriority(t *testing.T) { - test.Flaky(t) - ctx := context.Background() messagesSent := make(chan []bsmsg.Entry) resetChan := make(chan struct{}, 1) @@ -262,7 +259,7 @@ func TestSendingMessagesPriority(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks1, wantHaves1) messageQueue.AddWants(wantBlocks2, wantHaves2) - messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) { t.Fatal("wrong number of wants") @@ -327,7 +324,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks, wantHaves) messageQueue.AddCancels(cancels) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)-len(cancels) { t.Fatal("Wrong message count") @@ -351,7 +348,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { // Cancel the remaining want-blocks and want-haves cancels = append(wantHaves, wantBlocks...) messageQueue.AddCancels(cancels) - messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, collectTimeout) // The remaining 2 cancels should be sent to the network as they are for // wants that were sent to the network @@ -379,7 +376,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { // Add 1 want-block and 2 want-haves messageQueue.AddWants(wantBlocks, wantHaves) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != len(wantBlocks)+len(wantHaves) { t.Fatal("Wrong message count", totalEntriesLength(messages)) } @@ -389,7 +386,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { // Override one cancel with a want-block (before cancel is sent to network) messageQueue.AddWants(cids[:1], []cid.Cid{}) - messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, collectTimeout) if totalEntriesLength(messages) != 3 { t.Fatal("Wrong message count", totalEntriesLength(messages)) } @@ -531,7 +528,7 @@ func TestSendingLargeMessages(t *testing.T) { messageQueue.Startup() messageQueue.AddWants(wantBlocks, []cid.Cid{}) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) // want-block has size 44, so with maxMsgSize 44 * 3 (3 want-blocks), then if // we send 10 want-blocks we should expect 4 messages: @@ -563,7 +560,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { // Check broadcast want-haves bcwh := testutil.GenerateCids(10) messageQueue.AddBroadcastWantHaves(bcwh) - messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages := collectMessages(ctx, t, messagesSent, collectTimeout) if len(messages) != 1 { t.Fatal("wrong number of messages were sent", len(messages)) @@ -582,7 +579,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { wbs := testutil.GenerateCids(10) whs := testutil.GenerateCids(10) messageQueue.AddWants(wbs, whs) - messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, collectTimeout) if len(messages) != 1 { t.Fatal("wrong number of messages were sent", len(messages)) @@ -612,7 +609,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { wbs := testutil.GenerateCids(10) messageQueue.AddWants(wbs, nil) - collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + collectMessages(ctx, t, messagesSent, collectTimeout) // Check want-blocks are added to DontHaveTimeoutMgr if dhtm.pendingCount() != len(wbs) { @@ -621,7 +618,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { cancelCount := 2 messageQueue.AddCancels(wbs[:cancelCount]) - collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + collectMessages(ctx, t, messagesSent, collectTimeout) // Check want-blocks are removed from DontHaveTimeoutMgr if dhtm.pendingCount() != len(wbs)-cancelCount { @@ -692,9 +689,9 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { cids := testutil.GenerateCids(2) - // Add some wants and wait 10ms + // Add some wants and wait messageQueue.AddWants(cids, nil) - collectMessages(ctx, t, messagesSent, 100*time.Millisecond) + collectMessages(ctx, t, messagesSent, collectTimeout) // Receive a response for the wants messageQueue.ResponseReceived(cids) diff --git a/bitswap/message/message.go b/bitswap/message/message.go index 6b9d787e7..a0a45970b 100644 --- a/bitswap/message/message.go +++ b/bitswap/message/message.go @@ -182,15 +182,9 @@ func (m *impl) Clone() BitSwapMessage { // Reset the values in the message back to defaults, so it can be reused func (m *impl) Reset(full bool) { m.full = full - for k := range m.wantlist { - delete(m.wantlist, k) - } - for k := range m.blocks { - delete(m.blocks, k) - } - for k := range m.blockPresences { - delete(m.blockPresences, k) - } + clear(m.wantlist) + clear(m.blocks) + clear(m.blockPresences) m.pendingBytes = 0 } @@ -253,25 +247,31 @@ func (m *impl) Empty() bool { } func (m *impl) Wantlist() []Entry { - out := make([]Entry, 0, len(m.wantlist)) + out := make([]Entry, len(m.wantlist)) + var i int for _, e := range m.wantlist { - out = append(out, *e) + out[i] = *e + i++ } return out } func (m *impl) Blocks() []blocks.Block { - bs := make([]blocks.Block, 0, len(m.blocks)) + bs := make([]blocks.Block, len(m.blocks)) + var i int for _, block := range m.blocks { - bs = append(bs, block) + bs[i] = block + i++ } return bs } func (m *impl) BlockPresences() []BlockPresence { - bps := make([]BlockPresence, 0, len(m.blockPresences)) + bps := make([]BlockPresence, len(m.blockPresences)) + var i int for c, t := range m.blockPresences { - bps = append(bps, BlockPresence{c, t}) + bps[i] = BlockPresence{c, t} + i++ } return bps } diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 234c1c510..bc934db5a 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -668,25 +668,12 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived is called when a message is received from a remote peer. // For each item in the wantlist, add a want-have or want-block entry to the -// request queue (this is later popped off by the workerTasks) -func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) (mustKillConnection bool) { - entries := m.Wantlist() - - if len(entries) > 0 { - log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries)) - for _, et := range entries { - if !et.Cancel { - if et.WantType == pb.Message_Wantlist_Have { - log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid) - } else { - log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid) - } - } - } - } - +// request queue (this is later popped off by the workerTasks). Returns true +// if the connection to the server must be closed. +func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) bool { if m.Empty() { log.Infof("received empty message from %s", p) + return false } newWorkExists := false @@ -696,9 +683,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } }() - // Dispatch entries - wants, cancels := e.splitWantsCancels(entries) - wants, denials := e.splitWantsDenials(p, wants) + wants, cancels, denials := e.splitWantsCancelsDenials(p, m) // Get block sizes wantKs := cid.NewSet() @@ -708,7 +693,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys()) if err != nil { log.Info("aborting message processing", err) - return + return false } e.lock.Lock() @@ -745,26 +730,26 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap e.peerLedger.Wants(p, entry.Entry) filteredWants = append(filteredWants, entry) } - clear := wants[len(filteredWants):] - for i := range clear { - clear[i] = bsmsg.Entry{} // early GC - } + // Clear truncated entries - early GC. + clear(wants[len(filteredWants):]) + wants = filteredWants for _, entry := range cancels { - if entry.Cid.Prefix().MhType == mh.IDENTITY { + c := entry.Cid + if c.Prefix().MhType == mh.IDENTITY { // This is a truely broken client, let's kill the connection. e.lock.Unlock() log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p) return true } - if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize { + if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize { // Ignore requests about CIDs that big. continue } - log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid) - if e.peerLedger.CancelWant(p, entry.Cid) { - e.peerRequestQueue.Remove(entry.Cid, p) + log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c) + if e.peerLedger.CancelWant(p, c) { + e.peerRequestQueue.Remove(c, p) } } e.lock.Unlock() @@ -806,40 +791,40 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap // For each want-have / want-block for _, entry := range wants { c := entry.Cid - blockSize, found := blockSizes[entry.Cid] + blockSize, found := blockSizes[c] // If the block was not found if !found { - log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave) + log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", c, "sendDontHave", entry.SendDontHave) sendDontHave(entry) - } else { - // The block was found, add it to the queue - newWorkExists = true + continue + } + // The block was found, add it to the queue + newWorkExists = true - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) + isWantBlock := e.sendAsBlock(entry.WantType, blockSize) - log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock) + log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock) - // entrySize is the amount of space the entry takes up in the - // message we send to the recipient. If we're sending a block, the - // entrySize is the size of the block. Otherwise it's the size of - // a block presence entry. - entrySize := blockSize - if !isWantBlock { - entrySize = bsmsg.BlockPresenceSize(c) - } - activeEntries = append(activeEntries, peertask.Task{ - Topic: c, - Priority: int(entry.Priority), - Work: entrySize, - Data: &taskData{ - BlockSize: blockSize, - HaveBlock: true, - IsWantBlock: isWantBlock, - SendDontHave: entry.SendDontHave, - }, - }) + // entrySize is the amount of space the entry takes up in the + // message we send to the recipient. If we're sending a block, the + // entrySize is the size of the block. Otherwise it's the size of + // a block presence entry. + entrySize := blockSize + if !isWantBlock { + entrySize = bsmsg.BlockPresenceSize(c) } + activeEntries = append(activeEntries, peertask.Task{ + Topic: c, + Priority: int(entry.Priority), + Work: entrySize, + Data: &taskData{ + BlockSize: blockSize, + HaveBlock: true, + IsWantBlock: isWantBlock, + SendDontHave: entry.SendDontHave, + }, + }) } // Push entries onto the request queue @@ -850,38 +835,46 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return false } -// Split the want-have / want-block entries from the cancel entries -func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { - wants := make([]bsmsg.Entry, 0, len(es)) - cancels := make([]bsmsg.Entry, 0, len(es)) - for _, et := range es { +// Split the want-havek entries from the cancel and deny entries. +func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) { + entries := m.Wantlist() // creates copy; safe to modify + if len(entries) == 0 { + return nil, nil, nil + } + + log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries)) + + wants := entries[:0] // shift in-place + var cancels, denials []bsmsg.Entry + + for _, et := range entries { if et.Cancel { cancels = append(cancels, et) + continue + } + + if et.WantType == pb.Message_Wantlist_Have { + log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid) } else { - wants = append(wants, et) + log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid) } - } - return wants, cancels -} -// Split the want-have / want-block entries from the block that will be denied access -func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { - if e.peerBlockRequestFilter == nil { - return allWants, nil - } + if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) { + denials = append(denials, et) + continue + } - wants := make([]bsmsg.Entry, 0, len(allWants)) - denied := make([]bsmsg.Entry, 0, len(allWants)) + wants = append(wants, et) + } - for _, et := range allWants { - if e.peerBlockRequestFilter(p, et.Cid) { - wants = append(wants, et) - } else { - denied = append(denied, et) - } + if len(wants) == 0 { + wants = nil } - return wants, denied + // Clear truncated entries. + clear(entries[len(wants):]) + + return wants, cancels, denials } // ReceivedBlocks is called when new blocks are received from the network.