From cb8e65a8ce5fd69f93aa0c7afd18674a3c9777a9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Feb 2019 17:29:22 -0800 Subject: [PATCH 1/2] nit: remove bsmsg.Entry redirection --- messagequeue/messagequeue.go | 6 +++--- peermanager/peermanager.go | 4 ++-- peermanager/peermanager_test.go | 6 +++--- testutil/testutil.go | 6 +++--- wantmanager/wantmanager.go | 8 ++++---- wantmanager/wantmanager_test.go | 2 +- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index e9204652..3383e326 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -43,7 +43,7 @@ type MessageQueue struct { } type messageRequest struct { - entries []*bsmsg.Entry + entries []bsmsg.Entry ses uint64 } @@ -65,7 +65,7 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue { } // AddMessage adds new entries to an outgoing message for a given session. -func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { +func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) { select { case mq.newRequests <- &messageRequest{entries, ses}: case <-mq.ctx.Done(): @@ -140,7 +140,7 @@ func (wr *wantlistRequest) handle(mq *MessageQueue) { } } -func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) { +func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) { for _, e := range entries { if e.Cancel { if mq.wl.Remove(e.Cid, ses) { diff --git a/peermanager/peermanager.go b/peermanager/peermanager.go index b1b8ee9a..59e8ca3d 100644 --- a/peermanager/peermanager.go +++ b/peermanager/peermanager.go @@ -19,7 +19,7 @@ var ( // PeerQueue provides a queer of messages to be sent for a single peer. type PeerQueue interface { - AddMessage(entries []*bsmsg.Entry, ses uint64) + AddMessage(entries []bsmsg.Entry, ses uint64) Startup() AddWantlist(initialWants *wantlist.SessionTrackedWantlist) Shutdown() @@ -108,7 +108,7 @@ func (pm *PeerManager) Disconnected(p peer.ID) { // SendMessage is called to send a message to all or some peers in the pool; // if targets is nil, it sends to all. -func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { +func (pm *PeerManager) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) { if len(targets) == 0 { pm.peerQueuesLk.RLock() for _, p := range pm.peerQueues { diff --git a/peermanager/peermanager_test.go b/peermanager/peermanager_test.go index 1d56d042..0505f973 100644 --- a/peermanager/peermanager_test.go +++ b/peermanager/peermanager_test.go @@ -15,7 +15,7 @@ import ( type messageSent struct { p peer.ID - entries []*bsmsg.Entry + entries []bsmsg.Entry ses uint64 } @@ -27,7 +27,7 @@ type fakePeer struct { func (fp *fakePeer) Startup() {} func (fp *fakePeer) Shutdown() {} -func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) { +func (fp *fakePeer) AddMessage(entries []bsmsg.Entry, ses uint64) { fp.messagesSent <- messageSent{fp.p, entries, ses} } func (fp *fakePeer) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {} @@ -44,7 +44,7 @@ func collectAndCheckMessages( ctx context.Context, t *testing.T, messagesSent <-chan messageSent, - entries []*bsmsg.Entry, + entries []bsmsg.Entry, ses uint64, timeout time.Duration) []peer.ID { var peersReceived []peer.ID diff --git a/testutil/testutil.go b/testutil/testutil.go index 05fd152b..87bd91d2 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -50,11 +50,11 @@ func GenerateWantlist(n int, ses uint64) *wantlist.SessionTrackedWantlist { } // GenerateMessageEntries makes fake bitswap message entries. -func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry { - bsmsgs := make([]*bsmsg.Entry, 0, n) +func GenerateMessageEntries(n int, isCancel bool) []bsmsg.Entry { + bsmsgs := make([]bsmsg.Entry, 0, n) for i := 0; i < n; i++ { prioritySeq++ - msg := &bsmsg.Entry{ + msg := bsmsg.Entry{ Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq), Cancel: isCancel, } diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go index bf5db3c4..0fd7d5a1 100644 --- a/wantmanager/wantmanager.go +++ b/wantmanager/wantmanager.go @@ -25,7 +25,7 @@ const ( type PeerHandler interface { Disconnected(p peer.ID) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) - SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) + SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) } type wantMessage interface { @@ -187,9 +187,9 @@ func (wm *WantManager) run() { } func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { - entries := make([]*bsmsg.Entry, 0, len(ks)) + entries := make([]bsmsg.Entry, 0, len(ks)) for i, k := range ks { - entries = append(entries, &bsmsg.Entry{ + entries = append(entries, bsmsg.Entry{ Cancel: cancel, Entry: wantlist.NewRefEntry(k, maxPriority-i), }) @@ -202,7 +202,7 @@ func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []p } type wantSet struct { - entries []*bsmsg.Entry + entries []bsmsg.Entry targets []peer.ID from uint64 } diff --git a/wantmanager/wantmanager_test.go b/wantmanager/wantmanager_test.go index 4cb05ac0..3b9d0cb1 100644 --- a/wantmanager/wantmanager_test.go +++ b/wantmanager/wantmanager_test.go @@ -19,7 +19,7 @@ type fakePeerHandler struct { lastWantSet wantSet } -func (fph *fakePeerHandler) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { +func (fph *fakePeerHandler) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) { fph.lk.Lock() fph.lastWantSet = wantSet{entries, targets, from} fph.lk.Unlock() From 8d357ff2fde61213129ba28e048e197ab5a7b108 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Feb 2019 17:45:45 -0800 Subject: [PATCH 2/2] feat(messagequeue): use a buffer pool --- messagequeue/messagequeue.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 3383e326..405daf39 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -2,6 +2,7 @@ package messagequeue import ( "context" + "sync" "time" bsmsg "github.com/ipfs/go-bitswap/message" @@ -67,7 +68,7 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue { // AddMessage adds new entries to an outgoing message for a given session. func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) { select { - case mq.newRequests <- &messageRequest{entries, ses}: + case mq.newRequests <- newMessageRequest(entries, ses): case <-mq.ctx.Done(): } } @@ -123,8 +124,28 @@ func (mq *MessageQueue) runQueue() { } } +// We allocate a bunch of these so use a pool. +var messageRequestPool = sync.Pool{ + New: func() interface{} { + return new(messageRequest) + }, +} + +func newMessageRequest(entries []bsmsg.Entry, session uint64) *messageRequest { + mr := messageRequestPool.Get().(*messageRequest) + mr.entries = entries + mr.ses = session + return mr +} + +func returnMessageRequest(mr *messageRequest) { + *mr = messageRequest{} + messageRequestPool.Put(mr) +} + func (mr *messageRequest) handle(mq *MessageQueue) { mq.addEntries(mr.entries, mr.ses) + returnMessageRequest(mr) } func (wr *wantlistRequest) handle(mq *MessageQueue) {