Fix onclose state transition (#2346)
internal: fix onClose state transitions When onClose occurs during WaitForHandshake, it should immediately exit createTransport instead of leaving CONNECTING and entering READY. Furthermore, when any onClose happens, the state should change to TRANSIENT FAILURE. Fixes #2340 Fixes #2341 Also fixes an unreported bug in which entering READY causes a Dial call to end prematurely, instead of blocking until a READY transport is found.
This commit is contained in:
Родитель
0361d80ffd
Коммит
8d75951f9b
|
@ -643,11 +643,9 @@ func (cc *ClientConn) incrCallsFailed() {
|
|||
atomic.AddInt64(&cc.czData.callsFailed, 1)
|
||||
}
|
||||
|
||||
// connect starts to creating transport and also starts the transport monitor
|
||||
// goroutine for this ac.
|
||||
// connect starts creating a transport.
|
||||
// It does nothing if the ac is not IDLE.
|
||||
// TODO(bar) Move this to the addrConn section.
|
||||
// This was part of resetAddrConn, keep it here to make the diff look clean.
|
||||
func (ac *addrConn) connect() error {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
|
@ -963,6 +961,23 @@ func (ac *addrConn) resetTransport(resolveNow bool) {
|
|||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// If the connection is READY, a failure must have occurred.
|
||||
// Otherwise, we'll consider this a transient failure when:
|
||||
// We've exhausted all addresses
|
||||
// We're in CONNECTING
|
||||
// And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
|
||||
if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
|
||||
if err := ac.nextAddr(); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -1109,6 +1124,8 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
|
|||
// We got the preface - huzzah! things are good.
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - close allowedToReset and return without an error.
|
||||
close(allowedToReset)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
go func() {
|
||||
|
@ -1213,8 +1230,6 @@ func (ac *addrConn) nextAddr() error {
|
|||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
|
|
|
@ -134,11 +134,11 @@ func TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
|
|||
|
||||
func TestDialWaitsForServerSettings(t *testing.T) {
|
||||
defer leakcheck.Check(t)
|
||||
server, err := net.Listen("tcp", "localhost:0")
|
||||
lis, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Error while listening. Err: %v", err)
|
||||
}
|
||||
defer server.Close()
|
||||
defer lis.Close()
|
||||
done := make(chan struct{})
|
||||
sent := make(chan struct{})
|
||||
dialDone := make(chan struct{})
|
||||
|
@ -146,7 +146,7 @@ func TestDialWaitsForServerSettings(t *testing.T) {
|
|||
defer func() {
|
||||
close(done)
|
||||
}()
|
||||
conn, err := server.Accept()
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
t.Errorf("Error while accepting. Err: %v", err)
|
||||
return
|
||||
|
@ -165,7 +165,7 @@ func TestDialWaitsForServerSettings(t *testing.T) {
|
|||
}()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
|
||||
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
|
||||
close(dialDone)
|
||||
if err != nil {
|
||||
t.Fatalf("Error while dialing. Err: %v", err)
|
||||
|
@ -182,7 +182,7 @@ func TestDialWaitsForServerSettings(t *testing.T) {
|
|||
|
||||
func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
|
||||
defer leakcheck.Check(t)
|
||||
server, err := net.Listen("tcp", "localhost:0")
|
||||
lis, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Error while listening. Err: %v", err)
|
||||
}
|
||||
|
@ -193,7 +193,7 @@ func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
|
|||
close(done)
|
||||
}()
|
||||
for {
|
||||
conn, err := server.Accept()
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
@ -204,8 +204,8 @@ func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
|
|||
getMinConnectTimeout = func() time.Duration { return time.Second / 2 }
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
|
||||
server.Close()
|
||||
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
|
||||
lis.Close()
|
||||
if err == nil {
|
||||
client.Close()
|
||||
t.Fatalf("Unexpected success (err=nil) while dialing")
|
||||
|
@ -305,17 +305,17 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
|
|||
|
||||
func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
|
||||
defer leakcheck.Check(t)
|
||||
server, err := net.Listen("tcp", "localhost:0")
|
||||
lis, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Error while listening. Err: %v", err)
|
||||
}
|
||||
defer server.Close()
|
||||
defer lis.Close()
|
||||
done := make(chan struct{})
|
||||
go func() { // Launch the server.
|
||||
defer func() {
|
||||
close(done)
|
||||
}()
|
||||
conn, err := server.Accept() // Accept the connection only to close it immediately.
|
||||
conn, err := lis.Accept() // Accept the connection only to close it immediately.
|
||||
if err != nil {
|
||||
t.Errorf("Error while accepting. Err: %v", err)
|
||||
return
|
||||
|
@ -325,7 +325,7 @@ func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
|
|||
var prevDuration time.Duration
|
||||
// Make sure the retry attempts are backed off properly.
|
||||
for i := 0; i < 3; i++ {
|
||||
conn, err := server.Accept()
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
t.Errorf("Error while accepting. Err: %v", err)
|
||||
return
|
||||
|
@ -341,7 +341,7 @@ func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
|
|||
prevAt = meow
|
||||
}
|
||||
}()
|
||||
client, err := Dial(server.Addr().String(), WithInsecure())
|
||||
client, err := Dial(lis.Addr().String(), WithInsecure())
|
||||
if err != nil {
|
||||
t.Fatalf("Error while dialing. Err: %v", err)
|
||||
}
|
||||
|
@ -820,3 +820,150 @@ func TestBackoffCancel(t *testing.T) {
|
|||
cc.Close()
|
||||
// Should not leak. May need -count 5000 to exercise.
|
||||
}
|
||||
|
||||
// CONNECTING -> READY -> TRANSIENT FAILURE -> CONNECTING -> TRANSIENT FAILURE -> CONNECTING -> TRANSIENT FAILURE
|
||||
//
|
||||
// Note: csmgr (which drives GetState and WaitForStateChange) lags behind reality a bit because state updates go
|
||||
// through the balancer. So, we are somewhat overaggressive in using WaitForStateChange in this test in order to force
|
||||
// it to keep up.
|
||||
//
|
||||
// TODO(deklerk) Rewrite this test with a custom balancer that records state transitions. This will mean we can get
|
||||
// rid of all this synchronization and realtime state-checking, and instead just check the state transitions at
|
||||
// once after all the activity has happened.
|
||||
func TestDialCloseStateTransition(t *testing.T) {
|
||||
defer leakcheck.Check(t)
|
||||
|
||||
lis, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Error while listening. Err: %v", err)
|
||||
}
|
||||
defer lis.Close()
|
||||
testFinished := make(chan struct{})
|
||||
backoffCaseReady := make(chan struct{})
|
||||
killFirstConnection := make(chan struct{})
|
||||
killSecondConnection := make(chan struct{})
|
||||
|
||||
// Launch the server.
|
||||
go func() {
|
||||
// Establish a successful connection so that we enter READY. We need to get
|
||||
// to READY so that we can get a client back for us to introspect later (as
|
||||
// opposed to just CONNECTING).
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
framer := http2.NewFramer(conn, conn)
|
||||
if err := framer.WriteSettings(http2.Setting{}); err != nil {
|
||||
t.Errorf("Error while writing settings frame. %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-testFinished:
|
||||
return
|
||||
case <-killFirstConnection:
|
||||
}
|
||||
|
||||
// Close the conn to cause onShutdown, causing us to enter TRANSIENT FAILURE. Note that we are not in
|
||||
// WaitForHandshake at this point because the preface was sent successfully.
|
||||
conn.Close()
|
||||
|
||||
// We have to re-accept and re-close the connection because the first re-connect after a successful handshake
|
||||
// has no backoff. So, we need to get to the second re-connect after the successful handshake for our infinite
|
||||
// backoff to happen.
|
||||
conn, err = lis.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
err = conn.Close()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// The client should now be headed towards backoff.
|
||||
close(backoffCaseReady)
|
||||
|
||||
// Re-connect (without server preface).
|
||||
conn, err = lis.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Close the conn to cause onShutdown, causing us to enter TRANSIENT FAILURE. Note that we are in
|
||||
// WaitForHandshake at this point because the preface has not been sent yet.
|
||||
select {
|
||||
case <-testFinished:
|
||||
return
|
||||
case <-killSecondConnection:
|
||||
}
|
||||
conn.Close()
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock(), withBackoff(backoffForever{}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// It should start in READY because the server sends the server preface.
|
||||
if got, want := client.GetState(), connectivity.Ready; got != want {
|
||||
t.Fatalf("expected addrconn state to be %v, was %v", want, got)
|
||||
}
|
||||
|
||||
// Once the connection is killed, it should go:
|
||||
// READY -> TRANSIENT FAILURE (no backoff) -> CONNECTING -> TRANSIENT FAILURE (infinite backoff)
|
||||
// The first TRANSIENT FAILURE is triggered by closing a channel. Then, we wait for the server to let us know
|
||||
// when the client has progressed past the first failure (which does not get backoff, because handshake was
|
||||
// successful).
|
||||
close(killFirstConnection)
|
||||
if !client.WaitForStateChange(ctx, connectivity.Ready) {
|
||||
t.Fatal("expected WaitForStateChange to change state, but it timed out")
|
||||
}
|
||||
if !client.WaitForStateChange(ctx, connectivity.TransientFailure) {
|
||||
t.Fatal("expected WaitForStateChange to change state, but it timed out")
|
||||
}
|
||||
<-backoffCaseReady
|
||||
if !client.WaitForStateChange(ctx, connectivity.Connecting) {
|
||||
t.Fatal("expected WaitForStateChange to change state, but it timed out")
|
||||
}
|
||||
if got, want := client.GetState(), connectivity.TransientFailure; got != want {
|
||||
t.Fatalf("expected addrconn state to be %v, was %v", want, got)
|
||||
}
|
||||
|
||||
// Stop backing off, allowing a re-connect. Note: this races with the client actually getting to the backoff,
|
||||
// so continually reset backoff until we notice the state change.
|
||||
for i := 0; i < 100; i++ {
|
||||
client.ResetConnectBackoff()
|
||||
cctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
defer cancel()
|
||||
if client.WaitForStateChange(cctx, connectivity.TransientFailure) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if got, want := client.GetState(), connectivity.Connecting; got != want {
|
||||
t.Fatalf("expected addrconn state to be %v, was %v", want, got)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-testFinished:
|
||||
case killSecondConnection <- struct{}{}:
|
||||
}
|
||||
|
||||
// The connection should be killed shortly by the above goroutine, and here we watch for the first new connectivity
|
||||
// state and make sure it's TRANSIENT FAILURE. This is racy, but fairly accurate - expect it to catch failures
|
||||
// 90% of the time or so.
|
||||
if !client.WaitForStateChange(ctx, connectivity.Connecting) {
|
||||
t.Fatal("expected WaitForStateChange to change state, but it timed out")
|
||||
}
|
||||
if got, want := client.GetState(), connectivity.TransientFailure; got != want {
|
||||
t.Fatalf("expected addrconn state to be %v, was %v", want, got)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1342,8 +1342,8 @@ func TestCZSubChannelTraceCreationDeletion(t *testing.T) {
|
|||
if len(scm.Trace.Events) == 0 {
|
||||
return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
|
||||
}
|
||||
if scm.Trace.Events[len(scm.Trace.Events)-1].Desc != "Subchannel Deleted" {
|
||||
return false, fmt.Errorf("the first trace event should be \"Subchannel Deleted\", not %q", scm.Trace.Events[0].Desc)
|
||||
if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
|
||||
return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
|
Загрузка…
Ссылка в новой задаче