cancel outgoing net.Dial when ClientConn is closed
This commit is contained in:
Родитель
2342e38669
Коммит
61f3f61ef0
|
@ -196,7 +196,7 @@ func WithTimeout(d time.Duration) DialOption {
|
|||
}
|
||||
|
||||
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
|
||||
func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
|
||||
func WithDialer(f func(string, time.Duration, <-chan struct{}) (net.Conn, error)) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.copts.Dialer = f
|
||||
}
|
||||
|
@ -361,11 +361,11 @@ func (cc *ClientConn) lbWatcher() {
|
|||
|
||||
func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
|
||||
ac := &addrConn{
|
||||
cc: cc,
|
||||
addr: addr,
|
||||
dopts: cc.dopts,
|
||||
shutdownChan: make(chan struct{}),
|
||||
cc: cc,
|
||||
addr: addr,
|
||||
dopts: cc.dopts,
|
||||
}
|
||||
ac.dopts.copts.Cancel = make(chan struct{})
|
||||
if EnableTracing {
|
||||
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
|
||||
}
|
||||
|
@ -468,11 +468,10 @@ func (cc *ClientConn) Close() error {
|
|||
|
||||
// addrConn is a network connection to a given address.
|
||||
type addrConn struct {
|
||||
cc *ClientConn
|
||||
addr Address
|
||||
dopts dialOptions
|
||||
shutdownChan chan struct{}
|
||||
events trace.EventLog
|
||||
cc *ClientConn
|
||||
addr Address
|
||||
dopts dialOptions
|
||||
events trace.EventLog
|
||||
|
||||
mu sync.Mutex
|
||||
state ConnectivityState
|
||||
|
@ -587,7 +586,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
|
|||
closeTransport = false
|
||||
select {
|
||||
case <-time.After(sleepTime):
|
||||
case <-ac.shutdownChan:
|
||||
case <-ac.dopts.copts.Cancel:
|
||||
}
|
||||
retries++
|
||||
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
|
||||
|
@ -622,9 +621,9 @@ func (ac *addrConn) transportMonitor() {
|
|||
t := ac.transport
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
// shutdownChan is needed to detect the teardown when
|
||||
// Cancel is needed to detect the teardown when
|
||||
// the addrConn is idle (i.e., no RPC in flight).
|
||||
case <-ac.shutdownChan:
|
||||
case <-ac.dopts.copts.Cancel:
|
||||
return
|
||||
case <-t.GoAway():
|
||||
ac.tearDown(errConnDrain)
|
||||
|
@ -725,8 +724,8 @@ func (ac *addrConn) tearDown(err error) {
|
|||
if ac.transport != nil && err != errConnDrain {
|
||||
ac.transport.Close()
|
||||
}
|
||||
if ac.shutdownChan != nil {
|
||||
close(ac.shutdownChan)
|
||||
if ac.dopts.copts.Cancel != nil {
|
||||
close(ac.dopts.copts.Cancel)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -314,7 +314,13 @@ func (e env) runnable() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
func (e env) dialer(addr string, timeout time.Duration, cancel <-chan struct{}) (net.Conn, error) {
|
||||
// NB: Go 1.6 added a Cancel field on net.Dialer, which would allow this
|
||||
// to be written as
|
||||
//
|
||||
// `(&net.Dialer{Cancel: cancel, Timeout: timeout}).Dial(e.network, addr)`
|
||||
//
|
||||
// but that would break compatibility with earlier Go versions.
|
||||
return net.DialTimeout(e.network, addr, timeout)
|
||||
}
|
||||
|
||||
|
@ -505,7 +511,7 @@ func (te *test) declareLogNoise(phrases ...string) {
|
|||
}
|
||||
|
||||
func (te *test) withServerTester(fn func(st *serverTester)) {
|
||||
c, err := te.e.dialer(te.srvAddr, 10*time.Second)
|
||||
c, err := te.e.dialer(te.srvAddr, 10*time.Second, nil)
|
||||
if err != nil {
|
||||
te.t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
// +build go1.6
|
||||
|
||||
/*
|
||||
* Copyright 2014, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// newDialer constructs a net.Dialer.
|
||||
func newDialer(timeout time.Duration, cancel <-chan struct{}) *net.Dialer {
|
||||
return &net.Dialer{Cancel: cancel, Timeout: timeout}
|
||||
}
|
|
@ -107,11 +107,11 @@ type http2Client struct {
|
|||
prevGoAwayID uint32
|
||||
}
|
||||
|
||||
func dial(fn func(string, time.Duration) (net.Conn, error), addr string, timeout time.Duration) (net.Conn, error) {
|
||||
func dial(fn func(string, time.Duration, <-chan struct{}) (net.Conn, error), addr string, timeout time.Duration, cancel <-chan struct{}) (net.Conn, error) {
|
||||
if fn != nil {
|
||||
return fn(addr, timeout)
|
||||
return fn(addr, timeout, cancel)
|
||||
}
|
||||
return net.DialTimeout("tcp", addr, timeout)
|
||||
return newDialer(timeout, cancel).Dial("tcp", addr)
|
||||
}
|
||||
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
|
@ -121,7 +121,7 @@ func newHTTP2Client(addr string, opts ConnectOptions) (_ ClientTransport, err er
|
|||
scheme := "http"
|
||||
startT := time.Now()
|
||||
timeout := opts.Timeout
|
||||
conn, connErr := dial(opts.Dialer, addr, timeout)
|
||||
conn, connErr := dial(opts.Dialer, addr, timeout, opts.Cancel)
|
||||
if connErr != nil {
|
||||
return nil, ConnectionErrorf("transport: %v", connErr)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
// +build !go1.6
|
||||
|
||||
/*
|
||||
* Copyright 2016, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// newDialer constructs a net.Dialer.
|
||||
func newDialer(timeout time.Duration, _ <-chan struct{}) *net.Dialer {
|
||||
return &net.Dialer{Timeout: timeout}
|
||||
}
|
|
@ -354,8 +354,10 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authI
|
|||
type ConnectOptions struct {
|
||||
// UserAgent is the application user agent.
|
||||
UserAgent string
|
||||
// Cancel is closed to indicate that dialing should be cancelled.
|
||||
Cancel chan struct{}
|
||||
// Dialer specifies how to dial a network address.
|
||||
Dialer func(string, time.Duration) (net.Conn, error)
|
||||
Dialer func(string, time.Duration, <-chan struct{}) (net.Conn, error)
|
||||
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
|
||||
PerRPCCredentials []credentials.PerRPCCredentials
|
||||
// TransportCredentials stores the Authenticator required to setup a client connection.
|
||||
|
|
Загрузка…
Ссылка в новой задаче