Skip to content

Commit

Permalink
Disable I/O buffer with zero values.
Browse files Browse the repository at this point in the history
  • Loading branch information
MakMukhi committed Jun 20, 2018
1 parent 9deadeb commit e34e502
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 10 deletions.
21 changes: 16 additions & 5 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ type dialOptions struct {
disableServiceConfig bool
}

func defaultDialOptions() dialOptions {
return dialOptions{
copts: transport.ConnectOptions{
WriteBufferSize: -1,
ReadBufferSize: -1,
},
}
}

const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
Expand All @@ -142,6 +151,8 @@ func WithWaitForHandshake() DialOption {

// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WithWriteBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.WriteBufferSize = s
Expand All @@ -150,7 +161,7 @@ func WithWriteBufferSize(s int) DialOption {

// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for each read syscall.
// A negative value will disable read buffer for a connection so data framer can access the underlying
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func WithReadBufferSize(s int) DialOption {
return func(o *dialOptions) {
Expand Down Expand Up @@ -459,10 +470,10 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),

target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
Expand Down
6 changes: 5 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,17 @@ var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: -1,
readBufferSize: -1,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)

// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
// Zero will disable the write buffer such that each write call will lead to a syscall.
// Note: A Send call may not directly result into a write call.
func WriteBufferSize(s int) ServerOption {
return func(o *options) {
o.writeBufferSize = s
Expand All @@ -156,7 +160,7 @@ func WriteBufferSize(s int) ServerOption {

// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for one read syscall.
// A negative value will disable read buffer for a connection so data framer can access the underlying
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
return func(o *options) {
Expand Down
84 changes: 84 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6273,3 +6273,87 @@ func testRPCTimeout(t *testing.T, e env) {
cancel()
}
}

func TestDisabledIOBuffers(t *testing.T) {
defer leakcheck.Check(t)

payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
if err != nil {
t.Fatalf("Failed to create payload: %v", err)
}
req := &testpb.StreamingOutputCallRequest{
Payload: payload,
}
resp := &testpb.StreamingOutputCallResponse{
Payload: payload,
}

ss := &stubServer{
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
return err
}
if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
return err
}
if err := stream.Send(resp); err != nil {
t.Errorf("stream.Send(_)= %v, want <nil>", err)
return err
}

}
},
}

s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
testpb.RegisterTestServiceServer(s, ss)

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}

done := make(chan struct{})
go func() {
s.Serve(lis)
close(done)
}()
defer s.Stop()
dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second)
defer dcancel()
cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
if err != nil {
t.Fatalf("Failed to dial server")
}
defer cc.Close()
c := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false))
if err != nil {
t.Fatalf("Failed to send test RPC to server")
}
for i := 0; i < 10; i++ {
if err := stream.Send(req); err != nil {
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
}
in, err := stream.Recv()
if err != nil {
t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
}
if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
}
}
stream.CloseSend()
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
}
}
4 changes: 2 additions & 2 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
dynamicWindow = false
}
writeBufSize := defaultWriteBufSize
if opts.WriteBufferSize > 0 {
if opts.WriteBufferSize >= 0 {
writeBufSize = opts.WriteBufferSize
}
readBufSize := defaultReadBufSize
if opts.ReadBufferSize != 0 {
if opts.ReadBufferSize >= 0 {
readBufSize = opts.ReadBufferSize
}
t := &http2Client{
Expand Down
4 changes: 2 additions & 2 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ type http2Server struct {
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
writeBufSize := defaultWriteBufSize
if config.WriteBufferSize > 0 {
if config.WriteBufferSize >= 0 {
writeBufSize = config.WriteBufferSize
}
readBufSize := defaultReadBufSize
if config.ReadBufferSize != 0 {
if config.ReadBufferSize >= 0 {
readBufSize = config.ReadBufferSize
}
framer := newFramer(conn, writeBufSize, readBufSize)
Expand Down
3 changes: 3 additions & 0 deletions transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,9 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
Expand Down

0 comments on commit e34e502

Please sign in to comment.