From d16d163deb7241894e9aebef61d76f5958c6e9a0 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 21 Jan 2018 14:37:28 -0800 Subject: [PATCH] use buffer pool for stream buffers 1. Reduces GC pressure when creating/destroying many streams. 2. Frees memory when not in use. This doesn't noticeably hurt throughput. --- package.json | 8 ++++++++ stream.go | 31 +++++++++---------------------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/package.json b/package.json index 7148e6a..43380e1 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,14 @@ "gx": { "dvcsimport": "github.com/whyrusleeping/yamux" }, + "gxDependencies": [ + { + "author": "Stebalien", + "hash": "QmUQy76yspPa3fRyY3GzXFTg9n8JVwFru6ue3KFRt4MeTw", + "name": "go-buffer-pool", + "version": "0.1.1" + } + ], "gxVersion": "0.10.0", "language": "go", "license": "", diff --git a/stream.go b/stream.go index e922fe3..af6cf72 100644 --- a/stream.go +++ b/stream.go @@ -6,6 +6,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/libp2p/go-buffer-pool" ) type streamState int @@ -33,8 +35,8 @@ type Stream struct { state streamState stateLock sync.Mutex - recvBuf *bytes.Buffer recvLock sync.Mutex + recvBuf pool.Buffer controlHdr header controlErr chan error @@ -92,7 +94,7 @@ START: fallthrough case streamClosed: s.recvLock.Lock() - if s.recvBuf == nil || s.recvBuf.Len() == 0 { + if s.recvBuf.Len() == 0 { s.recvLock.Unlock() s.stateLock.Unlock() return 0, io.EOF @@ -106,7 +108,7 @@ START: // If there is no data available, block s.recvLock.Lock() - if s.recvBuf == nil || s.recvBuf.Len() == 0 { + if s.recvBuf.Len() == 0 { s.recvLock.Unlock() goto WAIT } @@ -239,12 +241,8 @@ func (s *Stream) sendWindowUpdate() error { // Determine the delta update max := s.session.config.MaxStreamWindowSize - var bufLen uint32 s.recvLock.Lock() - if s.recvBuf != nil { - bufLen = uint32(s.recvBuf.Len()) - } - delta := (max - bufLen) - s.recvWindow + delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow // Determine the flags if any flags := s.sendFlags() @@ -446,12 +444,8 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { return ErrRecvWindowExceeded } - if s.recvBuf == nil { - // Allocate the receive buffer just-in-time to fit the full data frame. - // This way we can read in the whole packet without further allocations. - s.recvBuf = bytes.NewBuffer(make([]byte, 0, length)) - } - if _, err := io.Copy(s.recvBuf, conn); err != nil { + s.recvBuf.Grow(int(length)) + if _, err := io.Copy(&s.recvBuf, conn); err != nil { s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err) s.recvLock.Unlock() return err @@ -489,13 +483,6 @@ func (s *Stream) SetWriteDeadline(t time.Time) error { return nil } -// Shrink is used to compact the amount of buffers utilized -// This is useful when using Yamux in a connection pool to reduce -// the idle memory utilization. +// Shrink is a no-op. The internal buffer automatically shrinks itself. func (s *Stream) Shrink() { - s.recvLock.Lock() - if s.recvBuf != nil && s.recvBuf.Len() == 0 { - s.recvBuf = nil - } - s.recvLock.Unlock() }