client, server: update dial/server buffer options to support a "disable" setting (#2147)

This commit is contained in:
mmukhi 2018-06-27 11:16:33 -07:00 коммит произвёл GitHub
Родитель f1ab7acf3f
Коммит 3ec535a269
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 138 добавлений и 28 удалений

Просмотреть файл

@ -121,8 +121,20 @@ type dialOptions struct {
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)
func defaultDialOptions() dialOptions {
return dialOptions{
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
},
}
}
// RegisterChannelz turns on channelz service.
// This is an EXPERIMENTAL API.
func RegisterChannelz() {
@ -141,8 +153,11 @@ func WithWaitForHandshake() DialOption {
}
}
// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
// WithWriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WithWriteBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.WriteBufferSize = s
@ -151,6 +166,9 @@ func WithWriteBufferSize(s int) DialOption {
// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for each read syscall.
// The default value for this buffer is 32KB
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func WithReadBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.ReadBufferSize = s
@ -461,7 +479,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())

Просмотреть файл

@ -141,13 +141,18 @@ var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
return func(o *options) {
o.writeBufferSize = s
@ -156,6 +161,9 @@ func WriteBufferSize(s int) ServerOption {
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for one read syscall.
// The default value for this buffer is 32KB.
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
return func(o *options) {
o.readBufferSize = s

Просмотреть файл

@ -6340,3 +6340,87 @@ func testRPCTimeout(t *testing.T, e env) {
cancel()
}
}
func TestDisabledIOBuffers(t *testing.T) {
defer leakcheck.Check(t)
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
if err != nil {
t.Fatalf("Failed to create payload: %v", err)
}
req := &testpb.StreamingOutputCallRequest{
Payload: payload,
}
resp := &testpb.StreamingOutputCallResponse{
Payload: payload,
}
ss := &stubServer{
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
return err
}
if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
return err
}
if err := stream.Send(resp); err != nil {
t.Errorf("stream.Send(_)= %v, want <nil>", err)
return err
}
}
},
}
s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
testpb.RegisterTestServiceServer(s, ss)
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
done := make(chan struct{})
go func() {
s.Serve(lis)
close(done)
}()
defer s.Stop()
dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second)
defer dcancel()
cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
if err != nil {
t.Fatalf("Failed to dial server")
}
defer cc.Close()
c := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false))
if err != nil {
t.Fatalf("Failed to send test RPC to server")
}
for i := 0; i < 10; i++ {
if err := stream.Send(req); err != nil {
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
}
in, err := stream.Recv()
if err != nil {
t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
}
if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
}
}
stream.CloseSend()
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
}
}

Просмотреть файл

@ -196,14 +196,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
writeBufSize := defaultWriteBufSize
if opts.WriteBufferSize > 0 {
writeBufSize = opts.WriteBufferSize
}
readBufSize := defaultReadBufSize
if opts.ReadBufferSize > 0 {
readBufSize = opts.ReadBufferSize
}
writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.

Просмотреть файл

@ -130,14 +130,8 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
writeBufSize := defaultWriteBufSize
if config.WriteBufferSize > 0 {
writeBufSize = config.WriteBufferSize
}
readBufSize := defaultReadBufSize
if config.ReadBufferSize > 0 {
readBufSize = config.ReadBufferSize
}
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
framer := newFramer(conn, writeBufSize, readBufSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting

Просмотреть файл

@ -23,6 +23,7 @@ import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"strconv"
@ -43,9 +44,6 @@ const (
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
http2InitHeaderTableSize = 4096
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
@ -545,6 +543,9 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
@ -578,7 +579,13 @@ type framer struct {
}
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
r := bufio.NewReaderSize(conn, readBufferSize)
if writeBufferSize < 0 {
writeBufferSize = 0
}
var r io.Reader = conn
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
w := newBufWriter(conn, writeBufferSize)
f := &framer{
writer: w,

Просмотреть файл

@ -567,6 +567,11 @@ func TestMaxConnectionAge(t *testing.T) {
}
}
const (
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)
// TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{