diff --git a/clientconn.go b/clientconn.go index 7fa80c41..d5abdb19 100644 --- a/clientconn.go +++ b/clientconn.go @@ -121,8 +121,20 @@ type dialOptions struct { const ( defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 defaultClientMaxSendMessageSize = math.MaxInt32 + // http2IOBufSize specifies the buffer size for sending frames. + defaultWriteBufSize = 32 * 1024 + defaultReadBufSize = 32 * 1024 ) +func defaultDialOptions() dialOptions { + return dialOptions{ + copts: transport.ConnectOptions{ + WriteBufferSize: defaultWriteBufSize, + ReadBufferSize: defaultReadBufSize, + }, + } +} + // RegisterChannelz turns on channelz service. // This is an EXPERIMENTAL API. func RegisterChannelz() { @@ -141,8 +153,11 @@ func WithWaitForHandshake() DialOption { } } -// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched -// before doing a write on the wire. +// WithWriteBufferSize determines how much data can be batched before doing a write on the wire. +// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. +// The default value for this buffer is 32KB. +// Zero will disable the write buffer such that each write will be on underlying connection. +// Note: A Send call may not directly translate to a write. func WithWriteBufferSize(s int) DialOption { return func(o *dialOptions) { o.copts.WriteBufferSize = s @@ -151,6 +166,9 @@ func WithWriteBufferSize(s int) DialOption { // WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most // for each read syscall. +// The default value for this buffer is 32KB +// Zero will disable read buffer for a connection so data framer can access the underlying +// conn directly. func WithReadBufferSize(s int) DialOption { return func(o *dialOptions) { o.copts.ReadBufferSize = s @@ -458,10 +476,10 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ - target: target, - csMgr: &connectivityStateManager{}, - conns: make(map[*addrConn]struct{}), - + target: target, + csMgr: &connectivityStateManager{}, + conns: make(map[*addrConn]struct{}), + dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), } cc.ctx, cc.cancel = context.WithCancel(context.Background()) diff --git a/server.go b/server.go index dfdf80b1..e29bd2c4 100644 --- a/server.go +++ b/server.go @@ -141,13 +141,18 @@ var defaultServerOptions = options{ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize, connectionTimeout: 120 * time.Second, + writeBufferSize: defaultWriteBufSize, + readBufferSize: defaultReadBufSize, } // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption func(*options) -// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched -// before doing a write on the wire. +// WriteBufferSize determines how much data can be batched before doing a write on the wire. +// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. +// The default value for this buffer is 32KB. +// Zero will disable the write buffer such that each write will be on underlying connection. +// Note: A Send call may not directly translate to a write. func WriteBufferSize(s int) ServerOption { return func(o *options) { o.writeBufferSize = s @@ -156,6 +161,9 @@ func WriteBufferSize(s int) ServerOption { // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most // for one read syscall. +// The default value for this buffer is 32KB. +// Zero will disable read buffer for a connection so data framer can access the underlying +// conn directly. func ReadBufferSize(s int) ServerOption { return func(o *options) { o.readBufferSize = s diff --git a/test/end2end_test.go b/test/end2end_test.go index 6574a64f..67099000 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6340,3 +6340,87 @@ func testRPCTimeout(t *testing.T, e env) { cancel() } } + +func TestDisabledIOBuffers(t *testing.T) { + defer leakcheck.Check(t) + + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000)) + if err != nil { + t.Fatalf("Failed to create payload: %v", err) + } + req := &testpb.StreamingOutputCallRequest{ + Payload: payload, + } + resp := &testpb.StreamingOutputCallResponse{ + Payload: payload, + } + + ss := &stubServer{ + fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + t.Errorf("stream.Recv() = _, %v, want _, ", err) + return err + } + if !reflect.DeepEqual(in.Payload.Body, payload.Body) { + t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) + return err + } + if err := stream.Send(resp); err != nil { + t.Errorf("stream.Send(_)= %v, want ", err) + return err + } + + } + }, + } + + s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0)) + testpb.RegisterTestServiceServer(s, ss) + + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + + done := make(chan struct{}) + go func() { + s.Serve(lis) + close(done) + }() + defer s.Stop() + dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second) + defer dcancel() + cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0)) + if err != nil { + t.Fatalf("Failed to dial server") + } + defer cc.Close() + c := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false)) + if err != nil { + t.Fatalf("Failed to send test RPC to server") + } + for i := 0; i < 10; i++ { + if err := stream.Send(req); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + in, err := stream.Recv() + if err != nil { + t.Fatalf("stream.Recv() = _, %v, want _, ", err) + } + if !reflect.DeepEqual(in.Payload.Body, payload.Body) { + t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) + } + } + stream.CloseSend() + if _, err := stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err) + } +} diff --git a/transport/http2_client.go b/transport/http2_client.go index e0d12afe..968d4a92 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -196,14 +196,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne icwz = opts.InitialConnWindowSize dynamicWindow = false } - writeBufSize := defaultWriteBufSize - if opts.WriteBufferSize > 0 { - writeBufSize = opts.WriteBufferSize - } - readBufSize := defaultReadBufSize - if opts.ReadBufferSize > 0 { - readBufSize = opts.ReadBufferSize - } + writeBufSize := opts.WriteBufferSize + readBufSize := opts.ReadBufferSize t := &http2Client{ ctx: ctx, ctxDone: ctx.Done(), // Cache Done chan. diff --git a/transport/http2_server.go b/transport/http2_server.go index 79253bd3..6b1ceabe 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -130,14 +130,8 @@ type http2Server struct { // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // returned if something goes wrong. func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { - writeBufSize := defaultWriteBufSize - if config.WriteBufferSize > 0 { - writeBufSize = config.WriteBufferSize - } - readBufSize := defaultReadBufSize - if config.ReadBufferSize > 0 { - readBufSize = config.ReadBufferSize - } + writeBufSize := config.WriteBufferSize + readBufSize := config.ReadBufferSize framer := newFramer(conn, writeBufSize, readBufSize) // Send initial settings as connection preface to client. var isettings []http2.Setting diff --git a/transport/http_util.go b/transport/http_util.go index 7d15c7d7..456edcb8 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -23,6 +23,7 @@ import ( "bytes" "encoding/base64" "fmt" + "io" "net" "net/http" "strconv" @@ -43,9 +44,6 @@ const ( http2MaxFrameLen = 16384 // 16KB frame // http://http2.github.io/http2-spec/#SettingValues http2InitHeaderTableSize = 4096 - // http2IOBufSize specifies the buffer size for sending frames. - defaultWriteBufSize = 32 * 1024 - defaultReadBufSize = 32 * 1024 // baseContentType is the base content-type for gRPC. This is a valid // content-type on it's own, but can also include a content-subtype such as // "proto" as a suffix after "+" or ";". See @@ -545,6 +543,9 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { if w.err != nil { return 0, w.err } + if w.batchSize == 0 { // Buffer has been disabled. + return w.conn.Write(b) + } for len(b) > 0 { nn := copy(w.buf[w.offset:], b) b = b[nn:] @@ -578,7 +579,13 @@ type framer struct { } func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer { - r := bufio.NewReaderSize(conn, readBufferSize) + if writeBufferSize < 0 { + writeBufferSize = 0 + } + var r io.Reader = conn + if readBufferSize > 0 { + r = bufio.NewReaderSize(r, readBufferSize) + } w := newBufWriter(conn, writeBufferSize) f := &framer{ writer: w, diff --git a/transport/transport_test.go b/transport/transport_test.go index b7554df6..744c0dfa 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -567,6 +567,11 @@ func TestMaxConnectionAge(t *testing.T) { } } +const ( + defaultWriteBufSize = 32 * 1024 + defaultReadBufSize = 32 * 1024 +) + // TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings. func TestKeepaliveServer(t *testing.T) { serverConfig := &ServerConfig{