internal: fix race between cancel and shutdown
This fixes a race in ac.tearDown and ac.resetTransport. If ac.resetTransport is in backoff when ac.tearDown occurs, there's a race between the state changing to Shutdown and ac.resetTransport calling ac.createTransport. This fixes it by returning when ac.resetTransport encounters an error during ac.nextAddr (specifically ac.ctx.Error()). It also fixes it by making sure that ac.tearDown changes state to Shutdown before canceling the context. Both fixes were implemented because they both seem to be valuable standalone additions: the former makes ac.resetTransport more understandable and less dependent on behavior happening elsewhere, and the latter makes ac.tearDown more correct. Finally, TestDialParseTargetUnknownScheme had its buffer removed; the buffer was likely added a while ago to assuage this issue. It should not be necessary anymore.
This commit is contained in:
Родитель
8997b5fa08
Коммит
82f263dc2f
|
@ -955,7 +955,7 @@ func (ac *addrConn) resetTransport(resolveNow bool) {
|
|||
}
|
||||
|
||||
if err := ac.nextAddr(); err != nil {
|
||||
grpclog.Warningf("resetTransport: error from nextAddr: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
|
@ -1176,6 +1176,8 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
|
|||
// nextAddr increments the addrIdx if there are more addresses to try. If
|
||||
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
|
||||
// increment the backoffIdx.
|
||||
//
|
||||
// nextAddr must be called without ac.mu being held.
|
||||
func (ac *addrConn) nextAddr() error {
|
||||
ac.mu.Lock()
|
||||
|
||||
|
@ -1259,15 +1261,15 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
|
|||
// tight loop.
|
||||
// tearDown doesn't remove ac from ac.cc.conns.
|
||||
func (ac *addrConn) tearDown(err error) {
|
||||
ac.cancel()
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// We have to set the state to Shutdown before we call ac.transports.GracefulClose, because it signals to
|
||||
// onClose not to try reconnecting the transport.
|
||||
// We have to set the state to Shutdown before anything else to prevent races
|
||||
// between setting the state and logic that waits on context cancelation / etc.
|
||||
ac.updateConnectivityState(connectivity.Shutdown)
|
||||
ac.cancel()
|
||||
ac.tearDownErr = err
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.curAddr = resolver.Address{}
|
||||
|
|
|
@ -20,6 +20,7 @@ package grpc
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
|
@ -766,3 +767,18 @@ func TestResetConnectBackoff(t *testing.T) {
|
|||
t.Fatal("Failed to call dial within 10s after resetting backoff")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackoffCancel(t *testing.T) {
|
||||
defer leakcheck.Check(t)
|
||||
dialStrCh := make(chan string)
|
||||
cc, err := Dial("any", WithInsecure(), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
|
||||
dialStrCh <- t
|
||||
return nil, fmt.Errorf("test dialer, always error")
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create ClientConn: %v", err)
|
||||
}
|
||||
<-dialStrCh
|
||||
cc.Close()
|
||||
// Should not leak. May need -count 5000 to exercise.
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче