internal: resetTransport connect deadline is across addresses (#2540)
internal: resetTransport connect deadline is across addresses Currently, the connect deadline is recalculated per-address. This PR amends that behavior such that all addresses for a single connection attempt share the same deadline. Fixes #2462
This commit is contained in:
Родитель
1263ed4d2e
Коммит
a402911c6f
|
@ -972,6 +972,14 @@ func (ac *addrConn) resetTransport() {
|
||||||
}
|
}
|
||||||
addrs := ac.addrs
|
addrs := ac.addrs
|
||||||
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
|
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
|
||||||
|
|
||||||
|
// This will be the duration that dial gets to finish.
|
||||||
|
dialDuration := getMinConnectTimeout()
|
||||||
|
if dialDuration < backoffFor {
|
||||||
|
// Give dial more time as we keep failing to connect.
|
||||||
|
dialDuration = backoffFor
|
||||||
|
}
|
||||||
|
connectDeadline := time.Now().Add(dialDuration)
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
|
|
||||||
addrLoop:
|
addrLoop:
|
||||||
|
@ -984,17 +992,7 @@ func (ac *addrConn) resetTransport() {
|
||||||
}
|
}
|
||||||
ac.updateConnectivityState(connectivity.Connecting)
|
ac.updateConnectivityState(connectivity.Connecting)
|
||||||
ac.transport = nil
|
ac.transport = nil
|
||||||
ac.mu.Unlock()
|
|
||||||
|
|
||||||
// This will be the duration that dial gets to finish.
|
|
||||||
dialDuration := getMinConnectTimeout()
|
|
||||||
if dialDuration < backoffFor {
|
|
||||||
// Give dial more time as we keep failing to connect.
|
|
||||||
dialDuration = backoffFor
|
|
||||||
}
|
|
||||||
connectDeadline := time.Now().Add(dialDuration)
|
|
||||||
|
|
||||||
ac.mu.Lock()
|
|
||||||
ac.cc.mu.RLock()
|
ac.cc.mu.RLock()
|
||||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||||
ac.cc.mu.RUnlock()
|
ac.cc.mu.RUnlock()
|
||||||
|
|
|
@ -624,6 +624,92 @@ func (s) TestWithAuthorityAndTLS(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When creating a transport configured with n addresses, only calculate the
|
||||||
|
// backoff once per "round" of attempts instead of once per address (n times
|
||||||
|
// per "round" of attempts).
|
||||||
|
func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
|
||||||
|
getMinConnectTimeoutBackup := getMinConnectTimeout
|
||||||
|
defer func() {
|
||||||
|
getMinConnectTimeout = getMinConnectTimeoutBackup
|
||||||
|
}()
|
||||||
|
var attempts uint32
|
||||||
|
getMinConnectTimeout = func() time.Duration {
|
||||||
|
if atomic.AddUint32(&attempts, 1) == 1 {
|
||||||
|
// Once all addresses are exhausted, hang around and wait for the
|
||||||
|
// client.Close to happen rather than re-starting a new round of
|
||||||
|
// attempts.
|
||||||
|
return time.Hour
|
||||||
|
}
|
||||||
|
t.Error("only one attempt backoff calculation, but got more")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
lis1, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error while listening. Err: %v", err)
|
||||||
|
}
|
||||||
|
defer lis1.Close()
|
||||||
|
|
||||||
|
lis2, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error while listening. Err: %v", err)
|
||||||
|
}
|
||||||
|
defer lis2.Close()
|
||||||
|
|
||||||
|
server1Done := make(chan struct{})
|
||||||
|
server2Done := make(chan struct{})
|
||||||
|
|
||||||
|
// Launch server 1.
|
||||||
|
go func() {
|
||||||
|
conn, err := lis1.Accept()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.Close()
|
||||||
|
close(server1Done)
|
||||||
|
}()
|
||||||
|
// Launch server 2.
|
||||||
|
go func() {
|
||||||
|
conn, err := lis2.Accept()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
conn.Close()
|
||||||
|
close(server2Done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
rb := manual.NewBuilderWithScheme("whatever")
|
||||||
|
rb.InitialAddrs([]resolver.Address{
|
||||||
|
{Addr: lis1.Addr().String()},
|
||||||
|
{Addr: lis2.Addr().String()},
|
||||||
|
})
|
||||||
|
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
timeout := time.After(15 * time.Second)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatal("timed out waiting for test to finish")
|
||||||
|
case <-server1Done:
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatal("timed out waiting for test to finish")
|
||||||
|
case <-server2Done:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s) TestDialContextCancel(t *testing.T) {
|
func (s) TestDialContextCancel(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
cancel()
|
cancel()
|
||||||
|
|
Загрузка…
Ссылка в новой задаче