Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec/proto: reuse of marshal byte buffers #3167

Merged
merged 8 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Documentation/encoding.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ into bytes and vice-versa for the purposes of network transmission.
## Codecs (Serialization and Deserialization)

A `Codec` contains code to serialize a message into a byte slice (`Marshal`) and
deserialize a byte slice back into a message (`Unmarshal`). `Codec`s are
registered by name into a global registry maintained in the `encoding` package.
deserialize a byte slice back into a message (`Unmarshal`). A `Codec` may
optionally also implement a `ReturnBuffer` method, which gRPC can call to hand
the byte slice returned by `Marshal` back to the codec for reuse; note that
this is an experimental feature with an API that is still in flux.

`Codec`s are registered by name into a global registry maintained in the
`encoding` package.

### Implementing a `Codec`

Expand Down
18 changes: 18 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ type baseCodec interface {
Unmarshal(data []byte, v interface{}) error
}

// A bufferedBaseCodec is exactly like a baseCodec, but also requires a
// ReturnBuffer method to be implemented. Once the caller of Marshal is done
// with the returned byte buffer, it can hand the buffer back to the encoding
// library for reuse by calling this method.
//
// This API is EXPERIMENTAL.
adtac marked this conversation as resolved.
Show resolved Hide resolved
type bufferedBaseCodec interface {
baseCodec
adtac marked this conversation as resolved.
Show resolved Hide resolved
// If implemented in a codec, this function may be called with the byte
// buffer returned by Marshal after gRPC is done with the buffer.
//
// gRPC will not call ReturnBuffer after it's done with the buffer if any of
// the following is true:
// 1. Stats handlers are used.
// 2. Binlogs are enabled.
ReturnBuffer(buf []byte)
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)

Expand Down
18 changes: 18 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ type Codec interface {
Name() string
}

// A BufferedCodec is exactly like a Codec, but also requires a ReturnBuffer
// method to be implemented. Once the caller of Marshal is done with the
// returned byte buffer, it can hand the buffer back to the encoding library
// for reuse by calling this method.
//
// This API is EXPERIMENTAL.
type BufferedCodec interface {
adtac marked this conversation as resolved.
Show resolved Hide resolved
Codec
// If implemented in a codec, this function may be called with the byte
// buffer returned by Marshal after gRPC is done with the buffer.
//
// gRPC will not call ReturnBuffer after it's done with the buffer if any of
// the following is true:
// 1. Stats handlers are used.
// 2. Binlogs are enabled.
ReturnBuffer(buf []byte)
}

var registeredCodecs = make(map[string]Codec)

// RegisterCodec registers the provided Codec for use with all gRPC clients and
Expand Down
68 changes: 35 additions & 33 deletions encoding/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package proto

import (
"math"
"sync"

"github.com/golang/protobuf/proto"
Expand All @@ -38,29 +37,16 @@ func init() {
// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}

// cachedProtoBuffer pairs an embedded proto.Buffer with the size of the last
// message that was marshaled through it.
type cachedProtoBuffer struct {
// lastMarshaledSize is read by marshal as the initial capacity of the next
// output slice and is overwritten with the length of each marshaled message.
lastMarshaledSize uint32
proto.Buffer
}

// capToMaxInt32 converts val to a uint32, clamping any value larger than
// math.MaxInt32 down to math.MaxInt32.
func capToMaxInt32(val int) uint32 {
	const limit = math.MaxInt32
	if val <= limit {
		return uint32(val)
	}
	return uint32(limit)
}

func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)
newSlice := returnBufferPool.Get().([]byte)

cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
pb.SetBuf(newSlice)
pb.Reset()
if err := pb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
out := pb.Bytes()
return out, nil
}

Expand All @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) {
return pm.Marshal()
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := marshal(v, cb)
pb := protoBufferPool.Get().(*proto.Buffer)
out, err := marshal(v, pb)

// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return out, err
}

Expand All @@ -88,23 +74,39 @@ func (codec) Unmarshal(data []byte, v interface{}) error {
return pu.Unmarshal(data)
}

cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(protoMsg)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
pb := protoBufferPool.Get().(*proto.Buffer)
pb.SetBuf(data)
err := pb.Unmarshal(protoMsg)
pb.SetBuf(nil)
protoBufferPool.Put(pb)
return err
}

func (codec) ReturnBuffer(data []byte) {
// Make sure we set the length of the buffer to zero so that future appends
// will start from the zeroth byte, not append to the previous, stale data.
//
// Apparently, sync.Pool with non-pointer objects (slices, in this case)
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// causes small allocations because of how interface{} works under the hood.
// This isn't a problem for us, however, because we're more concerned with
// _how large_ that allocation is. Ideally, we'd be using bytes.Buffer as the
// Marshal return value to remove even that allocation, but we can't change
// the Marshal interface at this point.
returnBufferPool.Put(data[:0])
}

// Name returns the value of the package-level identifier Name.
func (codec) Name() string {
return Name
}

var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
return &proto.Buffer{}
},
}

// returnBufferPool caches the byte slices that marshal writes into and that
// ReturnBuffer hands back. When the pool has nothing cached, it supplies a
// zero-length slice with a capacity of 16 bytes.
var returnBufferPool = func() *sync.Pool {
	p := new(sync.Pool)
	p.New = func() interface{} {
		return make([]byte, 0, 16)
	}
	return p
}()
56 changes: 56 additions & 0 deletions encoding/proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package proto

import (
"bytes"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -127,3 +128,58 @@ func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
}
}
}

// TestBufferReuse verifies that a buffer handed back through ReturnBuffer is
// returned again by the next Marshal call, and that messages marshaled into a
// reused buffer round-trip without stale bytes from an earlier, longer message.
func TestBufferReuse(t *testing.T) {
	c := codec{}

	// bptr records the address of the first marshaled buffer; every later
	// Marshal call is expected to return a buffer at that same address.
	var bptr string
	marshal := func(toMarshal []byte) []byte {
		protoIn := &codec_perf.Buffer{Body: toMarshal}
		b, err := c.Marshal(protoIn)
		if err != nil {
			t.Errorf("codec.Marshal(%v) failed: %v", protoIn, err)
		}
		lbptr := fmt.Sprintf("%p", b)
		if bptr == "" {
			bptr = lbptr
		} else if bptr != lbptr {
			t.Errorf("expected the same buffer to be reused: lptr = %s, ptr = %s", lbptr, bptr)
		}
		// Copy the encoded bytes out before handing the buffer back, because
		// the next Marshal call is expected to overwrite it.
		bc := append([]byte(nil), b...)
		c.ReturnBuffer(b)
		return bc
	}

	unmarshal := func(b []byte) []byte {
		protoOut := &codec_perf.Buffer{}
		if err := c.Unmarshal(b, protoOut); err != nil {
			t.Errorf("codec.Unmarshal(%v) failed: %v", protoOut, err)
		}
		return protoOut.GetBody()
	}

	check := func(in []byte, out []byte) {
		if len(in) != len(out) {
			t.Errorf("unequal lengths: len(in=%v)=%d, len(out=%v)=%d", in, len(in), out, len(out))
			// Stop here: comparing element-wise would index past the end of
			// the shorter slice and panic instead of reporting a failure.
			return
		}

		for i := range in {
			if in[i] != out[i] {
				t.Errorf("unequal values: in[%d] = %v, out[%d] = %v", i, in[i], i, out[i])
			}
		}
	}

	// To test that the returned buffer does not have unexpected data at the end,
	// we use a second input that is smaller than the first.
	in1 := []byte{1, 2, 3}
	b1 := marshal(in1)
	in2 := []byte{4, 5}
	b2 := marshal(in2)

	out1 := unmarshal(b1)
	out2 := unmarshal(b2)

	check(in1, out1)
	check(in2, out2)
}
2 changes: 2 additions & 0 deletions internal/leakcheck/leakcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var goroutinesToIgnore = []string{
"runtime_mcall",
"(*loggingT).flushDaemon",
"goroutine in C code",
"grpc.callReturnBuffers",
adtac marked this conversation as resolved.
Show resolved Hide resolved
"grpc.waitCallReturnBuffer",
}

// RegisterIgnoreGoroutine appends s into the ignore goroutine list. The
Expand Down
17 changes: 17 additions & 0 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type dataFrame struct {
// onEachWrite is called every time
// a part of d is written out.
onEachWrite func()
wg *sync.WaitGroup
}

func (*dataFrame) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -726,6 +727,16 @@ func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequ
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
c.onWrite()
if str, ok := l.estdStreams[c.streamID]; ok {
// If the stream is active, allow the return buffer cleanup function to
// proceed because processData is not going to execute on this stream to
// call wg.Done() anymore.
if str.state == active {
for !str.itl.isEmpty() {
if dataItem, ok := str.itl.dequeue().(*dataFrame); ok && dataItem.wg != nil {
adtac marked this conversation as resolved.
Show resolved Hide resolved
dataItem.wg.Done()
}
}
}
// On the server side it could be a trailers-only response or
// a RST_STREAM before stream initialization thus the stream might
// not be established yet.
Expand Down Expand Up @@ -842,6 +853,9 @@ func (l *loopyWriter) processData() (bool, error) {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
if dataItem.wg != nil {
dataItem.wg.Done()
}
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
Expand Down Expand Up @@ -907,6 +921,9 @@ func (l *loopyWriter) processData() (bool, error) {

if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
str.itl.dequeue()
if dataItem.wg != nil {
dataItem.wg.Done()
}
}
if str.itl.isEmpty() {
str.state = empty
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
wg: opts.ReturnBufferWaitGroup,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
Expand All @@ -850,6 +851,9 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
return err
}
}
if df.wg != nil {
df.wg.Add(1)
}
return t.controlBuf.put(df)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
h: hdr,
d: data,
onEachWrite: t.setResetPingStrikes,
wg: opts.ReturnBufferWaitGroup,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
Expand All @@ -921,6 +922,9 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
}
return ContextErr(s.ctx.Err())
}
if df.wg != nil {
df.wg.Add(1)
}
return t.controlBuf.put(df)
}

Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ type Options struct {
// Last indicates whether this write is the last piece for
// this stream.
Last bool
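// ReturnBufferWaitGroup, if non-nil, is incremented by Write when the data
// frame is queued; Done() is called on it once the frame has been fully
// written out or its stream is cleaned up, so that the allocated buffer can
// be returned to a sync pool.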
ReturnBufferWaitGroup *sync.WaitGroup
}

// CallHdr carries the information of a particular RPC.
Expand Down
15 changes: 12 additions & 3 deletions preloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
// This API is EXPERIMENTAL.
type PreparedMsg struct {
// Struct for preparing msg before sending them
encodedData []byte
hdr []byte
payload []byte
encodedData []byte
hdr []byte
payload []byte
returnBuffer func()
}

// Encode marshalls and compresses the message using the codec and compressor for the stream.
Expand All @@ -55,6 +56,14 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
return err
}
p.encodedData = data
if len(data) >= bufferReuseThreshold {
adtac marked this conversation as resolved.
Show resolved Hide resolved
if bcodec, ok := rpcInfo.preloaderInfo.codec.(bufferedBaseCodec); ok {
p.returnBuffer = func() {
adtac marked this conversation as resolved.
Show resolved Hide resolved
bcodec.ReturnBuffer(data)
}
}
}

compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp)
if err != nil {
return err
Expand Down
Loading