client: Keepalive pings should be sent every [Time] period (#3102)
This commit makes the following changes:

- Keep track of the time of the last read in the transport.
- Use this in the keepalive implementation to decide when to send out keepalives.
- Address the issue of keepalives being sent every [Time+Timeout] period instead of every [Time] period, as mandated by proposal A8.

Proposal A8 is here: https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
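In the old loop, a ping went out after [Time] of silence, and the goroutine then waited up to [Timeout] for the ack before arming the next [Time] wait, so on a healthy connection with Time=1s and Timeout=2s pings were spaced roughly 3s apart instead of 1s. The rewritten loop re-arms the timer relative to the time of the last read, so the next ping always fires one [Time] period after the most recent read. The following is a minimal, standalone sketch of that scheduling idea, not the transport code itself: keepaliveLoop, sendPing, and onTimeout are hypothetical stand-ins for the transport's keepalive goroutine, controlBuf.put(ping), and Close().

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// keepaliveLoop models the commit's scheduling: lastRead holds the UnixNano
// timestamp of the most recent read; sendPing and onTimeout stand in for the
// transport's controlBuf.put(ping) and Close().
func keepaliveLoop(lastRead *int64, kpTime, kpTimeout time.Duration, sendPing, onTimeout func(), done <-chan struct{}) {
	outstandingPing := false          // a ping was sent and nothing was read since
	timeoutLeft := time.Duration(0)   // time left to receive an ack for that ping
	prevNano := time.Now().UnixNano() // last read timestamp we have acted on
	timer := time.NewTimer(kpTime)
	for {
		select {
		case <-timer.C:
			last := atomic.LoadInt64(lastRead)
			if last > prevNano {
				// Read activity since the last wakeup: no ping needed yet.
				// Fire again exactly kpTime after that read, not kpTime+kpTimeout.
				outstandingPing = false
				timer.Reset(time.Duration(last) + kpTime - time.Duration(time.Now().UnixNano()))
				prevNano = last
				continue
			}
			if outstandingPing && timeoutLeft <= 0 {
				onTimeout() // no ack within kpTimeout: give up on the connection
				return
			}
			if !outstandingPing {
				sendPing()
				timeoutLeft = kpTimeout
				outstandingPing = true
			}
			// Sleep min(kpTime, timeoutLeft) so that, once the outstanding ping
			// is acked (which shows up as a read), the next ping is still only
			// one kpTime period away.
			sleep := kpTime
			if timeoutLeft < sleep {
				sleep = timeoutLeft
			}
			timeoutLeft -= sleep
			timer.Reset(sleep)
		case <-done:
			timer.Stop()
			return
		}
	}
}

func main() {
	var lastRead int64
	start := time.Now()
	done := make(chan struct{})
	go keepaliveLoop(&lastRead, 1*time.Second, 2*time.Second,
		func() { fmt.Printf("ping at +%v\n", time.Since(start).Round(100*time.Millisecond)) },
		func() { fmt.Println("keepalive timeout"); close(done) },
		done)
	// Simulate read activity for 1.5s: no pings are sent while reads arrive.
	for i := 0; i < 3; i++ {
		time.Sleep(500 * time.Millisecond)
		atomic.StoreInt64(&lastRead, time.Now().UnixNano())
	}
	// After reads stop, one ping goes out ~1s later and, with no ack,
	// onTimeout fires ~2s after that.
	<-done
}

Running the sketch, no pings are sent while reads keep arriving; once reads stop, a ping goes out after Time of silence and, absent an ack, the loop gives up after Timeout.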
Parent: c0909e91a5
Commit: 0f2d539339
@@ -47,6 +47,7 @@ import (
 
 // http2Client implements the ClientTransport interface with HTTP2.
 type http2Client struct {
+	lastRead   int64 // keep this field 64-bit aligned
 	ctx        context.Context
 	cancel     context.CancelFunc
 	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
@@ -76,9 +77,6 @@ type http2Client struct {
 	perRPCCreds []credentials.PerRPCCredentials
 
-	// Boolean to keep track of reading activity on transport.
-	// 1 is true and 0 is false.
-	activity         uint32 // Accessed atomically.
 	kp               keepalive.ClientParameters
 	keepaliveEnabled bool
@@ -1240,7 +1238,7 @@ func (t *http2Client) reader() {
 	}
 	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
 	if t.keepaliveEnabled {
-		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 	}
 	sf, ok := frame.(*http2.SettingsFrame)
 	if !ok {
@@ -1255,7 +1253,7 @@ func (t *http2Client) reader() {
 		t.controlBuf.throttle()
 		frame, err := t.framer.fr.ReadFrame()
 		if t.keepaliveEnabled {
-			atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
 		}
 		if err != nil {
 			// Abort an active stream if the http2.Framer returns a
@@ -1299,17 +1297,41 @@ func (t *http2Client) reader() {
 	}
 }
 
+func minTime(a, b time.Duration) time.Duration {
+	if a < b {
+		return a
+	}
+	return b
+}
+
 // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
 func (t *http2Client) keepalive() {
 	p := &ping{data: [8]byte{}}
+	// True iff a ping has been sent, and no data has been received since then.
+	outstandingPing := false
+	// Amount of time remaining before which we should receive an ACK for the
+	// last sent ping.
+	timeoutLeft := time.Duration(0)
+	// Records the last value of t.lastRead before we go block on the timer.
+	// This is required to check for read activity since then.
+	prevNano := time.Now().UnixNano()
 	timer := time.NewTimer(t.kp.Time)
 	for {
 		select {
 		case <-timer.C:
-			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
-				timer.Reset(t.kp.Time)
+			lastRead := atomic.LoadInt64(&t.lastRead)
+			if lastRead > prevNano {
+				// There has been read activity since the last time we were here.
+				outstandingPing = false
+				// Next timer should fire at kp.Time seconds from lastRead time.
+				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
+				prevNano = lastRead
 				continue
 			}
+			if outstandingPing && timeoutLeft <= 0 {
+				t.Close()
+				return
+			}
 			t.mu.Lock()
 			if t.state == closing {
 				// If the transport is closing, we should exit from the
@@ -1322,36 +1344,37 @@ func (t *http2Client) keepalive() {
 				return
 			}
 			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+				// If a ping was sent out previously (because there were active
+				// streams at that point) which wasn't acked and its timeout
+				// hadn't fired, but we got here and are about to go dormant,
+				// we should make sure that we unconditionally send a ping once
+				// we awaken.
+				outstandingPing = false
 				t.kpDormant = true
 				t.kpDormancyCond.Wait()
 			}
 			t.kpDormant = false
 			t.mu.Unlock()
 
-			if channelz.IsOn() {
-				atomic.AddInt64(&t.czData.kpCount, 1)
-			}
 			// We get here either because we were dormant and a new stream was
 			// created which unblocked the Wait() call, or because the
 			// keepalive timer expired. In both cases, we need to send a ping.
-			t.controlBuf.put(p)
-
-			timer.Reset(t.kp.Timeout)
-			select {
-			case <-timer.C:
-				if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
-					timer.Reset(t.kp.Time)
-					continue
+			if !outstandingPing {
+				if channelz.IsOn() {
+					atomic.AddInt64(&t.czData.kpCount, 1)
 				}
-				infof("transport: closing client transport due to idleness.")
-				t.Close()
-				return
-			case <-t.ctx.Done():
-				if !timer.Stop() {
-					<-timer.C
-				}
-				return
+				t.controlBuf.put(p)
+				timeoutLeft = t.kp.Timeout
+				outstandingPing = true
 			}
+			// The amount of time to sleep here is the minimum of kp.Time and
+			// timeoutLeft. This will ensure that we wait only for kp.Time
+			// before sending out the next ping (for cases where the ping is
+			// acked).
+			sleepDuration := minTime(t.kp.Time, timeoutLeft)
+			timeoutLeft -= sleepDuration
+			prevNano = lastRead
+			timer.Reset(sleepDuration)
 		case <-t.ctx.Done():
 			if !timer.Stop() {
 				<-timer.C
@@ -350,6 +350,53 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
 	}
 }
 
+// TestKeepaliveClientFrequency creates a server which expects at most 1 client
+// ping for every 1.2 seconds, while the client is configured to send a ping
+// every 1 second. So, this configuration should end up with the client
+// transport being closed. But we had a bug wherein the client was sending one
+// ping every [Time+Timeout] instead of every [Time] period, and this test
+// explicitly makes sure the fix works and the client sends a ping every [Time]
+// period.
+func TestKeepaliveClientFrequency(t *testing.T) {
+	serverConfig := &ServerConfig{
+		KeepalivePolicy: keepalive.EnforcementPolicy{
+			MinTime:             1200 * time.Millisecond, // 1.2 seconds
+			PermitWithoutStream: true,
+		},
+	}
+	clientOptions := ConnectOptions{
+		KeepaliveParams: keepalive.ClientParameters{
+			Time:                1 * time.Second,
+			Timeout:             2 * time.Second,
+			PermitWithoutStream: true,
+		},
+	}
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
+	defer func() {
+		client.Close()
+		server.stop()
+		cancel()
+	}()
+
+	timeout := time.NewTimer(6 * time.Second)
+	select {
+	case <-client.Error():
+		if !timeout.Stop() {
+			<-timeout.C
+		}
+		if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
+			t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
+		}
+	case <-timeout.C:
+		t.Fatalf("client transport still healthy; expected GoAway from the server.")
+	}
+
+	// Make sure the client transport is not healthy.
+	if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
+		t.Fatal("client.NewStream() should have failed, but succeeded")
+	}
+}
+
 // TestKeepaliveServerEnforcementWithAbusiveClientNoRPC verifies that the
 // server closes a client transport when it sends too many keepalive pings
 // (when there are no active streams), based on the configured