internal: remove transportMonitor, replace with callbacks (#2219)

This commit is contained in:
Jean de Klerk 2018-07-31 14:10:13 -07:00 коммит произвёл GitHub
Родитель fb6867e42b
Коммит 97da9e087c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 321 добавлений и 288 удалений

Просмотреть файл

@ -532,9 +532,10 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
cc: cc,
addrs: addrs,
dopts: cc.dopts,
cc: cc,
addrs: addrs,
dopts: cc.dopts,
successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@ -626,17 +627,7 @@ func (ac *addrConn) connect() error {
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(); err != nil {
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
ac.transportMonitor()
}()
go ac.resetTransport(false)
return nil
}
@ -665,7 +656,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
}
return curAddrFound
@ -796,27 +787,28 @@ type addrConn struct {
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
cc *ClientConn
addrs []resolver.Address
dopts dialOptions
events trace.EventLog
acbw balancer.SubConn
mu sync.Mutex
curAddr resolver.Address
reconnectIdx int // The index in addrs list to start reconnecting from.
state connectivity.State
transport transport.ClientTransport // The current transport.
addrIdx int // The index in addrs list to start reconnecting from.
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
state connectivity.State
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
transport transport.ClientTransport
ready chan struct{}
// The reason this addrConn is torn down.
tearDownErr error
tearDownErr error // The reason this addrConn is torn down.
connectRetryNum int
backoffIdx int
// backoffDeadline is the time until which resetTransport needs to
// wait before increasing connectRetryNum count.
// wait before increasing backoffIdx count.
backoffDeadline time.Time
// connectDeadline is the time by which all connection
// negotiations must complete.
@ -828,6 +820,8 @@ type addrConn struct {
callsSucceeded int64
callsFailed int64
lastCallStartedTime time.Time
successfulHandshake bool
}
// adjustParams updates parameters used to create transports upon
@ -860,176 +854,268 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
}
}
// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
// In case that doesn't happen, transportMonitor will kill the newly created
// transport after connectDeadline has expired.
// In case there was an error on the transport before the settings frame was
// received, resetTransport resumes connecting to backends after the one that
// was previously connected to. In case end of the list is reached, resetTransport
// backs off until the original deadline.
// If the DialOption WithWaitForHandshake was set, resetTrasport returns
// successfully only after server settings are received.
// resetTransport makes sure that a healthy ac.transport exists.
//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.transport = nil
ridx := ac.reconnectIdx
ac.mu.Unlock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
var backoffDeadline, connectDeadline time.Time
for connectRetryNum := 0; ; connectRetryNum++ {
ac.mu.Lock()
if ac.backoffDeadline.IsZero() {
// This means either a successful HTTP2 connection was established
// or this is the first time this addrConn is trying to establish a
// connection.
backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if backoffFor > dialDuration {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time.Now()
backoffDeadline = start.Add(backoffFor)
connectDeadline = start.Add(dialDuration)
ridx = 0 // Start connecting from the beginning.
} else {
// Continue trying to connect with the same deadlines.
connectRetryNum = ac.connectRetryNum
backoffDeadline = ac.backoffDeadline
connectDeadline = ac.connectDeadline
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.connectRetryNum = 0
// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
//
// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
func (ac *addrConn) resetTransport(resolveNow bool) {
for {
// If this is the first in a line of resets, we want to resolve immediately. The only other time we
// want to reset is if we have tried all the addresses handed to us.
if resolveNow {
ac.mu.Lock()
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.mu.Unlock()
}
if err := ac.nextAddr(); err != nil {
grpclog.Warningf("resetTransport: error from nextAddr: %v", err)
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
return
}
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.transport = nil
backoffIdx := ac.backoffIdx
backoffFor := ac.dopts.bs.Backoff(backoffIdx)
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if backoffFor > dialDuration {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time.Now()
connectDeadline := start.Add(dialDuration)
ac.backoffDeadline = start.Add(backoffFor)
ac.connectDeadline = connectDeadline
ac.mu.Unlock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.printf("connecting")
if ac.state != connectivity.Connecting {
ac.state = connectivity.Connecting
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race
addrsIter := make([]resolver.Address, len(ac.addrs))
copy(addrsIter, ac.addrs)
addr := ac.addrs[ac.addrIdx]
copts := ac.dopts.copts
ac.mu.Unlock()
connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
if err != nil {
return err
}
if connected {
return nil
if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
// errReadTimeOut indicates that the handshake was not received before
// the deadline. We exit here because the transport's reader goroutine will
// use onClose to reset the transport.
if err == errReadTimedOut {
return
}
continue
}
return
}
}
var errReadTimedOut = errors.New("read timed out")
// createTransport creates a connection to one of the backends in addrs.
// It returns true if a connection was established.
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
for i := ridx; i < len(addrs); i++ {
addr := addrs[i]
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
done := make(chan struct{})
onPrefaceReceipt := func() {
func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
skipReset := make(chan struct{})
allowedToReset := make(chan struct{})
prefaceReceived := make(chan struct{})
onCloseCalled := make(chan struct{})
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
go ac.resetTransport(false)
}
onClose := func() {
close(onCloseCalled)
select {
case <-skipReset: // The outer resetTransport loop will handle reconnection.
return
case <-allowedToReset: // We're in the clear to reset.
ac.mu.Lock()
close(done)
if !ac.backoffDeadline.IsZero() {
// If we haven't already started reconnecting to
// other backends.
// Note, this can happen when writer notices an error
// and triggers resetTransport while at the same time
// reader receives the preface and invokes this closure.
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.connectRetryNum = 0
}
ac.transport = nil
ac.mu.Unlock()
ac.resetTransport(false)
}
// Do not cancel in the success path because of
// this issue in Go1.6: https://github.com/golang/go/issues/15078.
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
if err != nil {
cancel()
ac.cc.blockingpicker.updateConnectionError(err)
}
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
onPrefaceReceipt := func() {
close(prefaceReceived)
// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
ac.mu.Lock()
ac.successfulHandshake = true
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.addrIdx = 0
ac.backoffIdx = 0
ac.mu.Unlock()
}
// Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err == nil {
prefaceTimer := time.AfterFunc(connectDeadline.Sub(time.Now()), func() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
select {
case <-onCloseCalled:
// The transport has already closed - noop.
ac.mu.Unlock()
return false, errConnClosing
case <-prefaceReceived:
// We got the preface just in the nick of time - huzzah!
ac.mu.Unlock()
default:
// We didn't get the preface in time.
ac.mu.Unlock()
newTr.Close()
}
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
continue
}
})
if ac.dopts.waitForHandshake {
select {
case <-done:
case <-connectCtx.Done():
// Didn't receive server preface, must kill this new transport now.
grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
case <-prefaceTimer.C:
// We want to close but _not_ reset, because we're going into the transient-failure-and-return flow
// and go into the next cycle of the resetTransport loop.
close(skipReset)
newTr.Close()
continue
case <-ac.ctx.Done():
err = errors.New("timed out")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
}
}
}
if err != nil {
// newTr is either nil, or closed.
cancel()
ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
// ac.tearDonn(...) has been invoked.
newTr.Close()
return false, errConnClosing
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
return errConnClosing
}
ac.printf("ready")
ac.state = connectivity.Ready
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.transport = newTr
ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
select {
case <-done:
// If the server has responded back with preface already,
// don't set the reconnect parameters.
default:
ac.connectRetryNum = connectRetryNum
ac.backoffDeadline = backoffDeadline
ac.connectDeadline = connectDeadline
ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
}
ac.mu.Unlock()
return true, nil
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
return err
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return false, errConnClosing
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
newTr.Close()
return errConnClosing
}
ac.printf("ready")
ac.state = connectivity.Ready
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.transport = newTr
ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
// goroutine failing races with all the code in this method that sets the connection to "ready".
close(allowedToReset)
return nil
}
// nextAddr increments the addrIdx if there are more addresses to try. If
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
// increment the backoffIdx.
func (ac *addrConn) nextAddr() error {
ac.mu.Lock()
// If a handshake has been observed, we expect the counters to have manually
// been reset so we'll just return, since we want the next usage to start
// at index 0.
if ac.successfulHandshake {
ac.successfulHandshake = false
ac.mu.Unlock()
return nil
}
if ac.addrIdx < len(ac.addrs)-1 {
ac.addrIdx++
ac.mu.Unlock()
return nil
}
ac.addrIdx = 0
ac.backoffIdx++
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
@ -1038,95 +1124,16 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
close(ac.ready)
ac.ready = nil
}
backoffDeadline := ac.backoffDeadline
ac.mu.Unlock()
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
select {
case <-timer.C:
case <-ac.ctx.Done():
timer.Stop()
return false, ac.ctx.Err()
}
return false, nil
}
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
for {
var timer *time.Timer
var cdeadline <-chan time.Time
ac.mu.Lock()
t := ac.transport
if !ac.connectDeadline.IsZero() {
timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
cdeadline = timer.C
}
ac.mu.Unlock()
// Block until we receive a goaway or an error occurs.
select {
case <-t.GoAway():
done := t.Error()
cleanup := t.Close
// Since this transport will be orphaned (won't have a transportMonitor)
// we need to launch a goroutine to keep track of clientConn.Close()
// happening since it might not be noticed by any other goroutine for a while.
go func() {
<-done
cleanup()
}()
case <-t.Error():
// In case this is triggered because clientConn.Close()
// was called, we want to immeditately close the transport
// since no other goroutine might notice it for a while.
t.Close()
case <-cdeadline:
ac.mu.Lock()
// This implies that client received server preface.
if ac.backoffDeadline.IsZero() {
ac.mu.Unlock()
continue
}
ac.mu.Unlock()
timer = nil
// No server preface received until deadline.
// Kill the connection.
grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
t.Close()
}
if timer != nil {
timer.Stop()
}
// If a GoAway happened, regardless of error, adjust our keepalive
// parameters as appropriate.
select {
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
default:
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
// Set connectivity state to TransientFailure before calling
// resetTransport. Transition READY->CONNECTING is not valid.
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.curAddr = resolver.Address{}
ac.mu.Unlock()
if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
return ac.ctx.Err()
}
return nil
}
// getReadyTransport returns the transport if ac's state is READY.
@ -1134,7 +1141,7 @@ func (ac *addrConn) transportMonitor() {
// If ac's state is IDLE, it will trigger ac to connect.
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
ac.mu.Lock()
if ac.state == connectivity.Ready {
if ac.state == connectivity.Ready && ac.transport != nil {
t := ac.transport
ac.mu.Unlock()
return t, true
@ -1159,21 +1166,26 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
func (ac *addrConn) tearDown(err error) {
ac.cancel()
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
// We have to set the state to Shutdown before we call ac.transports.GracefulClose, because it signals to
// onClose not to try reconnecting the transport.
ac.state = connectivity.Shutdown
ac.tearDownErr = err
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac.mu.Unlock()
ac.transport.GracefulClose()
ac.mu.Lock()
}
ac.state = connectivity.Shutdown
ac.tearDownErr = err
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.events != nil {
ac.events.Finish()
ac.events = nil
@ -1185,6 +1197,7 @@ func (ac *addrConn) tearDown(err error) {
if channelz.IsOn() {
channelz.RemoveEntry(ac.channelzID)
}
ac.mu.Unlock()
}
func (ac *addrConn) getState() connectivity.State {

Просмотреть файл

@ -124,6 +124,9 @@ type http2Client struct {
msgRecv int64
lastMsgSent time.Time
lastMsgRecv time.Time
onGoAway func(GoAwayReason)
onClose func()
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@ -152,7 +155,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@ -234,6 +237,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
onGoAway: onGoAway,
onClose: func() {},
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@ -266,10 +271,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
t.keepaliveEnabled = true
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t.reader()
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
@ -306,6 +307,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
}
}
// We only assign onClose after we're sure there can not be any more t.Close calls in this goroutine, because
// onClose may (frequently does) block.
t.onClose = onClose
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t.reader()
t.framer.writer.Flush()
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
@ -735,6 +745,10 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
t.mu.Lock()
// Make sure we only Close once.
@ -762,6 +776,7 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
t.onClose()
return err
}
@ -1058,6 +1073,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
close(t.goAway)
t.state = draining
t.controlBuf.put(&incomingGoAway{})
// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
t.onGoAway(t.goAwayReason)
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@ -1174,15 +1192,16 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close()
t.Close() // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close()
t.Close() // this kicks off resetTransport, so must be last before return
return
}
t.onSuccess()

Просмотреть файл

@ -495,8 +495,8 @@ type TargetInfo struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
}
// Options provides additional hints and information for message

Просмотреть файл

@ -397,17 +397,17 @@ func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hTyp
}
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2Client, func()) {
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}, func() {})
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
}
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions, onHandshake func()) (*server, *http2Client, func()) {
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, serverConfig, ht)
addr := "localhost:" + server.port
target := TargetInfo{
Addr: addr,
}
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, onHandshake)
ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, func() {}, func(GoAwayReason) {}, func() {})
if connErr != nil {
cancel() // Do not cancel in success path.
t.Fatalf("failed to create transport: %v", connErr)
@ -432,7 +432,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
done <- conn
}()
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {})
tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
cancel() // Do not cancel in success path.
// Server clean-up.
@ -449,7 +449,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
// sends status error to concurrent stream reader.
func TestInflightStreamClosing(t *testing.T) {
serverConfig := &ServerConfig{}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
@ -492,7 +492,7 @@ func TestMaxConnectionIdle(t *testing.T) {
MaxConnectionIdle: 2 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
@ -520,7 +520,7 @@ func TestMaxConnectionIdleNegative(t *testing.T) {
MaxConnectionIdle: 2 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
@ -547,7 +547,7 @@ func TestMaxConnectionAge(t *testing.T) {
MaxConnectionAge: 2 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
@ -580,7 +580,7 @@ func TestKeepaliveServer(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer c.Close()
@ -624,7 +624,7 @@ func TestKeepaliveServerNegative(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
@ -718,7 +718,7 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
}}, func() {})
}})
defer cancel()
defer s.stop()
defer tr.Close()
@ -745,7 +745,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer cancel()
defer server.stop()
defer client.Close()
@ -779,7 +779,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer cancel()
defer server.stop()
defer client.Close()
@ -818,7 +818,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer cancel()
defer server.stop()
defer client.Close()
@ -845,7 +845,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer cancel()
defer server.stop()
defer client.Close()
@ -994,7 +994,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co, func() {})
server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co)
defer cancel()
defer server.stop()
defer ct.Close()
@ -1083,6 +1083,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
func TestGracefulClose(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong)
defer cancel()
defer func() {
// Stop the server's listener to make the server's goroutines terminate
// (after the last active stream is done).
@ -1092,7 +1093,6 @@ func TestGracefulClose(t *testing.T) {
leakcheck.Check(t)
// Correctly clean up the server
server.stop()
cancel()
}()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
defer cancel()
@ -1147,8 +1147,8 @@ func TestGracefulClose(t *testing.T) {
}
func TestLargeMessageSuspension(t *testing.T) {
server, ct, cancelsvr := setUp(t, 0, math.MaxUint32, suspended)
defer cancelsvr()
server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
@ -1185,7 +1185,7 @@ func TestMaxStreams(t *testing.T) {
serverConfig := &ServerConfig{
MaxStreams: 1,
}
server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer ct.Close()
defer server.stop()
@ -1319,7 +1319,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
defer cancel()
defer server.stop()
defer client.Close()
@ -1406,7 +1406,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
@ -1636,7 +1636,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
}()
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()
ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {})
ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
@ -1799,7 +1799,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
InitialWindowSize: wc.clientStream,
InitialConnWindowSize: wc.clientConn,
}
server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co, func() {})
server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co)
defer cancel()
defer server.stop()
defer client.Close()
@ -2031,7 +2031,7 @@ func (s *httpServer) cleanUp() {
}
}
func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func(), cancel func()) {
func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) {
var (
err error
lis net.Listener
@ -2063,9 +2063,8 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream
wh: wh,
}
server.start(t, lis)
var connectCtx context.Context
connectCtx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {})
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
cancel() // Do not cancel in success path.
t.Fatalf("Error creating client. Err: %v", err)
@ -2084,8 +2083,7 @@ func TestHTTPToGRPCStatusMapping(t *testing.T) {
}
func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) {
stream, cleanUp, cancel := setUpHTTPStatusTest(t, httpStatus, wh)
defer cancel()
stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh)
defer cleanUp()
want := httpStatusConvTab[httpStatus]
buf := make([]byte, 8)
@ -2103,8 +2101,7 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders)
}
func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) {
stream, cleanUp, cancel := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
defer cancel()
stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
defer cleanUp()
buf := make([]byte, 8)
_, err := stream.Read(buf)

Просмотреть файл

@ -126,7 +126,9 @@ func FromError(err error) (s *Status, ok bool) {
if err == nil {
return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
}
if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
if se, ok := err.(interface {
GRPCStatus() *Status
}); ok {
return se.GRPCStatus(), true
}
return New(codes.Unknown, err.Error()), false
@ -182,7 +184,9 @@ func Code(err error) codes.Code {
if err == nil {
return codes.OK
}
if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
if se, ok := err.(interface {
GRPCStatus() *Status
}); ok {
return se.GRPCStatus().Code()
}
return codes.Unknown