Skip to content

Commit

Permalink
codec/proto: allow reuse of marshal byte buffers
Browse files Browse the repository at this point in the history
Performance benchmarks can be found below. Obviously, a 10KB request and
10KB response is tailored to showcase this improvement as this is where
codec buffer re-use shines, but I've run other benchmarks too (like
1-byte requests and responses) and there's no discernible impact on
performance.

To no one's surprise, the number of bytes allocated per operation goes
down by almost exactly 10 KB across 60k+ queries, which suggests
excellent buffer re-use. The number of allocations itself increases by
5-ish, but that's probably because of a few additional slice pointers
that we need to store; these are 8-byte allocations and should have
virtually no impact on the allocator and garbage collector.

We do not allow reuse of buffers when stats handlers or binlogs are
turned on, because either of those may need access to the data and
payload even after the data has been written to the wire. In such
cases, we never return the buffer to the pool.

streaming-networkMode_none-bufConn_false-keepalive_false-benchTime_1m0s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_10240B-respSize_10240B-compressor_off-channelz_false-preloader_false

               Title       Before        After Percentage
            TotalOps       370480       372395     0.52%
             SendOps            0            0      NaN%
             RecvOps            0            0      NaN%
            Bytes/op    116049.91    105488.90    -9.10%
           Allocs/op       111.59       118.27     6.27%
             ReqT/op 505828693.33 508443306.67     0.52%
            RespT/op 505828693.33 508443306.67     0.52%
            50th-Lat    142.553µs    143.951µs     0.98%
            90th-Lat    193.714µs     192.51µs    -0.62%
            99th-Lat    549.345µs    545.059µs    -0.78%
             Avg-Lat    161.506µs    160.635µs    -0.54%

Closes #2816
  • Loading branch information
Adhityaa Chandrasekar committed Nov 9, 2019
1 parent 51ac07f commit 13e18cb
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 61 deletions.
16 changes: 16 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ type baseCodec interface {
Unmarshal(data []byte, v interface{}) error
}

// A bufferedBaseCodec is exactly like a baseCodec, but additionally requires
// a ReturnBuffer method. Once a Marshal caller is done with the returned byte
// buffer, it can choose to hand the buffer back to the encoding library for
// re-use via this method.
type bufferedBaseCodec interface {
baseCodec
// ReturnBuffer may be called with the byte buffer previously returned by
// Marshal, once gRPC is done with that buffer.
//
// gRPC will not call ReturnBuffer if any of the following is true:
// 1. Stats handlers are used (they may retain a reference to the data).
// 2. Binlogs are enabled (same reason).
ReturnBuffer(buf []byte)
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)

Expand Down
16 changes: 16 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ type Codec interface {
Name() string
}

// A BufferedCodec is exactly like a Codec, but additionally requires a
// ReturnBuffer method. Once a Marshal caller is done with the returned byte
// buffer, it can choose to hand the buffer back to the encoding library for
// re-use via this method.
type BufferedCodec interface {
Codec
// ReturnBuffer may be called with the byte buffer previously returned by
// Marshal, once gRPC is done with that buffer.
//
// gRPC will not call ReturnBuffer if any of the following is true:
// 1. Stats handlers are used (they may retain a reference to the data).
// 2. Binlogs are enabled (same reason).
ReturnBuffer(buf []byte)
}

var registeredCodecs = make(map[string]Codec)

// RegisterCodec registers the provided Codec for use with all gRPC clients and
Expand Down
68 changes: 35 additions & 33 deletions encoding/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package proto

import (
"math"
"sync"

"github.com/golang/protobuf/proto"
Expand All @@ -38,29 +37,16 @@ func init() {
// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}

type cachedProtoBuffer struct {
lastMarshaledSize uint32
proto.Buffer
}

func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}

func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)
newSlice := returnBufferPool.Get().([]byte)

cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
pb.SetBuf(newSlice)
pb.Reset()
if err := pb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
out := pb.Bytes()
return out, nil
}

Expand All @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) {
return pm.Marshal()
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := marshal(v, cb)
pb := protoBufferPool.Get().(*proto.Buffer)
out, err := marshal(v, pb)

// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return out, err
}

Expand All @@ -88,23 +74,39 @@ func (codec) Unmarshal(data []byte, v interface{}) error {
return pu.Unmarshal(data)
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(protoMsg)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb := protoBufferPool.Get().(*proto.Buffer)
pb.SetBuf(data)
err := pb.Unmarshal(protoMsg)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return err
}

// ReturnBuffer gives the byte slice previously handed out by Marshal back to
// the codec so that its backing array can be re-used by a later Marshal call.
func (codec) ReturnBuffer(data []byte) {
	// Truncate to length zero (capacity is kept) so the next user appends
	// from the zeroth byte rather than after the previous, stale payload.
	buf := data[:0]

	// NOTE: storing a non-pointer value (a slice) in a sync.Pool boxes it in
	// an interface{}, which costs a small allocation per Put. That is fine
	// here — the win is re-using the potentially large backing array, not
	// avoiding the tiny interface header. Ideally Marshal would traffic in
	// bytes.Buffer values to remove even that allocation, but the Marshal
	// interface cannot be changed at this point.
	returnBufferPool.Put(buf)
}

// Name returns the registered name of this codec (the package-level Name
// constant), used as the content-subtype to select the codec.
func (codec) Name() string {
return Name
}

var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
return &proto.Buffer{}
},
}

// returnBufferPool holds zero-length byte slices whose backing arrays are
// re-used by marshal. Slices enter the pool via codec.ReturnBuffer (truncated
// to length 0) and are handed out with their previous capacity intact; new
// entries start with a small (16-byte) capacity and grow as needed.
var returnBufferPool = &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 16)
},
}
9 changes: 8 additions & 1 deletion internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type dataFrame struct {
d []byte
// onEachWrite is called every time
// a part of d is written out.
onEachWrite func()
onEachWrite func()
returnBuffer func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -841,6 +842,9 @@ func (l *loopyWriter) processData() (bool, error) {
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
if dataItem.returnBuffer != nil {
dataItem.returnBuffer()
}
str.itl.dequeue() // remove the empty data item from stream
if str.itl.isEmpty() {
str.state = empty
Expand Down Expand Up @@ -907,6 +911,9 @@ func (l *loopyWriter) processData() (bool, error) {

if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
str.itl.dequeue()
if dataItem.returnBuffer != nil {
dataItem.returnBuffer()
}
}
if str.itl.isEmpty() {
str.state = empty
Expand Down
5 changes: 3 additions & 2 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,9 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
streamID: s.id,
endStream: opts.Last,
returnBuffer: opts.ReturnBuffer,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
Expand Down
9 changes: 5 additions & 4 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,10 +908,11 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df := &dataFrame{
streamID: s.id,
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
streamID: s.id,
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
returnBuffer: opts.ReturnBuffer,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ type Options struct {
// Last indicates whether this write is the last piece for
// this stream.
Last bool
// If non-nil, ReturnBuffer should be called to return some allocated buffer
// back to a sync pool.
ReturnBuffer func()
}

// CallHdr carries the information of a particular RPC.
Expand Down
15 changes: 12 additions & 3 deletions preloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
// This API is EXPERIMENTAL.
type PreparedMsg struct {
	// Struct for preparing msg before sending them
	encodedData []byte // output of the codec's Marshal (uncompressed)
	hdr         []byte // gRPC wire-format message header
	payload     []byte // bytes written to the wire (built by msgHeader)
	// returnBuffer, when non-nil, hands encodedData's buffer back to a
	// buffered codec for re-use after the data has been written out.
	returnBuffer func()
}

// Encode marshalls and compresses the message using the codec and compressor for the stream.
Expand All @@ -55,10 +56,18 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
return err
}
p.encodedData = data

if bcodec, ok := rpcInfo.preloaderInfo.codec.(bufferedBaseCodec); ok {
p.returnBuffer = func() {
bcodec.ReturnBuffer(data)
}
}

compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp)
if err != nil {
return err
}

p.hdr, p.payload = msgHeader(data, compData)
return nil
}
11 changes: 10 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,16 +842,25 @@ func (s *Server) incrCallsFailed() {
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
codec := s.getCodec(stream.ContentSubtype())
data, err := encode(codec, msg)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
}

if bcodec, ok := codec.(bufferedBaseCodec); ok {
opts.ReturnBuffer = func() {
bcodec.ReturnBuffer(data)
}
}

compData, err := compress(data, cp, comp)
if err != nil {
grpclog.Errorln("grpc: server failed to compress response: ", err)
return err
}

hdr, payload := msgHeader(data, compData)
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > s.opts.maxSendMessageSize {
Expand Down
Loading

0 comments on commit 13e18cb

Please sign in to comment.