2015-02-06 04:14:05 +03:00
/ *
*
2017-06-08 15:42:19 +03:00
* Copyright 2014 gRPC authors .
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* http : //www.apache.org/licenses/LICENSE-2.0
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
2015-02-06 04:14:05 +03:00
*
* /
2015-02-09 03:35:38 +03:00
package grpc
2015-02-06 04:14:05 +03:00
import (
2018-11-02 01:43:42 +03:00
"context"
2015-02-19 14:57:41 +03:00
"errors"
2017-10-02 19:22:57 +03:00
"fmt"
2017-08-14 22:24:23 +03:00
"math"
2015-04-17 23:50:18 +03:00
"net"
2017-08-31 20:59:09 +03:00
"reflect"
2017-04-04 02:03:05 +03:00
"strings"
2015-02-06 04:14:05 +03:00
"sync"
2018-06-28 02:18:41 +03:00
"sync/atomic"
2015-02-06 04:14:05 +03:00
"time"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/balancer"
2017-10-19 21:32:06 +03:00
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/codes"
2017-08-09 20:31:12 +03:00
"google.golang.org/grpc/connectivity"
2015-02-09 03:39:06 +03:00
"google.golang.org/grpc/credentials"
2015-05-09 12:43:59 +03:00
"google.golang.org/grpc/grpclog"
2018-06-14 02:07:37 +03:00
"google.golang.org/grpc/internal/backoff"
2018-06-19 03:59:08 +03:00
"google.golang.org/grpc/internal/channelz"
2018-11-27 02:06:46 +03:00
"google.golang.org/grpc/internal/envconfig"
2018-11-10 00:53:47 +03:00
"google.golang.org/grpc/internal/grpcsync"
2018-07-11 21:22:45 +03:00
"google.golang.org/grpc/internal/transport"
2017-02-28 22:49:51 +03:00
"google.golang.org/grpc/keepalive"
2018-10-13 01:44:20 +03:00
"google.golang.org/grpc/metadata"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/resolver"
2017-10-20 22:03:44 +03:00
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/status"
2015-02-06 04:14:05 +03:00
)
2018-03-07 21:36:17 +03:00
const (
	// minConnectTimeout is the minimum amount of time given to a connection
	// attempt to complete.
	minConnectTimeout = 20 * time.Second

	// grpclbName is the grpclb balancer name; it must match grpclbName in
	// grpclb/grpclb.go.
	grpclbName = "grpclb"
)
2015-02-19 14:57:41 +03:00
var (
2016-05-25 21:28:45 +03:00
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
//
// Deprecated: this error should not be relied upon by users; use the status
// code of Canceled instead.
ErrClientConnClosing = status . Error ( codes . Canceled , "grpc: the client connection is closing" )
2017-11-07 00:45:11 +03:00
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors . New ( "grpc: the connection is drained" )
// errConnClosing indicates that the connection is closing.
errConnClosing = errors . New ( "grpc: the connection is closing" )
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors . New ( "grpc: balancer is closed" )
2018-03-07 21:36:17 +03:00
// We use an accessor so that minConnectTimeout can be
// atomically read and updated while testing.
getMinConnectTimeout = func ( ) time . Duration {
return minConnectTimeout
}
2017-11-07 00:45:11 +03:00
)
2016-05-25 21:28:45 +03:00
2017-11-07 00:45:11 +03:00
// Errors returned from Dial and DialContext when the credential-related dial
// options are missing or contradict each other.
var (
	// errNoTransportSecurity indicates that no transport security was
	// configured for the ClientConn. Users should either set one or
	// explicitly call the WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle indicates that a credentials bundle is used
	// together with individual TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an oauth2 token) that requires a secure
	// connection over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
2017-04-07 00:08:04 +03:00
const (
	// defaultClientMaxReceiveMessageSize is the default cap on a received
	// message: 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the default cap on a sent message:
	// math.MaxInt32.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default write and
	// read buffer sizes, 32 KiB each.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
2017-03-10 03:58:23 +03:00
2016-08-15 20:17:09 +03:00
// Dial creates a client connection to the given target.
2015-02-06 04:14:05 +03:00
func Dial ( target string , opts ... DialOption ) ( * ClientConn , error ) {
2016-08-15 20:17:09 +03:00
return DialContext ( context . Background ( ) , target , opts ... )
}
2018-04-13 01:11:22 +03:00
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
2018-01-19 00:10:52 +03:00
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
2016-08-24 22:56:16 +03:00
func DialContext ( ctx context . Context , target string , opts ... DialOption ) ( conn * ClientConn , err error ) {
2015-09-29 20:24:03 +03:00
cc := & ClientConn {
2018-11-10 00:53:47 +03:00
target : target ,
csMgr : & connectivityStateManager { } ,
conns : make ( map [ * addrConn ] struct { } ) ,
dopts : defaultDialOptions ( ) ,
blockingpicker : newPickerWrapper ( ) ,
czData : new ( channelzData ) ,
firstResolveEvent : grpcsync . NewEvent ( ) ,
2015-09-29 20:24:03 +03:00
}
2018-06-28 02:18:41 +03:00
cc . retryThrottler . Store ( ( * retryThrottler ) ( nil ) )
2016-08-24 04:18:18 +03:00
cc . ctx , cc . cancel = context . WithCancel ( context . Background ( ) )
2017-04-04 01:03:24 +03:00
2016-12-20 03:31:00 +03:00
for _ , opt := range opts {
2018-07-20 03:33:42 +03:00
opt . apply ( & cc . dopts )
2016-12-20 03:31:00 +03:00
}
2017-10-02 19:22:57 +03:00
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
if cc . dopts . channelzParentID != 0 {
2018-08-07 02:02:47 +03:00
cc . channelzID = channelz . RegisterChannel ( & channelzChannel { cc } , cc . dopts . channelzParentID , target )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : "Channel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested Channel(id:%d) created" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
} else {
2018-08-07 02:02:47 +03:00
cc . channelzID = channelz . RegisterChannel ( & channelzChannel { cc } , 0 , target )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : "Channel Created" ,
Severity : channelz . CtINFO ,
} )
2018-04-09 21:13:06 +03:00
}
2018-09-12 21:15:32 +03:00
cc . csMgr . channelzID = cc . channelzID
2018-04-09 21:13:06 +03:00
}
2017-10-02 19:22:57 +03:00
if ! cc . dopts . insecure {
2018-09-25 23:17:25 +03:00
if cc . dopts . copts . TransportCredentials == nil && cc . dopts . copts . CredsBundle == nil {
2017-10-02 19:22:57 +03:00
return nil , errNoTransportSecurity
}
2018-09-25 23:17:25 +03:00
if cc . dopts . copts . TransportCredentials != nil && cc . dopts . copts . CredsBundle != nil {
return nil , errTransportCredsAndBundle
}
2017-10-02 19:22:57 +03:00
} else {
2018-09-25 23:17:25 +03:00
if cc . dopts . copts . TransportCredentials != nil || cc . dopts . copts . CredsBundle != nil {
2017-10-02 19:22:57 +03:00
return nil , errCredentialsConflict
}
for _ , cd := range cc . dopts . copts . PerRPCCredentials {
if cd . RequireTransportSecurity ( ) {
return nil , errTransportCredentialsMissing
}
}
}
2017-04-11 00:33:51 +03:00
cc . mkp = cc . dopts . copts . KeepaliveParams
2017-03-24 21:29:02 +03:00
2017-04-18 02:08:50 +03:00
if cc . dopts . copts . Dialer == nil {
cc . dopts . copts . Dialer = newProxyDialer (
func ( ctx context . Context , addr string ) ( net . Conn , error ) {
2018-04-09 21:12:34 +03:00
network , addr := parseDialTarget ( addr )
2018-11-02 01:43:42 +03:00
return ( & net . Dialer { } ) . DialContext ( ctx , network , addr )
2017-04-18 02:08:50 +03:00
} ,
)
}
2017-03-24 21:29:02 +03:00
if cc . dopts . copts . UserAgent != "" {
cc . dopts . copts . UserAgent += " " + grpcUA
} else {
cc . dopts . copts . UserAgent = grpcUA
}
2016-12-20 03:31:00 +03:00
if cc . dopts . timeout > 0 {
var cancel context . CancelFunc
ctx , cancel = context . WithTimeout ( ctx , cc . dopts . timeout )
defer cancel ( )
}
2016-08-24 22:56:16 +03:00
defer func ( ) {
2016-08-24 04:18:18 +03:00
select {
case <- ctx . Done ( ) :
2016-08-25 22:18:19 +03:00
conn , err = nil , ctx . Err ( )
2016-08-24 22:56:16 +03:00
default :
2016-08-24 04:18:18 +03:00
}
2016-08-25 22:18:19 +03:00
if err != nil {
cc . Close ( )
}
2016-08-24 04:18:18 +03:00
} ( )
2017-05-15 23:51:11 +03:00
scSet := false
2016-12-20 03:31:00 +03:00
if cc . dopts . scChan != nil {
2017-04-05 21:08:50 +03:00
// Try to get an initial service config.
2016-12-20 03:31:00 +03:00
select {
case sc , ok := <- cc . dopts . scChan :
if ok {
cc . sc = sc
2017-05-15 23:51:11 +03:00
scSet = true
2016-12-20 03:31:00 +03:00
}
2017-04-05 21:08:50 +03:00
default :
2016-12-20 03:31:00 +03:00
}
2015-02-06 04:14:05 +03:00
}
2016-03-23 21:49:05 +03:00
if cc . dopts . bs == nil {
2018-06-14 02:07:37 +03:00
cc . dopts . bs = backoff . Exponential {
MaxDelay : DefaultBackoffConfig . MaxDelay ,
}
2016-03-23 21:49:05 +03:00
}
2018-03-27 23:58:27 +03:00
if cc . dopts . resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
cc . parsedTarget = parseTarget ( cc . target )
grpclog . Infof ( "parsed scheme: %q" , cc . parsedTarget . Scheme )
cc . dopts . resolverBuilder = resolver . Get ( cc . parsedTarget . Scheme )
if cc . dopts . resolverBuilder == nil {
// If resolver builder is still nil, the parse target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original unparsed target.
grpclog . Infof ( "scheme %q not registered, fallback to default scheme" , cc . parsedTarget . Scheme )
cc . parsedTarget = resolver . Target {
Scheme : resolver . GetDefaultScheme ( ) ,
Endpoint : target ,
}
cc . dopts . resolverBuilder = resolver . Get ( cc . parsedTarget . Scheme )
}
} else {
cc . parsedTarget = resolver . Target { Endpoint : target }
}
2016-09-20 01:11:57 +03:00
creds := cc . dopts . copts . TransportCredentials
if creds != nil && creds . Info ( ) . ServerName != "" {
cc . authority = creds . Info ( ) . ServerName
2018-07-13 19:56:47 +03:00
} else if cc . dopts . insecure && cc . dopts . authority != "" {
cc . authority = cc . dopts . authority
2016-09-20 01:11:57 +03:00
} else {
2017-10-23 21:40:43 +03:00
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
cc . authority = cc . parsedTarget . Endpoint
2016-09-20 01:11:57 +03:00
}
2017-08-31 20:59:09 +03:00
2017-05-15 23:51:11 +03:00
if cc . dopts . scChan != nil && ! scSet {
2017-05-16 00:36:20 +03:00
// Blocking wait for the initial service config.
2017-05-15 23:51:11 +03:00
select {
case sc , ok := <- cc . dopts . scChan :
if ok {
cc . sc = sc
}
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
}
2016-12-20 03:31:00 +03:00
if cc . dopts . scChan != nil {
go cc . scWatcher ( )
}
2017-03-21 21:35:53 +03:00
2017-10-19 21:32:06 +03:00
var credsClone credentials . TransportCredentials
if creds := cc . dopts . copts . TransportCredentials ; creds != nil {
credsClone = creds . Clone ( )
}
cc . balancerBuildOpts = balancer . BuildOptions {
2018-04-09 21:13:06 +03:00
DialCreds : credsClone ,
2018-09-25 23:17:25 +03:00
CredsBundle : cc . dopts . copts . CredsBundle ,
2018-04-09 21:13:06 +03:00
Dialer : cc . dopts . copts . Dialer ,
ChannelzParentID : cc . channelzID ,
2017-10-19 21:32:06 +03:00
}
2017-10-02 19:22:57 +03:00
// Build the resolver.
2018-11-10 02:31:07 +03:00
rWrapper , err := newCCResolverWrapper ( cc )
2017-10-02 19:22:57 +03:00
if err != nil {
return nil , fmt . Errorf ( "failed to build resolver: %v" , err )
}
2018-11-10 02:31:07 +03:00
cc . mu . Lock ( )
cc . resolverWrapper = rWrapper
cc . mu . Unlock ( )
2017-08-31 20:59:09 +03:00
// A blocking dial blocks until the clientConn is ready.
if cc . dopts . block {
for {
s := cc . GetState ( )
if s == connectivity . Ready {
break
2018-08-27 20:28:41 +03:00
} else if cc . dopts . copts . FailOnNonTempDialError && s == connectivity . TransientFailure {
if err = cc . blockingpicker . connectionError ( ) ; err != nil {
2018-11-01 20:49:35 +03:00
terr , ok := err . ( interface {
Temporary ( ) bool
} )
2018-08-27 20:28:41 +03:00
if ok && ! terr . Temporary ( ) {
return nil , err
}
}
2017-08-31 20:59:09 +03:00
}
if ! cc . WaitForStateChange ( ctx , s ) {
// ctx got timeout or canceled.
return nil , ctx . Err ( )
}
}
}
2015-09-29 20:24:03 +03:00
return cc , nil
2015-09-23 22:18:41 +03:00
}
2017-08-09 20:31:12 +03:00
// connectivityStateManager keeps the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync . Mutex
2017-08-09 20:31:12 +03:00
state connectivity . State
2017-07-25 01:00:53 +03:00
notifyChan chan struct { }
2018-09-12 21:15:32 +03:00
channelzID int64
2017-07-25 01:00:53 +03:00
}
2017-08-09 20:31:12 +03:00
// updateState updates the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// If there's a change it notifies goroutines waiting on state change to
// happen.
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) updateState ( state connectivity . State ) {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
2017-08-09 20:31:12 +03:00
if csm . state == connectivity . Shutdown {
2017-07-25 01:00:53 +03:00
return
}
if csm . state == state {
return
}
csm . state = state
2018-09-12 21:15:32 +03:00
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( csm . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel Connectivity change to %v" , state ) ,
Severity : channelz . CtINFO ,
} )
}
2017-07-25 01:00:53 +03:00
if csm . notifyChan != nil {
// There are other goroutines waiting on this channel.
close ( csm . notifyChan )
csm . notifyChan = nil
}
}
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) getState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
return csm . state
}
// getNotifyChan returns the channel that updateState closes on the next state
// change, creating it first if no channel is currently outstanding.
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	if csm.notifyChan == nil {
		csm.notifyChan = make(chan struct{})
	}
	ch := csm.notifyChan
	csm.mu.Unlock()
	return ch
}
2016-05-18 21:18:10 +03:00
// ClientConn represents a client connection to an RPC server.
2015-02-06 04:14:05 +03:00
type ClientConn struct {
2016-07-29 20:16:56 +03:00
ctx context . Context
cancel context . CancelFunc
2017-10-23 21:40:43 +03:00
target string
parsedTarget resolver . Target
authority string
dopts dialOptions
csMgr * connectivityStateManager
2015-09-24 05:09:37 +03:00
2017-10-19 21:32:06 +03:00
balancerBuildOpts balancer . BuildOptions
blockingpicker * pickerWrapper
2017-08-31 20:59:09 +03:00
2018-11-10 02:31:07 +03:00
mu sync . RWMutex
resolverWrapper * ccResolverWrapper
sc ServiceConfig
scRaw string
conns map [ * addrConn ] struct { }
2017-06-16 19:59:37 +03:00
// Keepalive parameter can be updated if a GoAway is received.
2017-10-19 21:32:06 +03:00
mkp keepalive . ClientParameters
curBalancerName string
2017-12-12 23:45:05 +03:00
preBalancerName string // previous balancer name.
2017-10-19 21:32:06 +03:00
curAddresses [ ] resolver . Address
balancerWrapper * ccBalancerWrapper
2018-06-28 02:18:41 +03:00
retryThrottler atomic . Value
2018-04-09 21:13:06 +03:00
2018-11-10 00:53:47 +03:00
firstResolveEvent * grpcsync . Event
2018-08-06 21:17:12 +03:00
channelzID int64 // channelz unique identification number
czData * channelzData
2015-09-24 05:09:37 +03:00
}
2017-08-09 20:31:12 +03:00
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
2017-07-25 01:00:53 +03:00
// ctx expires. A true value is returned in former case and false in latter.
2017-08-09 20:31:12 +03:00
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) WaitForStateChange ( ctx context . Context , sourceState connectivity . State ) bool {
2017-07-25 01:00:53 +03:00
ch := cc . csMgr . getNotifyChan ( )
if cc . csMgr . getState ( ) != sourceState {
return true
}
select {
case <- ctx . Done ( ) :
return false
case <- ch :
return true
}
}
2017-08-09 20:31:12 +03:00
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) GetState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
return cc . csMgr . getState ( )
}
2016-12-20 03:31:00 +03:00
func ( cc * ClientConn ) scWatcher ( ) {
for {
select {
case sc , ok := <- cc . dopts . scChan :
if ! ok {
return
}
cc . mu . Lock ( )
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc . sc = sc
2017-10-19 22:09:19 +03:00
cc . scRaw = ""
2016-12-20 03:31:00 +03:00
cc . mu . Unlock ( )
case <- cc . ctx . Done ( ) :
return
}
}
}
2018-11-10 00:53:47 +03:00
// waitForResolvedAddrs blocks until firstResolveEvent has fired, ctx is done,
// or the ClientConn's own context is done. It returns nil in the first case,
// a status error derived from ctx.Err() in the second, and
// ErrClientConnClosing in the third.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	// This is on the RPC path: once the event has fired, skip the more
	// expensive select entirely.
	if cc.firstResolveEvent.HasFired() {
		return nil
	}
	select {
	case <-cc.ctx.Done():
		return ErrClientConnClosing
	case <-ctx.Done():
		return status.FromContextError(ctx.Err()).Err()
	case <-cc.firstResolveEvent.Done():
		return nil
	}
}
2017-10-19 21:32:06 +03:00
func ( cc * ClientConn ) handleResolvedAddrs ( addrs [ ] resolver . Address , err error ) {
cc . mu . Lock ( )
defer cc . mu . Unlock ( )
if cc . conns == nil {
2017-11-29 00:16:53 +03:00
// cc was closed.
return
}
if reflect . DeepEqual ( cc . curAddresses , addrs ) {
2017-10-19 21:32:06 +03:00
return
}
2017-12-12 23:45:05 +03:00
cc . curAddresses = addrs
2018-11-10 00:53:47 +03:00
cc . firstResolveEvent . Fire ( )
2017-12-12 23:45:05 +03:00
2017-12-19 02:35:42 +03:00
if cc . dopts . balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
2017-12-12 23:45:05 +03:00
var isGRPCLB bool
for _ , a := range addrs {
if a . Type == resolver . GRPCLB {
isGRPCLB = true
break
}
2017-11-29 00:16:53 +03:00
}
2017-12-12 23:45:05 +03:00
var newBalancerName string
if isGRPCLB {
newBalancerName = grpclbName
} else {
// Address list doesn't contain grpclb address. Try to pick a
// non-grpclb balancer.
newBalancerName = cc . curBalancerName
// If current balancer is grpclb, switch to the previous one.
if newBalancerName == grpclbName {
newBalancerName = cc . preBalancerName
}
// The following could be true in two cases:
// - the first time handling resolved addresses
// (curBalancerName="")
// - the first time handling non-grpclb addresses
// (curBalancerName="grpclb", preBalancerName="")
if newBalancerName == "" {
2017-12-19 02:35:42 +03:00
newBalancerName = PickFirstBalancerName
2017-12-12 23:45:05 +03:00
}
}
cc . switchBalancer ( newBalancerName )
2017-12-19 02:35:42 +03:00
} else if cc . balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc . balancerWrapper = newCCBalancerWrapper ( cc , cc . dopts . balancerBuilder , cc . balancerBuildOpts )
2017-10-19 21:32:06 +03:00
}
2017-12-19 02:35:42 +03:00
2017-10-19 21:32:06 +03:00
cc . balancerWrapper . handleResolvedAddrs ( addrs , nil )
}
2017-12-12 23:45:05 +03:00
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
2017-11-23 00:59:20 +03:00
//
// Caller must hold cc.mu.
2017-10-19 21:32:06 +03:00
func ( cc * ClientConn ) switchBalancer ( name string ) {
if cc . conns == nil {
return
}
2017-12-12 23:45:05 +03:00
if strings . ToLower ( cc . curBalancerName ) == strings . ToLower ( name ) {
2017-10-19 21:32:06 +03:00
return
}
2017-12-12 23:45:05 +03:00
grpclog . Infof ( "ClientConn switching balancer to %q" , name )
if cc . dopts . balancerBuilder != nil {
2017-12-19 02:35:42 +03:00
grpclog . Infoln ( "ignoring balancer switching: Balancer DialOption used instead" )
2017-10-19 21:32:06 +03:00
return
}
// TODO(bar switching) change this to two steps: drain and close.
// Keep track of sc in wrapper.
2017-11-23 00:59:20 +03:00
if cc . balancerWrapper != nil {
cc . balancerWrapper . close ( )
}
2017-10-19 21:32:06 +03:00
builder := balancer . Get ( name )
2018-09-12 21:15:32 +03:00
// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
// we reuse previous one?
if channelz . IsOn ( ) {
if builder == nil {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel switches to new LB policy %q due to fallback from invalid balancer name" , PickFirstBalancerName ) ,
Severity : channelz . CtWarning ,
} )
} else {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel switches to new LB policy %q" , name ) ,
Severity : channelz . CtINFO ,
} )
}
}
2017-10-19 21:32:06 +03:00
if builder == nil {
2017-12-12 23:45:05 +03:00
grpclog . Infof ( "failed to get balancer builder for: %v, using pick_first instead" , name )
2017-10-19 21:32:06 +03:00
builder = newPickfirstBuilder ( )
}
2018-09-12 21:15:32 +03:00
2017-12-12 23:45:05 +03:00
cc . preBalancerName = cc . curBalancerName
2017-10-19 21:32:06 +03:00
cc . curBalancerName = builder . Name ( )
cc . balancerWrapper = newCCBalancerWrapper ( cc , builder , cc . balancerBuildOpts )
}
// handleSubConnStateChange forwards a SubConn state change to the current
// balancer wrapper while holding cc.mu. It does nothing once the ClientConn
// has been closed.
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cc.conns == nil {
		return
	}
	// TODO(bar switching) send updates to all balancer wrappers when balancer
	// gracefully switching is supported.
	cc.balancerWrapper.handleSubConnStateChange(sc, s)
}
2017-08-31 20:59:09 +03:00
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
2017-11-23 00:59:20 +03:00
//
// Caller needs to make sure len(addrs) > 0.
2018-09-25 23:17:25 +03:00
func ( cc * ClientConn ) newAddrConn ( addrs [ ] resolver . Address , opts balancer . NewSubConnOptions ) ( * addrConn , error ) {
2017-08-31 20:59:09 +03:00
ac := & addrConn {
2018-12-18 00:10:13 +03:00
cc : cc ,
addrs : addrs ,
scopts : opts ,
dopts : cc . dopts ,
czData : new ( channelzData ) ,
resetBackoff : make ( chan struct { } ) ,
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
ac . ctx , ac . cancel = context . WithCancel ( cc . ctx )
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
return nil , ErrClientConnClosing
2017-08-21 22:27:04 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
ac . channelzID = channelz . RegisterSubChannel ( ac , cc . channelzID , "" )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel(id:%d) created" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
}
2017-08-31 20:59:09 +03:00
cc . conns [ ac ] = struct { } { }
cc . mu . Unlock ( )
return ac , nil
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func ( cc * ClientConn ) removeAddrConn ( ac * addrConn , err error ) {
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
2017-08-21 22:27:04 +03:00
return
}
2017-08-31 20:59:09 +03:00
delete ( cc . conns , ac )
cc . mu . Unlock ( )
ac . tearDown ( err )
2017-08-21 22:27:04 +03:00
}
2018-08-07 02:02:47 +03:00
func ( cc * ClientConn ) channelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : cc . GetState ( ) ,
2018-04-23 21:22:25 +03:00
Target : cc . target ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & cc . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & cc . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & cc . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & cc . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
2018-07-24 02:19:11 +03:00
// Target returns the target string the ClientConn was created with.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	target := cc.target
	return target
}
2018-04-23 21:22:25 +03:00
func ( cc * ClientConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsStarted , 1 )
atomic . StoreInt64 ( & cc . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-10-09 01:58:54 +03:00
// connect starts creating a transport.
2017-10-02 19:22:57 +03:00
// It does nothing if the ac is not IDLE.
2017-08-31 20:59:09 +03:00
// TODO(bar) Move this to the addrConn section.
2017-10-24 00:06:33 +03:00
func ( ac * addrConn ) connect ( ) error {
2017-08-31 20:59:09 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return errConnClosing
2017-08-21 22:27:04 +03:00
}
2017-10-02 19:22:57 +03:00
if ac . state != connectivity . Idle {
ac . mu . Unlock ( )
return nil
2015-09-24 05:09:37 +03:00
}
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . Connecting )
2017-10-02 19:22:57 +03:00
ac . mu . Unlock ( )
2017-08-31 20:59:09 +03:00
2017-10-24 00:06:33 +03:00
// Start a goroutine connecting to the server asynchronously.
2018-12-18 00:10:13 +03:00
go ac . resetTransport ( )
2016-05-17 01:31:00 +03:00
return nil
2015-09-24 05:09:37 +03:00
}
2017-08-31 20:59:09 +03:00
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// It checks whether current connected address of ac is in the new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
func ( ac * addrConn ) tryUpdateAddrs ( addrs [ ] resolver . Address ) bool {
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
grpclog . Infof ( "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v" , ac . curAddr , addrs )
if ac . state == connectivity . Shutdown {
ac . addrs = addrs
return true
}
2018-12-18 00:10:13 +03:00
// Unless we're busy reconnecting already, let's reconnect from the top of
// the list.
if ac . state != connectivity . Ready {
return false
}
2017-08-31 20:59:09 +03:00
var curAddrFound bool
for _ , a := range addrs {
if reflect . DeepEqual ( ac . curAddr , a ) {
curAddrFound = true
break
}
}
grpclog . Infof ( "addrConn: tryUpdateAddrs curAddrFound: %v" , curAddrFound )
if curAddrFound {
ac . addrs = addrs
}
return curAddrFound
}
2017-05-20 02:02:02 +03:00
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
2018-02-21 21:14:52 +03:00
// the service, we return it.
2017-05-20 02:02:02 +03:00
// Otherwise, we return an empty MethodConfig.
2017-05-15 23:51:11 +03:00
func ( cc * ClientConn ) GetMethodConfig ( method string ) MethodConfig {
2017-05-20 02:02:02 +03:00
// TODO: Avoid the locking here.
2016-12-20 03:31:00 +03:00
cc . mu . RLock ( )
defer cc . mu . RUnlock ( )
2017-05-15 23:51:11 +03:00
m , ok := cc . sc . Methods [ method ]
2017-04-04 01:03:24 +03:00
if ! ok {
i := strings . LastIndex ( method , "/" )
2018-04-15 14:07:56 +03:00
m = cc . sc . Methods [ method [ : i + 1 ] ]
2017-04-04 01:03:24 +03:00
}
2017-05-15 23:51:11 +03:00
return m
2016-12-20 03:31:00 +03:00
}
2018-11-01 20:49:35 +03:00
// healthCheckConfig returns the healthCheckConfig of the current service
// config, read under cc.mu.
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
	cc.mu.RLock()
	cfg := cc.sc.healthCheckConfig
	cc.mu.RUnlock()
	return cfg
}
2018-07-11 20:18:09 +03:00
func ( cc * ClientConn ) getTransport ( ctx context . Context , failfast bool , method string ) ( transport . ClientTransport , func ( balancer . DoneInfo ) , error ) {
2018-10-13 01:44:20 +03:00
hdr , _ := metadata . FromOutgoingContext ( ctx )
2018-07-11 20:18:09 +03:00
t , done , err := cc . blockingpicker . pick ( ctx , failfast , balancer . PickOptions {
FullMethodName : method ,
2018-10-13 01:44:20 +03:00
Header : hdr ,
2018-07-11 20:18:09 +03:00
} )
2016-05-07 01:47:09 +03:00
if err != nil {
2017-10-02 19:22:57 +03:00
return nil , nil , toRPCErr ( err )
2015-09-24 05:09:37 +03:00
}
2017-10-02 19:22:57 +03:00
return t , done , nil
2015-09-24 05:09:37 +03:00
}
2017-10-19 22:09:19 +03:00
// handleServiceConfig parses the service config string in JSON format to Go native
// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
func ( cc * ClientConn ) handleServiceConfig ( js string ) error {
2018-05-08 20:28:26 +03:00
if cc . dopts . disableServiceConfig {
return nil
}
2018-09-12 21:15:32 +03:00
if cc . scRaw == js {
return nil
}
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
// for human consumption.
Desc : fmt . Sprintf ( "Channel has a new service config \"%s\"" , js ) ,
Severity : channelz . CtINFO ,
} )
}
2017-10-19 22:09:19 +03:00
sc , err := parseServiceConfig ( js )
if err != nil {
return err
}
cc . mu . Lock ( )
2018-10-11 21:43:16 +03:00
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc . conns == nil {
cc . mu . Unlock ( )
return nil
}
2017-10-19 22:09:19 +03:00
cc . scRaw = js
cc . sc = sc
2018-06-28 02:18:41 +03:00
if sc . retryThrottling != nil {
newThrottler := & retryThrottler {
tokens : sc . retryThrottling . MaxTokens ,
max : sc . retryThrottling . MaxTokens ,
thresh : sc . retryThrottling . MaxTokens / 2 ,
ratio : sc . retryThrottling . TokenRatio ,
}
cc . retryThrottler . Store ( newThrottler )
} else {
cc . retryThrottler . Store ( ( * retryThrottler ) ( nil ) )
}
2017-12-12 23:45:05 +03:00
if sc . LB != nil && * sc . LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
if cc . curBalancerName == grpclbName {
// If current balancer is grpclb, there's at least one grpclb
// balancer address in the resolved list. Don't switch the balancer,
// but change the previous balancer name, so if a new resolved
// address list doesn't contain grpclb address, balancer will be
// switched to *sc.LB.
cc . preBalancerName = * sc . LB
} else {
cc . switchBalancer ( * sc . LB )
cc . balancerWrapper . handleResolvedAddrs ( cc . curAddresses , nil )
}
2017-11-17 22:11:05 +03:00
}
2018-06-28 02:18:41 +03:00
2017-10-19 22:09:19 +03:00
cc . mu . Unlock ( )
return nil
}
2017-11-29 00:16:53 +03:00
func ( cc * ClientConn ) resolveNow ( o resolver . ResolveNowOption ) {
2018-06-12 22:50:43 +03:00
cc . mu . RLock ( )
2017-11-29 00:16:53 +03:00
r := cc . resolverWrapper
2018-06-12 22:50:43 +03:00
cc . mu . RUnlock ( )
2017-11-29 00:16:53 +03:00
if r == nil {
return
}
go r . resolveNow ( o )
}
2018-08-27 23:21:48 +03:00
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	// Snapshot the subchannels and release cc.mu before touching any of them.
	// resetConnectBackoff acquires ac.mu, while other addrConn code paths
	// (resetTransport, adjustParams) acquire cc.mu while already holding
	// ac.mu. Holding cc.mu across the loop would invert that order and could
	// deadlock.
	cc.mu.RLock()
	conns := make([]*addrConn, 0, len(cc.conns))
	for ac := range cc.conns {
		conns = append(conns, ac)
	}
	cc.mu.RUnlock()

	for _, ac := range conns {
		ac.resetConnectBackoff()
	}
}
2016-05-18 03:18:54 +03:00
// Close tears down the ClientConn and all underlying connections.
2016-05-07 01:47:09 +03:00
func ( cc * ClientConn ) Close ( ) error {
2018-03-03 00:39:55 +03:00
defer cc . cancel ( )
2016-07-29 20:16:56 +03:00
2015-08-01 00:16:02 +03:00
cc . mu . Lock ( )
2016-05-11 05:29:44 +03:00
if cc . conns == nil {
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
return ErrClientConnClosing
}
2016-05-11 05:29:44 +03:00
conns := cc . conns
cc . conns = nil
2017-08-09 20:31:12 +03:00
cc . csMgr . updateState ( connectivity . Shutdown )
2017-10-19 21:32:06 +03:00
rWrapper := cc . resolverWrapper
cc . resolverWrapper = nil
bWrapper := cc . balancerWrapper
cc . balancerWrapper = nil
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
2018-04-23 21:22:25 +03:00
2017-10-02 19:22:57 +03:00
cc . blockingpicker . close ( )
2018-04-23 21:22:25 +03:00
2017-10-19 21:32:06 +03:00
if rWrapper != nil {
rWrapper . close ( )
2017-10-02 19:22:57 +03:00
}
2017-10-19 21:32:06 +03:00
if bWrapper != nil {
bWrapper . close ( )
2016-07-19 01:58:41 +03:00
}
2018-04-23 21:22:25 +03:00
2017-08-31 20:59:09 +03:00
for ac := range conns {
2016-05-11 05:29:44 +03:00
ac . tearDown ( ErrClientConnClosing )
2016-05-07 01:47:09 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
ted := & channelz . TraceEventDesc {
Desc : "Channel Deleted" ,
Severity : channelz . CtINFO ,
}
if cc . dopts . channelzParentID != 0 {
ted . Parent = & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested channel(id:%d) deleted" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
}
}
channelz . AddTraceEvent ( cc . channelzID , ted )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity beng deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( cc . channelzID )
}
2016-05-07 01:47:09 +03:00
return nil
}
// addrConn is a network connection to a given address.
type addrConn struct {
2016-07-29 20:16:56 +03:00
ctx context . Context
cancel context . CancelFunc
2017-12-01 20:55:42 +03:00
cc * ClientConn
dopts dialOptions
acbw balancer . SubConn
2018-09-25 23:17:25 +03:00
scopts balancer . NewSubConnOptions
2016-05-07 01:47:09 +03:00
2018-11-01 20:49:35 +03:00
// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
// health checking may require server to report healthy to set ac to READY), and is reset
// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
// is received, transport is closed, ac has been torn down).
2018-09-21 01:45:40 +03:00
transport transport . ClientTransport // The current transport.
mu sync . Mutex
curAddr resolver . Address // The current address.
addrs [ ] resolver . Address // All addresses that the resolver resolved to.
2018-09-12 21:15:32 +03:00
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity . State
2016-07-16 02:20:34 +03:00
2018-09-21 01:45:40 +03:00
tearDownErr error // The reason this addrConn is torn down.
2017-12-01 20:55:42 +03:00
2018-12-18 00:10:13 +03:00
backoffIdx int // Needs to be stateful for resetConnectBackoff.
2018-08-27 23:21:48 +03:00
resetBackoff chan struct { }
2019-01-17 21:14:45 +03:00
channelzID int64 // channelz unique identification number.
czData * channelzData
2016-05-07 01:47:09 +03:00
}
2018-09-21 01:45:40 +03:00
// Note: this requires a lock on ac.mu.
2018-09-12 21:15:32 +03:00
func ( ac * addrConn ) updateConnectivityState ( s connectivity . State ) {
2018-12-18 00:10:13 +03:00
if ac . state == s {
return
}
updateMsg := fmt . Sprintf ( "Subchannel Connectivity change to %v" , s )
grpclog . Infof ( updateMsg )
2018-09-12 21:15:32 +03:00
ac . state = s
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
2018-12-18 00:10:13 +03:00
Desc : updateMsg ,
2018-09-12 21:15:32 +03:00
Severity : channelz . CtINFO ,
} )
}
2018-12-18 00:10:13 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , s )
2018-09-12 21:15:32 +03:00
}
2017-04-11 00:33:51 +03:00
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func ( ac * addrConn ) adjustParams ( r transport . GoAwayReason ) {
switch r {
2017-11-07 00:45:11 +03:00
case transport . GoAwayTooManyPings :
2017-04-11 00:33:51 +03:00
v := 2 * ac . dopts . copts . KeepaliveParams . Time
ac . cc . mu . Lock ( )
if v > ac . cc . mkp . Time {
ac . cc . mkp . Time = v
}
ac . cc . mu . Unlock ( )
}
}
2018-12-18 00:10:13 +03:00
func ( ac * addrConn ) resetTransport ( ) {
for i := 0 ; ; i ++ {
tryNextAddrFromStart := grpcsync . NewEvent ( )
ac . mu . Lock ( )
if i > 0 {
2018-09-21 01:45:40 +03:00
ac . cc . resolveNow ( resolver . ResolveNowOption { } )
}
2018-12-18 00:10:13 +03:00
addrs := ac . addrs
backoffFor := ac . dopts . bs . Backoff ( ac . backoffIdx )
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
addrLoop :
for _ , addr := range addrs {
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
ac . updateConnectivityState ( connectivity . Connecting )
ac . transport = nil
2018-10-09 01:58:54 +03:00
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout ( )
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
connectDeadline := time . Now ( ) . Add ( dialDuration )
2018-10-09 01:58:54 +03:00
2018-12-18 00:10:13 +03:00
ac . mu . Lock ( )
ac . cc . mu . RLock ( )
ac . dopts . copts . KeepaliveParams = ac . cc . mkp
ac . cc . mu . RUnlock ( )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
copts := ac . dopts . copts
if ac . scopts . CredsBundle != nil {
copts . CredsBundle = ac . scopts . CredsBundle
}
hctx , hcancel := context . WithCancel ( ac . ctx )
defer hcancel ( )
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel picks a new address %q to connect" , addr . Addr ) ,
Severity : channelz . CtINFO ,
} )
}
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
reconnect := grpcsync . NewEvent ( )
prefaceReceived := make ( chan struct { } )
newTr , err := ac . createTransport ( addr , copts , connectDeadline , reconnect , prefaceReceived )
if err == nil {
ac . mu . Lock ( )
ac . curAddr = addr
ac . transport = newTr
ac . mu . Unlock ( )
healthCheckConfig := ac . cc . healthCheckConfig ( )
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if ! ac . cc . dopts . disableHealthCheck && healthCheckConfig != nil && ac . scopts . HealthCheckEnabled {
if ac . cc . dopts . healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog . Error ( "the client side LB channel health check function has not been set." )
} else {
// TODO(deklerk) refactor to just return transport
go ac . startHealthCheck ( hctx , newTr , addr , healthCheckConfig . ServiceName )
healthcheckManagingState = true
}
}
if ! healthcheckManagingState {
ac . mu . Lock ( )
ac . updateConnectivityState ( connectivity . Ready )
ac . mu . Unlock ( )
}
} else {
hcancel ( )
if err == errConnClosing {
return
}
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
if tryNextAddrFromStart . HasFired ( ) {
break addrLoop
}
continue
}
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
ac . mu . Lock ( )
reqHandshake := ac . dopts . reqHandshake
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
<- reconnect . Done ( )
hcancel ( )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
if reqHandshake == envconfig . RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <- prefaceReceived :
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac . mu . Lock ( )
ac . backoffIdx = 0
ac . mu . Unlock ( )
break addrLoop
default :
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
ac . updateConnectivityState ( connectivity . TransientFailure )
ac . mu . Unlock ( )
if tryNextAddrFromStart . HasFired ( ) {
break addrLoop
}
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac . mu . Lock ( )
ac . backoffIdx = 0
ac . mu . Unlock ( )
break addrLoop
}
}
// After exhausting all addresses, or after need to reconnect after a
// READY, the addrConn enters TRANSIENT_FAILURE.
ac . mu . Lock ( )
2017-08-31 20:59:09 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
return
2017-08-31 20:59:09 +03:00
}
2018-12-18 00:10:13 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
// Backoff.
b := ac . resetBackoff
timer := time . NewTimer ( backoffFor )
acctx := ac . ctx
2017-08-21 22:27:04 +03:00
ac . mu . Unlock ( )
2017-12-01 20:55:42 +03:00
2018-12-18 00:10:13 +03:00
select {
case <- timer . C :
ac . mu . Lock ( )
ac . backoffIdx ++
ac . mu . Unlock ( )
case <- b :
timer . Stop ( )
case <- acctx . Done ( ) :
timer . Stop ( )
return
2018-09-21 01:45:40 +03:00
}
}
}
2018-12-18 00:10:13 +03:00
// createTransport creates a connection to one of the backends in addrs. It
// sets ac.transport in the success case, or it returns an error if it was
// unable to successfully create a transport.
//
// If waitForHandshake is enabled, it blocks until server preface arrives.
func ( ac * addrConn ) createTransport ( addr resolver . Address , copts transport . ConnectOptions , connectDeadline time . Time , reconnect * grpcsync . Event , prefaceReceived chan struct { } ) ( transport . ClientTransport , error ) {
2018-09-21 01:45:40 +03:00
onCloseCalled := make ( chan struct { } )
2018-12-18 00:10:13 +03:00
target := transport . TargetInfo {
Addr : addr . Addr ,
Metadata : addr . Metadata ,
Authority : ac . cc . authority ,
}
2018-10-19 00:31:34 +03:00
2019-01-16 03:09:50 +03:00
prefaceTimer := time . NewTimer ( time . Until ( connectDeadline ) )
2018-11-01 20:49:35 +03:00
2018-09-21 01:45:40 +03:00
onGoAway := func ( r transport . GoAwayReason ) {
ac . mu . Lock ( )
ac . adjustParams ( r )
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
reconnect . Fire ( )
2018-09-21 01:45:40 +03:00
}
onClose := func ( ) {
close ( onCloseCalled )
2018-09-28 00:23:29 +03:00
prefaceTimer . Stop ( )
2018-12-18 00:10:13 +03:00
reconnect . Fire ( )
2018-09-21 01:45:40 +03:00
}
onPrefaceReceipt := func ( ) {
close ( prefaceReceived )
2018-09-28 00:23:29 +03:00
prefaceTimer . Stop ( )
2018-09-21 01:45:40 +03:00
}
connectCtx , cancel := context . WithDeadline ( ac . ctx , connectDeadline )
2018-11-13 00:30:41 +03:00
defer cancel ( )
2018-09-21 01:45:40 +03:00
if channelz . IsOn ( ) {
copts . ChannelzParentID = ac . channelzID
}
newTr , err := transport . NewClientTransport ( connectCtx , ac . cc . ctx , target , copts , onPrefaceReceipt , onGoAway , onClose )
if err == nil {
2018-11-27 02:06:46 +03:00
if ac . dopts . reqHandshake == envconfig . RequireHandshakeOn {
2017-12-01 20:55:42 +03:00
select {
2018-09-21 01:45:40 +03:00
case <- prefaceTimer . C :
2018-09-28 00:23:29 +03:00
// We didn't get the preface in time.
2017-12-01 20:55:42 +03:00
newTr . Close ( )
2018-09-28 00:23:29 +03:00
err = errors . New ( "timed out waiting for server handshake" )
2018-09-21 01:45:40 +03:00
case <- prefaceReceived :
// We got the preface - huzzah! things are good.
case <- onCloseCalled :
// The transport has already closed - noop.
2018-12-18 00:10:13 +03:00
return nil , errors . New ( "connection closed" )
2017-12-01 20:55:42 +03:00
}
2018-11-27 02:06:46 +03:00
} else if ac . dopts . reqHandshake == envconfig . RequireHandshakeHybrid {
2018-09-28 00:23:29 +03:00
go func ( ) {
select {
case <- prefaceTimer . C :
// We didn't get the preface in time.
newTr . Close ( )
case <- prefaceReceived :
// We got the preface just in the nick of time - huzzah!
case <- onCloseCalled :
// The transport has already closed - noop.
}
} ( )
2015-03-05 20:45:50 +03:00
}
2018-09-21 01:45:40 +03:00
}
if err != nil {
// newTr is either nil, or closed.
ac . cc . blockingpicker . updateConnectionError ( err )
2017-08-31 20:59:09 +03:00
ac . mu . Lock ( )
2017-12-01 20:55:42 +03:00
if ac . state == connectivity . Shutdown {
2018-09-21 01:45:40 +03:00
// ac.tearDown(...) has been invoked.
2017-12-01 20:55:42 +03:00
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
return nil , errConnClosing
2017-12-01 20:55:42 +03:00
}
2017-08-31 20:59:09 +03:00
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
grpclog . Warningf ( "grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting..." , addr , err )
2018-12-18 00:10:13 +03:00
return nil , err
2017-12-01 20:55:42 +03:00
}
2018-09-21 01:45:40 +03:00
2018-11-01 20:49:35 +03:00
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
newTr . Close ( )
2018-12-18 00:10:13 +03:00
return nil , errConnClosing
2018-11-01 20:49:35 +03:00
}
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
2017-12-01 20:55:42 +03:00
ac . mu . Lock ( )
2018-04-10 22:48:04 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
newTr . Close ( )
return nil , errConnClosing
2018-09-21 01:45:40 +03:00
}
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
return newTr , nil
2018-09-21 01:45:40 +03:00
}
2018-11-01 20:49:35 +03:00
func ( ac * addrConn ) startHealthCheck ( ctx context . Context , newTr transport . ClientTransport , addr resolver . Address , serviceName string ) {
// Set up the health check helper functions
newStream := func ( ) ( interface { } , error ) {
return ac . newClientStream ( ctx , & StreamDesc { ServerStreams : true } , "/grpc.health.v1.Health/Watch" , newTr )
}
firstReady := true
reportHealth := func ( ok bool ) {
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
if ac . transport != newTr {
return
}
if ok {
if firstReady {
firstReady = false
ac . curAddr = addr
}
2018-12-18 00:10:13 +03:00
ac . updateConnectivityState ( connectivity . Ready )
2018-11-01 20:49:35 +03:00
} else {
2018-12-18 00:10:13 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2018-11-01 20:49:35 +03:00
}
}
2018-12-14 03:44:36 +03:00
err := ac . cc . dopts . healthCheckFunc ( ctx , newStream , reportHealth , serviceName )
2018-11-01 20:49:35 +03:00
if err != nil {
if status . Code ( err ) == codes . Unimplemented {
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel health check is unimplemented at server side, thus health check is disabled" ,
Severity : channelz . CtError ,
} )
}
grpclog . Error ( "Subchannel health check is unimplemented at server side, thus health check is disabled" )
} else {
grpclog . Errorf ( "HealthCheckFunc exits with unexpected error %v" , err )
}
}
}
2018-08-27 23:21:48 +03:00
func ( ac * addrConn ) resetConnectBackoff ( ) {
ac . mu . Lock ( )
close ( ac . resetBackoff )
2018-09-21 01:45:40 +03:00
ac . backoffIdx = 0
2018-08-27 23:21:48 +03:00
ac . resetBackoff = make ( chan struct { } )
ac . mu . Unlock ( )
}
2017-10-02 19:22:57 +03:00
// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
func ( ac * addrConn ) getReadyTransport ( ) ( transport . ClientTransport , bool ) {
ac . mu . Lock ( )
2018-09-21 01:45:40 +03:00
if ac . state == connectivity . Ready && ac . transport != nil {
2017-10-02 19:22:57 +03:00
t := ac . transport
ac . mu . Unlock ( )
return t , true
}
var idle bool
if ac . state == connectivity . Idle {
idle = true
}
ac . mu . Unlock ( )
// Trigger idle ac to connect.
if idle {
2017-10-24 00:06:33 +03:00
ac . connect ( )
2017-10-02 19:22:57 +03:00
}
return nil , false
}
2016-05-17 01:31:00 +03:00
// tearDown starts to tear down the addrConn.
2015-02-06 04:14:05 +03:00
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
2016-05-07 01:47:09 +03:00
// some edge cases (e.g., the caller opens and closes many addrConn's in a
2015-02-06 04:14:05 +03:00
// tight loop.
2016-07-29 22:57:38 +03:00
// tearDown doesn't remove ac from ac.cc.conns.
2016-05-07 01:47:09 +03:00
func ( ac * addrConn ) tearDown ( err error ) {
ac . mu . Lock ( )
2017-12-15 23:03:41 +03:00
if ac . state == connectivity . Shutdown {
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2017-12-15 23:03:41 +03:00
return
}
2018-11-01 20:49:35 +03:00
curTr := ac . transport
ac . transport = nil
2018-09-21 20:38:30 +03:00
// We have to set the state to Shutdown before anything else to prevent races
// between setting the state and logic that waits on context cancelation / etc.
2018-09-21 01:45:40 +03:00
ac . updateConnectivityState ( connectivity . Shutdown )
2018-09-21 20:38:30 +03:00
ac . cancel ( )
2018-09-21 01:45:40 +03:00
ac . tearDownErr = err
2017-10-20 01:16:16 +03:00
ac . curAddr = resolver . Address { }
2018-11-01 20:49:35 +03:00
if err == errConnDrain && curTr != nil {
2016-07-26 02:35:32 +03:00
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
2018-09-21 01:45:40 +03:00
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac . mu . Unlock ( )
2018-11-01 20:49:35 +03:00
curTr . GracefulClose ( )
2018-09-21 01:45:40 +03:00
ac . mu . Lock ( )
2016-07-26 02:35:32 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel Deleted" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchanel(id:%d) deleted" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity beng deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( ac . channelzID )
}
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2015-02-06 04:14:05 +03:00
}
2017-08-31 20:59:09 +03:00
// getState returns ac's current connectivity state, read under ac.mu.
func (ac *addrConn) getState() connectivity.State {
	ac.mu.Lock()
	current := ac.state
	ac.mu.Unlock()
	return current
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
2018-04-09 21:13:06 +03:00
func ( ac * addrConn ) ChannelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
ac . mu . Lock ( )
addr := ac . curAddr . Addr
ac . mu . Unlock ( )
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : ac . getState ( ) ,
2018-04-23 21:22:25 +03:00
Target : addr ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & ac . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & ac . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & ac . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & ac . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
func ( ac * addrConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsStarted , 1 )
atomic . StoreInt64 ( & ac . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-06-28 02:18:41 +03:00
// retryThrottler is a token bucket that decides whether retries are allowed.
// Tokens are spent by throttle and earned back by successfulRPC.
type retryThrottler struct {
	max    float64 // Upper bound for tokens.
	thresh float64 // Retries are throttled while tokens <= thresh.
	ratio  float64 // Tokens added per successful RPC.

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never drops below zero. A nil receiver never
// throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capped at max. It is a
// no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
2018-08-07 02:02:47 +03:00
// channelzChannel wraps a ClientConn and exposes its metrics through an
// exported ChannelzMetric method.
type channelzChannel struct {
	cc *ClientConn
}

// ChannelzMetric returns the wrapped ClientConn's channelz metrics.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	metric := c.cc.channelzMetric()
	return metric
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
var (
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: This error is never returned by grpc and should not be
	// referenced by users.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)