Merge pull request #712 from iamqizhao/master

[Notice] Change the semantics of grpc.WithTimeout DialOption
This commit is contained in:
Menghan Li 2016-06-06 14:18:51 -07:00
Родитель ce6213360c e4b9b7f606
Коммит b60d3e9ed8
4 изменённых файлов: 46 добавлений и 102 удалений

Просмотреть файл

@ -53,6 +53,9 @@ var (
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
@ -62,15 +65,13 @@ var (
// (e.g., oauth2 token) which requires secure connection on an insecure
// connection.
errCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
// errClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout.
errClientConnTimeout = errors.New("grpc: timed out trying to connect")
// errNetworkIP indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@ -85,6 +86,7 @@ type dialOptions struct {
balancer Balancer
block bool
insecure bool
timeout time.Duration
copts transport.ConnectOptions
}
@ -182,10 +184,11 @@ func WithPerRPCCredentials(creds credentials.Credentials) DialOption {
}
}
// WithTimeout returns a DialOption that configures a timeout for dialing a client connection.
// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
// initially. This is valid if and only if WithBlock() is present.
func WithTimeout(d time.Duration) DialOption {
return func(o *dialOptions) {
o.copts.Timeout = d
o.timeout = d
}
}
@ -228,26 +231,47 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
if err := cc.balancer.Start(target); err != nil {
return nil, err
}
var (
ok bool
addrs []Address
)
ch := cc.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addr := Address{Addr: target}
if err := cc.newAddrConn(addr, false); err != nil {
return nil, err
}
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok := <-ch
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
return nil, fmt.Errorf("grpc: there is no address available to dial")
return nil, errNoAddr
}
}
waitC := make(chan error)
go func() {
for _, a := range addrs {
if err := cc.newAddrConn(a, false); err != nil {
return nil, err
waitC <- err
return
}
}
close(waitC)
}()
var timeoutCh <-chan time.Time
if cc.dopts.timeout > 0 {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
case err := <-waitC:
if err != nil {
cc.Close()
return nil, err
}
case <-timeoutCh:
cc.Close()
return nil, ErrClientConnTimeout
}
if ok {
go cc.lbWatcher()
}
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
@ -517,7 +541,6 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
func (ac *addrConn) resetTransport(closeTransport bool) error {
var retries int
start := time.Now()
for {
ac.mu.Lock()
ac.printf("connecting")
@ -537,29 +560,13 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if closeTransport && t != nil {
t.Close()
}
// Adjust timeout for the current try.
copts := ac.dopts.copts
if copts.Timeout < 0 {
ac.tearDown(errClientConnTimeout)
return errClientConnTimeout
}
if copts.Timeout > 0 {
copts.Timeout -= time.Since(start)
if copts.Timeout <= 0 {
ac.tearDown(errClientConnTimeout)
return errClientConnTimeout
}
}
sleepTime := ac.dopts.bs.backoff(retries)
timeout := sleepTime
if timeout < minConnectTimeout {
timeout = minConnectTimeout
}
if copts.Timeout == 0 || copts.Timeout > timeout {
copts.Timeout = timeout
ac.dopts.copts.Timeout = sleepTime
if sleepTime < minConnectTimeout {
ac.dopts.copts.Timeout = minConnectTimeout
}
connectTime := time.Now()
newTransport, err := transport.NewClientTransport(ac.addr.Addr, &copts)
newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts)
if err != nil {
ac.mu.Lock()
if ac.state == Shutdown {
@ -579,14 +586,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if sleepTime < 0 {
sleepTime = 0
}
// Fail early before falling into sleep.
if ac.dopts.copts.Timeout > 0 && ac.dopts.copts.Timeout < sleepTime+time.Since(start) {
ac.mu.Lock()
ac.errorf("connection timeout")
ac.mu.Unlock()
ac.tearDown(errClientConnTimeout)
return errClientConnTimeout
}
closeTransport = false
select {
case <-time.After(sleepTime):

Просмотреть файл

@ -47,8 +47,8 @@ func TestDialTimeout(t *testing.T) {
if err == nil {
conn.Close()
}
if err != errClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, errClientConnTimeout)
if err != ErrClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
}
}
@ -61,8 +61,8 @@ func TestTLSDialTimeout(t *testing.T) {
if err == nil {
conn.Close()
}
if err != errClientConnTimeout {
t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, errClientConnTimeout)
if err != ErrClientConnTimeout {
t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
}
}

Просмотреть файл

@ -296,61 +296,6 @@ func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServ
const tlsDir = "testdata/"
func TestReconnectTimeout(t *testing.T) {
defer leakCheck(t)()
restore := declareLogNoise(t,
"transport: http2Client.notifyError got notified that the client transport was broken",
"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport",
"grpc: Conn.transportMonitor exits due to: grpc: timed out trying to connect",
)
defer restore()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}
_, port, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
t.Fatalf("Failed to parse listener address: %v", err)
}
addr := "localhost:" + port
conn, err := grpc.Dial(addr, grpc.WithTimeout(5*time.Second), grpc.WithBlock(), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial to the server %q: %v", addr, err)
}
// Close unaccepted connection (i.e., conn).
lis.Close()
tc := testpb.NewTestServiceClient(conn)
waitC := make(chan struct{})
go func() {
defer close(waitC)
const argSize = 271828
const respSize = 314159
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
if err != nil {
t.Error(err)
return
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(respSize),
Payload: payload,
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req); err == nil {
t.Errorf("TestService/UnaryCall(_, _) = _, <nil>, want _, non-nil")
return
}
}()
// Block until reconnect times out.
<-waitC
if err := conn.Close(); err != nil {
t.Fatalf("%v.Close() = %v, want <nil>", conn, err)
}
}
func unixDialer(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}

Просмотреть файл

@ -338,7 +338,7 @@ type ConnectOptions struct {
Dialer func(string, time.Duration) (net.Conn, error)
// AuthOptions stores the credentials required to setup a client connection and/or issue RPCs.
AuthOptions []credentials.Credentials
// Timeout specifies the timeout for dialing a client connection.
// Timeout specifies the timeout for dialing a ClientTransport.
Timeout time.Duration
}