From 97da9e087c5cf7a11263637efb410e08c51a7a0f Mon Sep 17 00:00:00 2001 From: Jean de Klerk Date: Tue, 31 Jul 2018 14:10:13 -0700 Subject: [PATCH] internal: remove transportMonitor, replace with callbacks (#2219) --- clientconn.go | 503 ++++++++++++++------------- internal/transport/http2_client.go | 33 +- internal/transport/transport.go | 4 +- internal/transport/transport_test.go | 61 ++-- status/status.go | 8 +- 5 files changed, 321 insertions(+), 288 deletions(-) diff --git a/clientconn.go b/clientconn.go index 9b035e8f..d2a78727 100644 --- a/clientconn.go +++ b/clientconn.go @@ -532,9 +532,10 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi // Caller needs to make sure len(addrs) > 0. func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { ac := &addrConn{ - cc: cc, - addrs: addrs, - dopts: cc.dopts, + cc: cc, + addrs: addrs, + dopts: cc.dopts, + successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1 } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) // Track ac in cc. This needs to be done before any getTransport(...) is called. @@ -626,17 +627,7 @@ func (ac *addrConn) connect() error { ac.mu.Unlock() // Start a goroutine connecting to the server asynchronously. - go func() { - if err := ac.resetTransport(); err != nil { - grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err) - if err != errConnClosing { - // Keep this ac in cc.conns, to get the reason it's torn down. - ac.tearDown(err) - } - return - } - ac.transportMonitor() - }() + go ac.resetTransport(false) return nil } @@ -665,7 +656,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) if curAddrFound { ac.addrs = addrs - ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list. + ac.addrIdx = 0 // Start reconnecting from beginning in the new list. } return curAddrFound @@ -796,27 +787,28 @@ type addrConn struct { ctx context.Context cancel context.CancelFunc + mu sync.Mutex cc *ClientConn - addrs []resolver.Address dopts dialOptions events trace.EventLog acbw balancer.SubConn - mu sync.Mutex - curAddr resolver.Address - reconnectIdx int // The index in addrs list to start reconnecting from. - state connectivity.State + transport transport.ClientTransport // The current transport. + + addrIdx int // The index in addrs list to start reconnecting from. + curAddr resolver.Address // The current address. + addrs []resolver.Address // All addresses that the resolver resolved to. + + state connectivity.State // ready is closed and becomes nil when a new transport is up or failed // due to timeout. - ready chan struct{} - transport transport.ClientTransport + ready chan struct{} - // The reason this addrConn is torn down. - tearDownErr error + tearDownErr error // The reason this addrConn is torn down. - connectRetryNum int + backoffIdx int // backoffDeadline is the time until which resetTransport needs to - // wait before increasing connectRetryNum count. + // wait before increasing backoffIdx count. backoffDeadline time.Time // connectDeadline is the time by which all connection // negotiations must complete. 
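The heart of this change: rather than each transport having a dedicated transportMonitor goroutine blocking on its GoAway/Error channels, the transport now invokes onGoAway and onClose callbacks, and a resetTransport loop owns all reconnection. Below is a minimal, self-contained sketch of that callback-driven pattern; fakeTransport, connMgr, and the backoff arithmetic are illustrative stand-ins, not grpc's actual internals:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fakeTransport stands in for an HTTP/2 client transport. Closing it fires
// the onClose callback registered by the dialer.
type fakeTransport struct {
	onClose func()
}

func (t *fakeTransport) Close() { t.onClose() }

type connMgr struct {
	mu sync.Mutex
	tr *fakeTransport
}

// resetTransport loops until a transport is established. There is no monitor
// goroutine; the only long-lived state is the onClose callback itself, which
// re-enters this loop when the transport dies.
func (m *connMgr) resetTransport() {
	for backoff := 10 * time.Millisecond; ; backoff *= 2 {
		tr, allow, err := m.createTransport()
		if err != nil {
			time.Sleep(backoff) // stand-in for the backoff/deadline bookkeeping
			continue
		}
		m.mu.Lock()
		m.tr = tr
		m.mu.Unlock()
		allow() // only now may onClose trigger a reconnect
		return
	}
}

func (m *connMgr) createTransport() (*fakeTransport, func(), error) {
	if rand.Intn(2) == 0 {
		return nil, nil, errors.New("dial failed") // simulated flaky dial
	}
	tr := &fakeTransport{}
	// allowedToReset plays the role of the patch's channel of the same name:
	// onClose blocks until the caller has published the new transport.
	allowedToReset := make(chan struct{})
	tr.onClose = func() {
		<-allowedToReset
		go m.resetTransport()
	}
	return tr, func() { close(allowedToReset) }, nil
}

func main() {
	m := &connMgr{}
	m.resetTransport()
	fmt.Println("connected")
	m.tr.Close() // fires onClose, which reconnects in the background
	time.Sleep(100 * time.Millisecond)
}

The allow gate mirrors the patch's allowedToReset channel: onClose must not kick off a reconnect until the dialing goroutine has finished publishing the new transport, otherwise a failing reader goroutine would race with the code that marks the connection ready.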
@@ -828,6 +820,8 @@ type addrConn struct { callsSucceeded int64 callsFailed int64 lastCallStartedTime time.Time + + successfulHandshake bool } // adjustParams updates parameters used to create transports upon @@ -860,176 +854,268 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { } } -// resetTransport recreates a transport to the address for ac. The old -// transport will close itself on error or when the clientconn is closed. -// The created transport must receive initial settings frame from the server. -// In case that doesn't happen, transportMonitor will kill the newly created -// transport after connectDeadline has expired. -// In case there was an error on the transport before the settings frame was -// received, resetTransport resumes connecting to backends after the one that -// was previously connected to. In case end of the list is reached, resetTransport -// backs off until the original deadline. -// If the DialOption WithWaitForHandshake was set, resetTrasport returns -// successfully only after server settings are received. +// resetTransport makes sure that a healthy ac.transport exists. // -// TODO(bar) make sure all state transitions are valid. -func (ac *addrConn) resetTransport() error { - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return errConnClosing - } - if ac.ready != nil { - close(ac.ready) - ac.ready = nil - } - ac.transport = nil - ridx := ac.reconnectIdx - ac.mu.Unlock() - ac.cc.mu.RLock() - ac.dopts.copts.KeepaliveParams = ac.cc.mkp - ac.cc.mu.RUnlock() - var backoffDeadline, connectDeadline time.Time - for connectRetryNum := 0; ; connectRetryNum++ { - ac.mu.Lock() - if ac.backoffDeadline.IsZero() { - // This means either a successful HTTP2 connection was established - // or this is the first time this addrConn is trying to establish a - // connection. - backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration. - // This will be the duration that dial gets to finish. - dialDuration := getMinConnectTimeout() - if backoffFor > dialDuration { - // Give dial more time as we keep failing to connect. - dialDuration = backoffFor - } - start := time.Now() - backoffDeadline = start.Add(backoffFor) - connectDeadline = start.Add(dialDuration) - ridx = 0 // Start connecting from the beginning. - } else { - // Continue trying to connect with the same deadlines. - connectRetryNum = ac.connectRetryNum - backoffDeadline = ac.backoffDeadline - connectDeadline = ac.connectDeadline - ac.backoffDeadline = time.Time{} - ac.connectDeadline = time.Time{} - ac.connectRetryNum = 0 +// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or +// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer +// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to +// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the +// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs +// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received. +// +// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received. 
+func (ac *addrConn) resetTransport(resolveNow bool) {
+	for {
+		// If this is the first in a line of resets, we want to resolve immediately. The only other time we
+		// want to re-resolve is if we have tried all the addresses handed to us.
+		if resolveNow {
+			ac.mu.Lock()
+			ac.cc.resolveNow(resolver.ResolveNowOption{})
+			ac.mu.Unlock()
 		}
+
+		if err := ac.nextAddr(); err != nil {
+			grpclog.Warningf("resetTransport: error from nextAddr: %v", err)
+		}
+
+		ac.mu.Lock()
 		if ac.state == connectivity.Shutdown {
 			ac.mu.Unlock()
-			return errConnClosing
+			return
 		}
+
+		if ac.ready != nil {
+			close(ac.ready)
+			ac.ready = nil
+		}
+		ac.transport = nil
+
+		backoffIdx := ac.backoffIdx
+		backoffFor := ac.dopts.bs.Backoff(backoffIdx)
+
+		// This will be the duration that dial gets to finish.
+		dialDuration := getMinConnectTimeout()
+		if backoffFor > dialDuration {
+			// Give dial more time as we keep failing to connect.
+			dialDuration = backoffFor
+		}
+		start := time.Now()
+		connectDeadline := start.Add(dialDuration)
+		ac.backoffDeadline = start.Add(backoffFor)
+		ac.connectDeadline = connectDeadline
+
+		ac.mu.Unlock()
+
+		ac.cc.mu.RLock()
+		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+		ac.cc.mu.RUnlock()
+
+		ac.mu.Lock()
+
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			return
+		}
+
 		ac.printf("connecting")
 		if ac.state != connectivity.Connecting {
 			ac.state = connectivity.Connecting
 			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 		}
-		// copy ac.addrs in case of race
-		addrsIter := make([]resolver.Address, len(ac.addrs))
-		copy(addrsIter, ac.addrs)
+
+		addr := ac.addrs[ac.addrIdx]
 		copts := ac.dopts.copts
 		ac.mu.Unlock()
-		connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
-		if err != nil {
-			return err
-		}
-		if connected {
-			return nil
+
+		if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
+			// errReadTimedOut indicates that the handshake was not received before
+			// the deadline. We exit here because the transport's reader goroutine will
+			// use onClose to reset the transport.
+			if err == errReadTimedOut {
+				return
+			}
+
+			continue
 		}
+
+		return
 	}
 }
 
+var errReadTimedOut = errors.New("read timed out")
+
 // createTransport creates a connection to one of the backends in addrs.
-// It returns true if a connection was established.
-func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
-	for i := ridx; i < len(addrs); i++ {
-		addr := addrs[i]
-		target := transport.TargetInfo{
-			Addr:      addr.Addr,
-			Metadata:  addr.Metadata,
-			Authority: ac.cc.authority,
-		}
-		done := make(chan struct{})
-		onPrefaceReceipt := func() {
+func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
+	skipReset := make(chan struct{})
+	allowedToReset := make(chan struct{})
+	prefaceReceived := make(chan struct{})
+	onCloseCalled := make(chan struct{})
+
+	onGoAway := func(r transport.GoAwayReason) {
+		ac.mu.Lock()
+		ac.adjustParams(r)
+		ac.mu.Unlock()
+		go ac.resetTransport(false)
+	}
+
+	onClose := func() {
+		close(onCloseCalled)
+
+		select {
+		case <-skipReset: // The outer resetTransport loop will handle reconnection.
+			return
+		case <-allowedToReset: // We're in the clear to reset.
 			ac.mu.Lock()
-			close(done)
-			if !ac.backoffDeadline.IsZero() {
-				// If we haven't already started reconnecting to
-				// other backends.
-				// Note, this can happen when writer notices an error
-				// and triggers resetTransport while at the same time
-				// reader receives the preface and invokes this closure.
-				ac.backoffDeadline = time.Time{}
-				ac.connectDeadline = time.Time{}
-				ac.connectRetryNum = 0
-			}
+			ac.transport = nil
 			ac.mu.Unlock()
+			ac.resetTransport(false)
 		}
-		// Do not cancel in the success path because of
-		// this issue in Go1.6: https://github.com/golang/go/issues/15078.
-		connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
-		if channelz.IsOn() {
-			copts.ChannelzParentID = ac.channelzID
-		}
-		newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
-		if err != nil {
-			cancel()
-			ac.cc.blockingpicker.updateConnectionError(err)
+	}
+
+	target := transport.TargetInfo{
+		Addr:      addr.Addr,
+		Metadata:  addr.Metadata,
+		Authority: ac.cc.authority,
+	}
+
+	onPrefaceReceipt := func() {
+		close(prefaceReceived)
+
+		// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
+		ac.mu.Lock()
+		ac.successfulHandshake = true
+		ac.backoffDeadline = time.Time{}
+		ac.connectDeadline = time.Time{}
+		ac.addrIdx = 0
+		ac.backoffIdx = 0
+		ac.mu.Unlock()
+	}
+
+	// Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
+	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+	if channelz.IsOn() {
+		copts.ChannelzParentID = ac.channelzID
+	}
+
+	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
+
+	if err == nil {
+		// prefaceTimedOut is closed by the timer callback below when the deadline
+		// expires before the server preface arrives. (The Timer returned by
+		// time.AfterFunc does not use its C channel, so the handshake select
+		// further down waits on this channel instead.)
+		prefaceTimedOut := make(chan struct{})
+		prefaceTimer := time.AfterFunc(connectDeadline.Sub(time.Now()), func() {
 			ac.mu.Lock()
-			if ac.state == connectivity.Shutdown {
-				// ac.tearDown(...) has been invoked.
+			select {
+			case <-onCloseCalled:
+				// The transport has already closed - noop.
 				ac.mu.Unlock()
-				return false, errConnClosing
+			case <-prefaceReceived:
+				// We got the preface just in the nick of time - huzzah!
+				ac.mu.Unlock()
+			default:
+				// We didn't get the preface in time.
+				ac.mu.Unlock()
+				close(prefaceTimedOut)
+				newTr.Close()
 			}
-			ac.mu.Unlock()
-			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
-			continue
-		}
+		})
 
 		if ac.dopts.waitForHandshake {
 			select {
-			case <-done:
-			case <-connectCtx.Done():
-				// Didn't receive server preface, must kill this new transport now.
-				grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
-				newTr.Close()
-				continue
-			case <-ac.ctx.Done():
+			case <-prefaceTimedOut:
+				// We want to fail but _not_ reset here, because we're going into the transient-failure-and-return
+				// flow and the next cycle of the resetTransport loop will reconnect. The timer callback above has
+				// already closed newTr; its onClose unblocks once the error path below closes skipReset.
+				err = errors.New("timed out waiting for server handshake")
+			case <-prefaceReceived:
+				// We got the preface - huzzah! things are good.
+				prefaceTimer.Stop()
+			case <-onCloseCalled:
+				// The transport closed before the preface arrived. Surface an error so that we go into the
+				// transient-failure-and-return flow rather than marking a dead transport ready.
+				err = errors.New("connection closed before server preface received")
 			}
 		}
+	}
+
+	if err != nil {
+		// newTr is either nil, or closed.
+
+		cancel()
+		ac.cc.blockingpicker.updateConnectionError(err)
 		ac.mu.Lock()
 		if ac.state == connectivity.Shutdown {
+			// ac.tearDown(...) has been invoked.
 			ac.mu.Unlock()
-			// ac.tearDonn(...) has been invoked.
-			newTr.Close()
-			return false, errConnClosing
+
+			// We don't want to reset during this close because we prefer to kick out of this function and let the loop
+			// in resetTransport take care of reconnecting.
+ close(skipReset) + + return errConnClosing } - ac.printf("ready") - ac.state = connectivity.Ready + ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - ac.transport = newTr - ac.curAddr = addr - if ac.ready != nil { - close(ac.ready) - ac.ready = nil - } - select { - case <-done: - // If the server has responded back with preface already, - // don't set the reconnect parameters. - default: - ac.connectRetryNum = connectRetryNum - ac.backoffDeadline = backoffDeadline - ac.connectDeadline = connectDeadline - ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list. - } ac.mu.Unlock() - return true, nil + grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) + + // We don't want to reset during this close because we prefer to kick out of this function and let the loop + // in resetTransport take care of reconnecting. + close(skipReset) + + return err } + ac.mu.Lock() + if ac.state == connectivity.Shutdown { ac.mu.Unlock() - return false, errConnClosing + + // We don't want to reset during this close because we prefer to kick out of this function and let the loop + // in resetTransport take care of reconnecting. + close(skipReset) + + newTr.Close() + return errConnClosing + } + + ac.printf("ready") + ac.state = connectivity.Ready + ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.transport = newTr + ac.curAddr = addr + if ac.ready != nil { + close(ac.ready) + ac.ready = nil + } + + ac.mu.Unlock() + + // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader + // goroutine failing races with all the code in this method that sets the connection to "ready". + close(allowedToReset) + return nil +} + +// nextAddr increments the addrIdx if there are more addresses to try. If +// there are no more addrs to try it will re-resolve, set addrIdx to 0, and +// increment the backoffIdx. +func (ac *addrConn) nextAddr() error { + ac.mu.Lock() + + // If a handshake has been observed, we expect the counters to have manually + // been reset so we'll just return, since we want the next usage to start + // at index 0. + if ac.successfulHandshake { + ac.successfulHandshake = false + ac.mu.Unlock() + return nil + } + + if ac.addrIdx < len(ac.addrs)-1 { + ac.addrIdx++ + ac.mu.Unlock() + return nil + } + + ac.addrIdx = 0 + ac.backoffIdx++ + + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return errConnClosing } ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) @@ -1038,95 +1124,16 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, close(ac.ready) ac.ready = nil } + backoffDeadline := ac.backoffDeadline ac.mu.Unlock() timer := time.NewTimer(backoffDeadline.Sub(time.Now())) select { case <-timer.C: case <-ac.ctx.Done(): timer.Stop() - return false, ac.ctx.Err() - } - return false, nil -} - -// Run in a goroutine to track the error in transport and create the -// new transport if an error happens. It returns when the channel is closing. -func (ac *addrConn) transportMonitor() { - for { - var timer *time.Timer - var cdeadline <-chan time.Time - ac.mu.Lock() - t := ac.transport - if !ac.connectDeadline.IsZero() { - timer = time.NewTimer(ac.connectDeadline.Sub(time.Now())) - cdeadline = timer.C - } - ac.mu.Unlock() - // Block until we receive a goaway or an error occurs. 
-		select {
-		case <-t.GoAway():
-			done := t.Error()
-			cleanup := t.Close
-			// Since this transport will be orphaned (won't have a transportMonitor)
-			// we need to launch a goroutine to keep track of clientConn.Close()
-			// happening since it might not be noticed by any other goroutine for a while.
-			go func() {
-				<-done
-				cleanup()
-			}()
-		case <-t.Error():
-			// In case this is triggered because clientConn.Close()
-			// was called, we want to immeditately close the transport
-			// since no other goroutine might notice it for a while.
-			t.Close()
-		case <-cdeadline:
-			ac.mu.Lock()
-			// This implies that client received server preface.
-			if ac.backoffDeadline.IsZero() {
-				ac.mu.Unlock()
-				continue
-			}
-			ac.mu.Unlock()
-			timer = nil
-			// No server preface received until deadline.
-			// Kill the connection.
-			grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
-			t.Close()
-		}
-		if timer != nil {
-			timer.Stop()
-		}
-		// If a GoAway happened, regardless of error, adjust our keepalive
-		// parameters as appropriate.
-		select {
-		case <-t.GoAway():
-			ac.adjustParams(t.GetGoAwayReason())
-		default:
-		}
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-		// Set connectivity state to TransientFailure before calling
-		// resetTransport. Transition READY->CONNECTING is not valid.
-		ac.state = connectivity.TransientFailure
-		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-		ac.cc.resolveNow(resolver.ResolveNowOption{})
-		ac.curAddr = resolver.Address{}
-		ac.mu.Unlock()
-		if err := ac.resetTransport(); err != nil {
-			ac.mu.Lock()
-			ac.printf("transport exiting: %v", err)
-			ac.mu.Unlock()
-			grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
-			if err != errConnClosing {
-				// Keep this ac in cc.conns, to get the reason it's torn down.
-				ac.tearDown(err)
-			}
-			return
-		}
+		return ac.ctx.Err()
 	}
+	return nil
 }
 
 // getReadyTransport returns the transport if ac's state is READY.
@@ -1134,7 +1141,7 @@ func (ac *addrConn) transportMonitor() {
 // If ac's state is IDLE, it will trigger ac to connect.
 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
 	ac.mu.Lock()
-	if ac.state == connectivity.Ready {
+	if ac.state == connectivity.Ready && ac.transport != nil {
 		t := ac.transport
 		ac.mu.Unlock()
 		return t, true
 	}
@@ -1159,21 +1166,26 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
 func (ac *addrConn) tearDown(err error) {
 	ac.cancel()
 	ac.mu.Lock()
-	defer ac.mu.Unlock()
 	if ac.state == connectivity.Shutdown {
+		ac.mu.Unlock()
 		return
 	}
+	// We have to set the state to Shutdown before we call ac.transport.GracefulClose, because it signals to
+	// onClose not to try reconnecting the transport.
+	ac.state = connectivity.Shutdown
+	ac.tearDownErr = err
+	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	ac.curAddr = resolver.Address{}
 	if err == errConnDrain && ac.transport != nil {
 		// GracefulClose(...) may be executed multiple times when
 		// i) receiving multiple GoAway frames from the server; or
 		// ii) there are concurrent name resolver/Balancer triggered
 		// address removal and GoAway.
+		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
+ ac.mu.Unlock() ac.transport.GracefulClose() + ac.mu.Lock() } - ac.state = connectivity.Shutdown - ac.tearDownErr = err - ac.cc.handleSubConnStateChange(ac.acbw, ac.state) if ac.events != nil { ac.events.Finish() ac.events = nil @@ -1185,6 +1197,7 @@ func (ac *addrConn) tearDown(err error) { if channelz.IsOn() { channelz.RemoveEntry(ac.channelzID) } + ac.mu.Unlock() } func (ac *addrConn) getState() connectivity.State { diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 88d1c161..86144ae1 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -124,6 +124,9 @@ type http2Client struct { msgRecv int64 lastMsgSent time.Time lastMsgRecv time.Time + + onGoAway func(GoAwayReason) + onClose func() } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -152,7 +155,7 @@ func isTemporary(err error) bool { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ *http2Client, err error) { +func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) { scheme := "http" ctx, cancel := context.WithCancel(ctx) defer func() { @@ -234,6 +237,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne maxConcurrentStreams: defaultMaxStreamsClient, streamQuota: defaultMaxStreamsClient, streamsQuotaAvailable: make(chan struct{}, 1), + onGoAway: onGoAway, + onClose: func() {}, } t.controlBuf = newControlBuffer(t.ctxDone) if opts.InitialWindowSize >= defaultWindowSize { @@ -266,10 +271,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne t.keepaliveEnabled = true go t.keepalive() } - // Start the reader goroutine for incoming message. Each transport has - // a dedicated goroutine which reads HTTP2 frame from network. Then it - // dispatches the frame to the corresponding stream entity. - go t.reader() // Send connection preface to server. n, err := t.conn.Write(clientPreface) if err != nil { @@ -306,6 +307,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) } } + + // We only assign onClose after we're sure there can not be any more t.Close calls in this goroutine, because + // onClose may (frequently does) block. + t.onClose = onClose + + // Start the reader goroutine for incoming message. Each transport has + // a dedicated goroutine which reads HTTP2 frame from network. Then it + // dispatches the frame to the corresponding stream entity. + go t.reader() t.framer.writer.Flush() go func() { t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) @@ -735,6 +745,10 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. // Close kicks off the shutdown process of the transport. This should be called // only once on a transport. Once it is called, the transport should not be // accessed any more. +// +// This method blocks until the addrConn that initiated this transport is +// re-connected. 
This happens because t.onClose() begins reconnect logic at the +// addrConn level and blocks until the addrConn is successfully connected. func (t *http2Client) Close() error { t.mu.Lock() // Make sure we only Close once. @@ -762,6 +776,7 @@ func (t *http2Client) Close() error { } t.statsHandler.HandleConn(t.ctx, connEnd) } + t.onClose() return err } @@ -1058,6 +1073,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { close(t.goAway) t.state = draining t.controlBuf.put(&incomingGoAway{}) + + // This has to be a new goroutine because we're still using the current goroutine to read in the transport. + t.onGoAway(t.goAwayReason) } // All streams with IDs greater than the GoAwayId // and smaller than the previous GoAway ID should be killed. @@ -1174,15 +1192,16 @@ func (t *http2Client) reader() { // Check the validity of server preface. frame, err := t.framer.fr.ReadFrame() if err != nil { - t.Close() + t.Close() // this kicks off resetTransport, so must be last before return return } + t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) if t.keepaliveEnabled { atomic.CompareAndSwapUint32(&t.activity, 0, 1) } sf, ok := frame.(*http2.SettingsFrame) if !ok { - t.Close() + t.Close() // this kicks off resetTransport, so must be last before return return } t.onSuccess() diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 9775eeb8..21907399 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -495,8 +495,8 @@ type TargetInfo struct { // NewClientTransport establishes the transport with the required ConnectOptions // and returns it to the caller. -func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) { - return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess) +func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) { + return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose) } // Options provides additional hints and information for message diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 52be96f0..01c437b4 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -397,17 +397,17 @@ func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hTyp } func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2Client, func()) { - return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}, func() {}) + return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}) } -func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions, onHandshake func()) (*server, *http2Client, func()) { +func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) { server := setUpServerOnly(t, port, serverConfig, ht) addr := "localhost:" + server.port target := TargetInfo{ Addr: addr, } connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, onHandshake) + ct, connErr := NewClientTransport(connectCtx, context.Background(), 
target, copts, func() {}, func(GoAwayReason) {}, func() {}) if connErr != nil { cancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) @@ -432,7 +432,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con done <- conn }() connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}) + tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}, func(GoAwayReason) {}, func() {}) if err != nil { cancel() // Do not cancel in success path. // Server clean-up. @@ -449,7 +449,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con // sends status error to concurrent stream reader. func TestInflightStreamClosing(t *testing.T) { serverConfig := &ServerConfig{} - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -492,7 +492,7 @@ func TestMaxConnectionIdle(t *testing.T) { MaxConnectionIdle: 2 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -520,7 +520,7 @@ func TestMaxConnectionIdleNegative(t *testing.T) { MaxConnectionIdle: 2 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -547,7 +547,7 @@ func TestMaxConnectionAge(t *testing.T) { MaxConnectionAge: 2 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -580,7 +580,7 @@ func TestKeepaliveServer(t *testing.T) { Timeout: 1 * time.Second, }, } - server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer c.Close() @@ -624,7 +624,7 @@ func TestKeepaliveServerNegative(t *testing.T) { Timeout: 1 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -718,7 +718,7 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. 
- }}, func() {}) + }}) defer cancel() defer s.stop() defer tr.Close() @@ -745,7 +745,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { PermitWithoutStream: true, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -779,7 +779,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { Timeout: 1 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -818,7 +818,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { PermitWithoutStream: true, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -845,7 +845,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { Timeout: 1 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -994,7 +994,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) { InitialWindowSize: defaultWindowSize, InitialConnWindowSize: defaultWindowSize, } - server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co, func() {}) + server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co) defer cancel() defer server.stop() defer ct.Close() @@ -1083,6 +1083,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) { func TestGracefulClose(t *testing.T) { server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong) + defer cancel() defer func() { // Stop the server's listener to make the server's goroutines terminate // (after the last active stream is done). 
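One readability note on the test churn above: every updated call site now spells out the same three no-op callbacks. A tiny helper along these lines (hypothetical, not part of this patch; it would live in internal/transport next to these tests) would centralize that:

// newNoopClientTransport wraps NewClientTransport with no-op callbacks for
// tests that don't exercise the handshake, GoAway, or close paths.
// Hypothetical helper - shown only to illustrate the new signature.
func newNoopClientTransport(connectCtx, ctx context.Context, target TargetInfo, copts ConnectOptions) (ClientTransport, error) {
	return NewClientTransport(connectCtx, ctx, target, copts,
		func() {},             // onSuccess: server preface received
		func(GoAwayReason) {}, // onGoAway: server started draining
		func() {},             // onClose: transport shut down
	)
}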
@@ -1092,7 +1093,6 @@ func TestGracefulClose(t *testing.T) { leakcheck.Check(t) // Correctly clean up the server server.stop() - cancel() }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) defer cancel() @@ -1147,8 +1147,8 @@ func TestGracefulClose(t *testing.T) { } func TestLargeMessageSuspension(t *testing.T) { - server, ct, cancelsvr := setUp(t, 0, math.MaxUint32, suspended) - defer cancelsvr() + server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended) + defer cancel() callHdr := &CallHdr{ Host: "localhost", Method: "foo.Large", @@ -1185,7 +1185,7 @@ func TestMaxStreams(t *testing.T) { serverConfig := &ServerConfig{ MaxStreams: 1, } - server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer ct.Close() defer server.stop() @@ -1319,7 +1319,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) { InitialWindowSize: defaultWindowSize, InitialConnWindowSize: defaultWindowSize, } - server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions) defer cancel() defer server.stop() defer client.Close() @@ -1406,7 +1406,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) { InitialWindowSize: defaultWindowSize, InitialConnWindowSize: defaultWindowSize, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -1636,7 +1636,7 @@ func TestClientWithMisbehavedServer(t *testing.T) { }() connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) defer cancel() - ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}) + ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -1799,7 +1799,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) InitialWindowSize: wc.clientStream, InitialConnWindowSize: wc.clientConn, } - server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co, func() {}) + server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co) defer cancel() defer server.stop() defer client.Close() @@ -2031,7 +2031,7 @@ func (s *httpServer) cleanUp() { } } -func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func(), cancel func()) { +func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) { var ( err error lis net.Listener @@ -2063,9 +2063,8 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream wh: wh, } server.start(t, lis) - var connectCtx context.Context - connectCtx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}) + connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + 
client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {}) if err != nil { cancel() // Do not cancel in success path. t.Fatalf("Error creating client. Err: %v", err) @@ -2084,8 +2083,7 @@ func TestHTTPToGRPCStatusMapping(t *testing.T) { } func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) { - stream, cleanUp, cancel := setUpHTTPStatusTest(t, httpStatus, wh) - defer cancel() + stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh) defer cleanUp() want := httpStatusConvTab[httpStatus] buf := make([]byte, 8) @@ -2103,8 +2101,7 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) } func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) { - stream, cleanUp, cancel := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader) - defer cancel() + stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader) defer cleanUp() buf := make([]byte, 8) _, err := stream.Read(buf) diff --git a/status/status.go b/status/status.go index 9c61b094..897321ba 100644 --- a/status/status.go +++ b/status/status.go @@ -126,7 +126,9 @@ func FromError(err error) (s *Status, ok bool) { if err == nil { return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true } - if se, ok := err.(interface{ GRPCStatus() *Status }); ok { + if se, ok := err.(interface { + GRPCStatus() *Status + }); ok { return se.GRPCStatus(), true } return New(codes.Unknown, err.Error()), false @@ -182,7 +184,9 @@ func Code(err error) codes.Code { if err == nil { return codes.OK } - if se, ok := err.(interface{ GRPCStatus() *Status }); ok { + if se, ok := err.(interface { + GRPCStatus() *Status + }); ok { return se.GRPCStatus().Code() } return codes.Unknown
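The status.go hunks are pure gofmt churn: the one-line interface assertion becomes the equivalent multi-line form, with no change in behavior. For context, here is a sketch (not part of this patch) of a custom error type satisfying that GRPCStatus interface, which is what FromError and Code unwrap:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// quotaErr is an illustrative error type carrying its own gRPC status.
type quotaErr struct{ user string }

func (e *quotaErr) Error() string { return "quota exceeded for " + e.user }

// GRPCStatus satisfies the anonymous interface asserted in FromError and
// Code, letting the status package extract a *status.Status directly
// instead of wrapping the error as codes.Unknown.
func (e *quotaErr) GRPCStatus() *status.Status {
	return status.New(codes.ResourceExhausted, e.Error())
}

func main() {
	var err error = &quotaErr{user: "alice"}
	fmt.Println(status.Code(err)) // ResourceExhausted, not Unknown
	if s, ok := status.FromError(err); ok {
		fmt.Println(s.Code(), s.Message())
	}
}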