keepalive: apply minimum ping time of 10s to client and 1s to server (#2642)
* keepalive: apply minimum ping time of 10s to client and 1s to server * review fixes
This commit is contained in:
Родитель
ae7b4f21da
Коммит
ed70822b12
|
@ -939,12 +939,39 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
|
|||
t.Fatalf("Failed to listen. Err: %v", err)
|
||||
}
|
||||
defer lis.Close()
|
||||
connected := make(chan struct{})
|
||||
go func() {
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
t.Errorf("error accepting connection: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
f := http2.NewFramer(conn, conn)
|
||||
// Start a goroutine to read from the conn to prevent the client from
|
||||
// blocking after it writes its preface.
|
||||
go func() {
|
||||
for {
|
||||
if _, err := f.ReadFrame(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
if err := f.WriteSettings(http2.Setting{}); err != nil {
|
||||
t.Errorf("error writing settings: %v", err)
|
||||
return
|
||||
}
|
||||
<-connected
|
||||
if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil {
|
||||
t.Errorf("error writing GOAWAY: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
addr := lis.Addr().String()
|
||||
s := NewServer()
|
||||
go s.Serve(lis)
|
||||
defer s.Stop()
|
||||
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 50 * time.Millisecond,
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 10 * time.Second,
|
||||
Timeout: 100 * time.Millisecond,
|
||||
PermitWithoutStream: true,
|
||||
}))
|
||||
|
@ -952,12 +979,21 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
|
|||
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
|
||||
}
|
||||
defer cc.Close()
|
||||
time.Sleep(1 * time.Second)
|
||||
cc.mu.RLock()
|
||||
defer cc.mu.RUnlock()
|
||||
v := cc.mkp.Time
|
||||
if v < 100*time.Millisecond {
|
||||
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
|
||||
close(connected)
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
cc.mu.RLock()
|
||||
v := cc.mkp.Time
|
||||
if v == 20*time.Second {
|
||||
// Success
|
||||
cc.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
// Timeout
|
||||
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
|
||||
}
|
||||
cc.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/envconfig"
|
||||
|
@ -388,6 +389,10 @@ func WithUserAgent(s string) DialOption {
|
|||
// WithKeepaliveParams returns a DialOption that specifies keepalive parameters
|
||||
// for the client transport.
|
||||
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
|
||||
if kp.Time < internal.KeepaliveMinPingTime {
|
||||
grpclog.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
|
||||
kp.Time = internal.KeepaliveMinPingTime
|
||||
}
|
||||
return newFuncDialOption(func(o *dialOptions) {
|
||||
o.copts.KeepaliveParams = kp
|
||||
})
|
||||
|
|
|
@ -20,7 +20,10 @@
|
|||
// symbols to avoid circular dependencies.
|
||||
package internal
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// WithContextDialer is exported by dialoptions.go
|
||||
|
@ -33,6 +36,9 @@ var (
|
|||
HealthCheckFunc HealthChecker
|
||||
// BalancerUnregister is exported by package balancer to unregister a balancer.
|
||||
BalancerUnregister func(name string)
|
||||
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
|
||||
// default, but tests may wish to set it lower for convenience.
|
||||
KeepaliveMinPingTime = 10 * time.Second
|
||||
)
|
||||
|
||||
// HealthChecker defines the signature of the client-side LB channel health checking function.
|
||||
|
|
|
@ -33,6 +33,7 @@ import (
|
|||
type ClientParameters struct {
|
||||
// After a duration of this time if the client doesn't see any activity it
|
||||
// pings the server to see if the transport is still alive.
|
||||
// If set below 10s, a minimum value of 10s will be used instead.
|
||||
Time time.Duration // The current default value is infinity.
|
||||
// After having pinged for keepalive check, the client waits for a duration
|
||||
// of Timeout and if no activity is seen even after that the connection is
|
||||
|
@ -62,6 +63,7 @@ type ServerParameters struct {
|
|||
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
|
||||
// After a duration of this time if the server doesn't see any activity it
|
||||
// pings the client to see if the transport is still alive.
|
||||
// If set below 1s, a minimum value of 1s will be used instead.
|
||||
Time time.Duration // The current default value is 2 hours.
|
||||
// After having pinged for keepalive check, the server waits for a duration
|
||||
// of Timeout and if no activity is seen even after that the connection is
|
||||
|
|
|
@ -182,6 +182,11 @@ func InitialConnWindowSize(s int32) ServerOption {
|
|||
|
||||
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
|
||||
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
|
||||
if kp.Time > 0 && kp.Time < time.Second {
|
||||
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
|
||||
kp.Time = time.Second
|
||||
}
|
||||
|
||||
return func(o *options) {
|
||||
o.keepaliveParams = kp
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/resolver"
|
||||
|
@ -847,8 +848,8 @@ func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
|
||||
}
|
||||
// 2500ms allow for 2 keepalives (1000ms per round trip)
|
||||
time.Sleep(2500 * time.Millisecond)
|
||||
// Allow for at least 2 keepalives (1s per ping interval)
|
||||
time.Sleep(4 * time.Second)
|
||||
cancel()
|
||||
}
|
||||
|
||||
|
@ -1125,15 +1126,24 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
|
|||
|
||||
func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
|
||||
channelz.NewChannelzStorage()
|
||||
defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
|
||||
internal.KeepaliveMinPingTime = time.Second
|
||||
e := tcpClearRREnv
|
||||
te := newTest(t, e)
|
||||
te.cliKeepAlive = &keepalive.ClientParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
|
||||
te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
|
||||
keepalive.ClientParameters{
|
||||
Time: time.Second,
|
||||
Timeout: 500 * time.Millisecond,
|
||||
PermitWithoutStream: true,
|
||||
}))
|
||||
te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
|
||||
keepalive.EnforcementPolicy{
|
||||
MinTime: 500 * time.Millisecond,
|
||||
PermitWithoutStream: true,
|
||||
}))
|
||||
te.startServer(&testServer{security: e.security})
|
||||
te.clientConn() // Dial the server
|
||||
defer te.tearDown()
|
||||
cc := te.clientConn()
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
doIdleCallToInvokeKeepAlive(tc, t)
|
||||
|
||||
if err := verifyResultWithDelay(func() (bool, error) {
|
||||
tchan, _ := channelz.GetTopChannels(0, 0)
|
||||
if len(tchan) != 1 {
|
||||
|
@ -1157,7 +1167,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
|
|||
break
|
||||
}
|
||||
skt := channelz.GetSocket(id)
|
||||
if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
|
||||
if skt.SocketData.KeepAlivesSent != 2 {
|
||||
return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
|
||||
}
|
||||
return true, nil
|
||||
|
@ -1230,7 +1240,7 @@ func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
|
|||
channelz.NewChannelzStorage()
|
||||
e := tcpClearRREnv
|
||||
te := newTest(t, e)
|
||||
te.svrKeepAlive = &keepalive.ServerParameters{Time: 500 * time.Millisecond, Timeout: 500 * time.Millisecond}
|
||||
te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{Time: time.Second, Timeout: 500 * time.Millisecond}))
|
||||
te.startServer(&testServer{security: e.security})
|
||||
defer te.tearDown()
|
||||
cc := te.clientConn()
|
||||
|
|
|
@ -62,7 +62,6 @@ import (
|
|||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/leakcheck"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
|
@ -510,8 +509,6 @@ type test struct {
|
|||
customDialOptions []grpc.DialOption
|
||||
customServerOptions []grpc.ServerOption
|
||||
resolverScheme string
|
||||
cliKeepAlive *keepalive.ClientParameters
|
||||
svrKeepAlive *keepalive.ServerParameters
|
||||
|
||||
// All test dialing is blocking by default. Set this to true if dial
|
||||
// should be non-blocking.
|
||||
|
@ -633,9 +630,6 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
|
|||
case "clientTimeoutCreds":
|
||||
sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
|
||||
}
|
||||
if te.svrKeepAlive != nil {
|
||||
sopts = append(sopts, grpc.KeepaliveParams(*te.svrKeepAlive))
|
||||
}
|
||||
sopts = append(sopts, te.customServerOptions...)
|
||||
s := grpc.NewServer(sopts...)
|
||||
te.srv = s
|
||||
|
@ -873,9 +867,6 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
|
|||
if te.srvAddr == "" {
|
||||
te.srvAddr = "client.side.only.test"
|
||||
}
|
||||
if te.cliKeepAlive != nil {
|
||||
opts = append(opts, grpc.WithKeepaliveParams(*te.cliKeepAlive))
|
||||
}
|
||||
opts = append(opts, te.customDialOptions...)
|
||||
return opts, scheme
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче