diff --git a/clientconn_test.go b/clientconn_test.go index f02cbeb0..76a43381 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -939,12 +939,39 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) { t.Fatalf("Failed to listen. Err: %v", err) } defer lis.Close() + connected := make(chan struct{}) + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("error accepting connection: %v", err) + return + } + defer conn.Close() + f := http2.NewFramer(conn, conn) + // Start a goroutine to read from the conn to prevent the client from + // blocking after it writes its preface. + go func() { + for { + if _, err := f.ReadFrame(); err != nil { + return + } + } + }() + if err := f.WriteSettings(http2.Setting{}); err != nil { + t.Errorf("error writing settings: %v", err) + return + } + <-connected + if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil { + t.Errorf("error writing GOAWAY: %v", err) + return + } + }() addr := lis.Addr().String() - s := NewServer() - go s.Serve(lis) - defer s.Stop() - cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{ - Time: 50 * time.Millisecond, + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, Timeout: 100 * time.Millisecond, PermitWithoutStream: true, })) @@ -952,12 +979,21 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) { t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) } defer cc.Close() - time.Sleep(1 * time.Second) - cc.mu.RLock() - defer cc.mu.RUnlock() - v := cc.mkp.Time - if v < 100*time.Millisecond { - t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v) + close(connected) + for { + time.Sleep(10 * time.Millisecond) + cc.mu.RLock() + v := cc.mkp.Time + if v == 20*time.Second { + // Success + cc.mu.RUnlock() + return + } + if ctx.Err() != nil { + // Timeout + t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v) + } + cc.mu.RUnlock() } } diff --git a/dialoptions.go b/dialoptions.go index 21c668c7..0874fc30 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/envconfig" @@ -388,6 +389,10 @@ func WithUserAgent(s string) DialOption { // WithKeepaliveParams returns a DialOption that specifies keepalive parameters // for the client transport. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { + if kp.Time < internal.KeepaliveMinPingTime { + grpclog.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime) + kp.Time = internal.KeepaliveMinPingTime + } return newFuncDialOption(func(o *dialOptions) { o.copts.KeepaliveParams = kp }) diff --git a/internal/internal.go b/internal/internal.go index eaa54d4f..db51cf18 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -20,7 +20,10 @@ // symbols to avoid circular dependencies. package internal -import "context" +import ( + "context" + "time" +) var ( // WithContextDialer is exported by dialoptions.go @@ -33,6 +36,9 @@ var ( HealthCheckFunc HealthChecker // BalancerUnregister is exported by package balancer to unregister a balancer. BalancerUnregister func(name string) + // KeepaliveMinPingTime is the minimum ping interval. This must be 10s by + // default, but tests may wish to set it lower for convenience. + KeepaliveMinPingTime = 10 * time.Second ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 899e72d7..34d31b5e 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -33,6 +33,7 @@ import ( type ClientParameters struct { // After a duration of this time if the client doesn't see any activity it // pings the server to see if the transport is still alive. + // If set below 10s, a minimum value of 10s will be used instead. Time time.Duration // The current default value is infinity. // After having pinged for keepalive check, the client waits for a duration // of Timeout and if no activity is seen even after that the connection is @@ -62,6 +63,7 @@ type ServerParameters struct { MaxConnectionAgeGrace time.Duration // The current default value is infinity. // After a duration of this time if the server doesn't see any activity it // pings the client to see if the transport is still alive. + // If set below 1s, a minimum value of 1s will be used instead. Time time.Duration // The current default value is 2 hours. // After having pinged for keepalive check, the server waits for a duration // of Timeout and if no activity is seen even after that the connection is diff --git a/server.go b/server.go index 0f454971..33272a47 100644 --- a/server.go +++ b/server.go @@ -182,6 +182,11 @@ func InitialConnWindowSize(s int32) ServerOption { // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { + if kp.Time > 0 && kp.Time < time.Second { + grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s") + kp.Time = time.Second + } + return func(o *options) { o.keepaliveParams = kp } diff --git a/test/channelz_test.go b/test/channelz_test.go index d31f4591..eedfc256 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" @@ -847,8 +848,8 @@ func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) { if err != nil { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want ", err) } - // 2500ms allow for 2 keepalives (1000ms per round trip) - time.Sleep(2500 * time.Millisecond) + // Allow for at least 2 keepalives (1s per ping interval) + time.Sleep(4 * time.Second) cancel() } @@ -1125,15 +1126,24 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { channelz.NewChannelzStorage() + defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime) + internal.KeepaliveMinPingTime = time.Second e := tcpClearRREnv te := newTest(t, e) - te.cliKeepAlive = &keepalive.ClientParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond} + te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams( + keepalive.ClientParameters{ + Time: time.Second, + Timeout: 500 * time.Millisecond, + PermitWithoutStream: true, + })) + te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy( + keepalive.EnforcementPolicy{ + MinTime: 500 * time.Millisecond, + PermitWithoutStream: true, + })) te.startServer(&testServer{security: e.security}) + te.clientConn() // Dial the server defer te.tearDown() - cc := te.clientConn() - tc := testpb.NewTestServiceClient(cc) - doIdleCallToInvokeKeepAlive(tc, t) - if err := verifyResultWithDelay(func() (bool, error) { tchan, _ := channelz.GetTopChannels(0, 0) if len(tchan) != 1 { @@ -1157,7 +1167,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { break } skt := channelz.GetSocket(id) - if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives. + if skt.SocketData.KeepAlivesSent != 2 { return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent) } return true, nil @@ -1230,7 +1240,7 @@ func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) { channelz.NewChannelzStorage() e := tcpClearRREnv te := newTest(t, e) - te.svrKeepAlive = &keepalive.ServerParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond} + te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{Time: time.Second, Timeout: 500 * time.Millisecond})) te.startServer(&testServer{security: e.security}) defer te.tearDown() cc := te.clientConn() diff --git a/test/end2end_test.go b/test/end2end_test.go index a1ac4d38..592df8f4 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -62,7 +62,6 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -510,8 +509,6 @@ type test struct { customDialOptions []grpc.DialOption customServerOptions []grpc.ServerOption resolverScheme string - cliKeepAlive *keepalive.ClientParameters - svrKeepAlive *keepalive.ServerParameters // All test dialing is blocking by default. Set this to true if dial // should be non-blocking. @@ -633,9 +630,6 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, case "clientTimeoutCreds": sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{})) } - if te.svrKeepAlive != nil { - sopts = append(sopts, grpc.KeepaliveParams(*te.svrKeepAlive)) - } sopts = append(sopts, te.customServerOptions...) s := grpc.NewServer(sopts...) te.srv = s @@ -873,9 +867,6 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) if te.srvAddr == "" { te.srvAddr = "client.side.only.test" } - if te.cliKeepAlive != nil { - opts = append(opts, grpc.WithKeepaliveParams(*te.cliKeepAlive)) - } opts = append(opts, te.customDialOptions...) return opts, scheme }