2015-02-06 04:14:05 +03:00
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
2015-02-09 03:35:38 +03:00
package grpc
2015-02-06 04:14:05 +03:00
import (
2018-11-02 01:43:42 +03:00
"context"
2015-02-19 14:57:41 +03:00
"errors"
2017-10-02 19:22:57 +03:00
"fmt"
2017-08-14 22:24:23 +03:00
"math"
2015-04-17 23:50:18 +03:00
"net"
2017-08-31 20:59:09 +03:00
"reflect"
2017-04-04 02:03:05 +03:00
"strings"
2015-02-06 04:14:05 +03:00
"sync"
2018-06-28 02:18:41 +03:00
"sync/atomic"
2015-02-06 04:14:05 +03:00
"time"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/balancer"
2017-10-19 21:32:06 +03:00
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/codes"
2017-08-09 20:31:12 +03:00
"google.golang.org/grpc/connectivity"
2015-02-09 03:39:06 +03:00
"google.golang.org/grpc/credentials"
2015-05-09 12:43:59 +03:00
"google.golang.org/grpc/grpclog"
2018-06-14 02:07:37 +03:00
"google.golang.org/grpc/internal/backoff"
2018-06-19 03:59:08 +03:00
"google.golang.org/grpc/internal/channelz"
2018-11-27 02:06:46 +03:00
"google.golang.org/grpc/internal/envconfig"
2018-11-10 00:53:47 +03:00
"google.golang.org/grpc/internal/grpcsync"
2018-07-11 21:22:45 +03:00
"google.golang.org/grpc/internal/transport"
2017-02-28 22:49:51 +03:00
"google.golang.org/grpc/keepalive"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/resolver"
2017-10-20 22:03:44 +03:00
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/status"
2015-02-06 04:14:05 +03:00
)
2018-03-07 21:36:17 +03:00
// minConnectTimeout is the minimum amount of time a connection attempt is
// given to complete.
const minConnectTimeout = 20 * time.Second

// grpclbName is the balancer name selected when the resolver reports grpclb
// addresses. It must stay identical to grpclbName in grpclb/grpclb.go.
const grpclbName = "grpclb"
2015-02-19 14:57:41 +03:00
var (
2016-05-25 21:28:45 +03:00
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
//
// Deprecated: this error should not be relied upon by users; use the status
// code of Canceled instead.
ErrClientConnClosing = status . Error ( codes . Canceled , "grpc: the client connection is closing" )
2017-11-07 00:45:11 +03:00
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors . New ( "grpc: the connection is drained" )
// errConnClosing indicates that the connection is closing.
errConnClosing = errors . New ( "grpc: the connection is closing" )
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors . New ( "grpc: balancer is closed" )
2019-04-03 20:50:28 +03:00
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
2017-11-07 00:45:11 +03:00
)
2016-05-25 21:28:45 +03:00
2017-11-07 00:45:11 +03:00
// The following errors are returned from Dial and DialContext when the
// requested transport-security configuration is inconsistent.

// errNoTransportSecurity: no transport security was configured for the
// ClientConn. Callers must either supply credentials or explicitly opt out
// with the WithInsecure DialOption.
var errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

// errTransportCredsAndBundle: a credentials bundle was combined with
// individual transport credentials.
var errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

// errTransportCredentialsMissing: per-RPC credentials that require a secure
// connection (e.g., an OAuth2 token) were configured on an insecure one.
var errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

// errCredentialsConflict: both grpc.WithTransportCredentials() and
// grpc.WithInsecure() were supplied for the same connection.
var errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
2017-04-07 00:08:04 +03:00
const (
	// defaultClientMaxReceiveMessageSize is the default cap on the size of a
	// message the client receives: 4 MiB (4 * 1024 * 1024 bytes).
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the default cap on the size of a
	// message the client sends: math.MaxInt32, i.e. effectively unbounded.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize is the default buffer size, in bytes, used when
	// writing (sending) frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the default buffer size, in bytes, used when
	// reading (receiving) frames.
	defaultReadBufSize = 32 * 1024
)
2017-03-10 03:58:23 +03:00
2016-08-15 20:17:09 +03:00
// Dial creates a client connection to the given target.
2015-02-06 04:14:05 +03:00
func Dial ( target string , opts ... DialOption ) ( * ClientConn , error ) {
2016-08-15 20:17:09 +03:00
return DialContext ( context . Background ( ) , target , opts ... )
}
2018-04-13 01:11:22 +03:00
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// Error handling: on any error return the partially constructed ClientConn is
// closed. If ctx (possibly narrowed by the WithTimeout dial option) is done by
// the time DialContext returns, the result is replaced by (nil, ctx.Err()),
// even if setup otherwise succeeded.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed the atomic.Value with a typed nil. Presumably this lets loaders
	// type-assert to *retryThrottler unconditionally — verify at Load sites.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx deliberately derives from context.Background(), not ctx: it
	// scopes the ClientConn's lifetime, which outlives this call.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Registered first, so it runs last: it sees any error, including one
	// substituted by the ctx check deferred further below, and closes cc.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the security configuration. A secure dial needs exactly one of
	// TransportCredentials or CredsBundle. An insecure dial may have neither,
	// and must not carry per-RPC credentials that require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse a caller-supplied default service config up front so that an
	// invalid one fails the dial immediately.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
		}
		cc.dopts.defaultServiceConfig = sc
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Default dialer: split addr into (network, address) via parseDialTarget
	// and dial it, wrapped by newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append the library user agent to any caller-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// Bound the remaining setup steps by the WithTimeout dial option, if set.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// Registered after the cancel above, so it runs before cancel does. It
	// therefore only observes a ctx that expired or was canceled on its own.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.Exponential{
			MaxDelay: DefaultBackoffConfig.MaxDelay,
		}
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: ServerName from the transport credentials, then
	// cc.dopts.authority (honored only for insecure dials), then the parsed
	// target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep applying service config updates until scChan is closed or
		// cc.ctx is done.
		go cc.scWatcher()
	}

	// The balancer receives its own clone of the transport credentials.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Abort only on errors that explicitly report themselves as
				// non-temporary. Temporary or unclassified errors keep waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
2017-08-09 20:31:12 +03:00
// connectivityStateManager keeps the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync . Mutex
2017-08-09 20:31:12 +03:00
state connectivity . State
2017-07-25 01:00:53 +03:00
notifyChan chan struct { }
2018-09-12 21:15:32 +03:00
channelzID int64
2017-07-25 01:00:53 +03:00
}
2017-08-09 20:31:12 +03:00
// updateState updates the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// If there's a change it notifies goroutines waiting on state change to
// happen.
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) updateState ( state connectivity . State ) {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
2017-08-09 20:31:12 +03:00
if csm . state == connectivity . Shutdown {
2017-07-25 01:00:53 +03:00
return
}
if csm . state == state {
return
}
csm . state = state
2018-09-12 21:15:32 +03:00
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( csm . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel Connectivity change to %v" , state ) ,
Severity : channelz . CtINFO ,
} )
}
2017-07-25 01:00:53 +03:00
if csm . notifyChan != nil {
// There are other goroutines waiting on this channel.
close ( csm . notifyChan )
csm . notifyChan = nil
}
}
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) getState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
return csm . state
}
func ( csm * connectivityStateManager ) getNotifyChan ( ) <- chan struct { } {
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
if csm . notifyChan == nil {
csm . notifyChan = make ( chan struct { } )
}
return csm . notifyChan
}
2016-05-18 21:18:10 +03:00
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	// ctx and cancel scope the ClientConn's lifetime. DialContext derives ctx
	// from context.Background(), so it is independent of the dial context.
	ctx    context.Context
	cancel context.CancelFunc

	target       string          // target exactly as passed to Dial/DialContext
	parsedTarget resolver.Target // target as parsed for resolver selection
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu is held, in the code visible here, around accesses to
	// resolverWrapper, sc, conns, curBalancerName and balancerWrapper.
	// NOTE(review): confirm which other fields (e.g. mkp) it guards.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is set to nil when the ClientConn is closed; a nil map is used
	// throughout as the "already closed" signal.
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value // holds a *retryThrottler (seeded with a typed nil)

	// firstResolveEvent fires once the first resolver state has been handled.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
2017-08-09 20:31:12 +03:00
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
2017-07-25 01:00:53 +03:00
// ctx expires. A true value is returned in former case and false in latter.
2017-08-09 20:31:12 +03:00
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) WaitForStateChange ( ctx context . Context , sourceState connectivity . State ) bool {
2017-07-25 01:00:53 +03:00
ch := cc . csMgr . getNotifyChan ( )
if cc . csMgr . getState ( ) != sourceState {
return true
}
select {
case <- ctx . Done ( ) :
return false
case <- ch :
return true
}
}
2017-08-09 20:31:12 +03:00
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) GetState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
return cc . csMgr . getState ( )
}
2016-12-20 03:31:00 +03:00
func ( cc * ClientConn ) scWatcher ( ) {
for {
select {
case sc , ok := <- cc . dopts . scChan :
if ! ok {
return
}
cc . mu . Lock ( )
// TODO: load balance policy runtime change is ignored.
2019-01-23 20:59:48 +03:00
// We may revisit this decision in the future.
2019-04-03 20:50:28 +03:00
cc . sc = & sc
2016-12-20 03:31:00 +03:00
cc . mu . Unlock ( )
case <- cc . ctx . Done ( ) :
return
}
}
}
2018-11-10 00:53:47 +03:00
// waitForResolvedAddrs blocks until the resolver has produced its first
// result, ctx is done, or the ClientConn is closed. It returns:
//   - nil once the first resolution has happened;
//   - a status error derived from ctx.Err() if ctx finishes first;
//   - ErrClientConnClosing if cc.ctx is done first.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	resolved := cc.firstResolveEvent
	// RPC hot path: after the first resolution, skip the costlier select.
	if resolved.HasFired() {
		return nil
	}
	select {
	case <-ctx.Done():
		return status.FromContextError(ctx.Err()).Err()
	case <-cc.ctx.Done():
		return ErrClientConnClosing
	case <-resolved.Done():
		return nil
	}
}
2019-04-03 20:50:28 +03:00
// fallbackToDefaultServiceConfig reports whether the resolver-provided service
// config sc should be ignored in favour of the default one. This is the case
// when:
//   - resolver service config is disabled, or
//   - the resolver returned no service config (sc is empty), or
//   - sc fails to parse.
func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
	switch {
	case cc.dopts.disableServiceConfig:
		return true
	// Temporary: until the resolver.State ServiceConfig field type changes,
	// an empty string is taken to mean "resolver returned no config".
	case sc == "":
		return true
	}
	// TODO: temporary; replace once service config validation moves into the
	// resolver.
	_, parseErr := parseServiceConfig(sc)
	return parseErr != nil
}
2019-03-22 20:48:55 +03:00
func ( cc * ClientConn ) updateResolverState ( s resolver . State ) error {
2017-10-19 21:32:06 +03:00
cc . mu . Lock ( )
defer cc . mu . Unlock ( )
2019-03-22 20:48:55 +03:00
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
2017-10-19 21:32:06 +03:00
if cc . conns == nil {
2019-03-22 20:48:55 +03:00
return nil
2017-11-29 00:16:53 +03:00
}
2019-04-03 20:50:28 +03:00
if cc . fallbackToDefaultServiceConfig ( s . ServiceConfig ) {
if cc . dopts . defaultServiceConfig != nil && cc . sc == nil {
cc . applyServiceConfig ( cc . dopts . defaultServiceConfig )
}
} else {
// TODO: the parsing logic below will be moved inside resolver.
2019-03-22 20:48:55 +03:00
sc , err := parseServiceConfig ( s . ServiceConfig )
if err != nil {
return err
}
2019-04-03 20:50:28 +03:00
if cc . sc == nil || cc . sc . rawJSONString != s . ServiceConfig {
cc . applyServiceConfig ( sc )
2019-03-22 20:48:55 +03:00
}
2017-10-19 21:32:06 +03:00
}
2019-04-03 20:50:28 +03:00
// update the service config that will be sent to balancer.
if cc . sc != nil {
s . ServiceConfig = cc . sc . rawJSONString
}
2017-12-19 02:35:42 +03:00
if cc . dopts . balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
2017-12-12 23:45:05 +03:00
var isGRPCLB bool
2019-03-22 20:48:55 +03:00
for _ , a := range s . Addresses {
2017-12-12 23:45:05 +03:00
if a . Type == resolver . GRPCLB {
isGRPCLB = true
break
}
2017-11-29 00:16:53 +03:00
}
2017-12-12 23:45:05 +03:00
var newBalancerName string
2019-03-22 20:48:55 +03:00
// TODO: use new loadBalancerConfig field with appropriate priority.
2017-12-12 23:45:05 +03:00
if isGRPCLB {
newBalancerName = grpclbName
2019-04-03 20:50:28 +03:00
} else if cc . sc != nil && cc . sc . LB != nil {
2019-03-22 20:48:55 +03:00
newBalancerName = * cc . sc . LB
2017-12-12 23:45:05 +03:00
} else {
2019-03-22 20:48:55 +03:00
newBalancerName = PickFirstBalancerName
2017-12-12 23:45:05 +03:00
}
cc . switchBalancer ( newBalancerName )
2017-12-19 02:35:42 +03:00
} else if cc . balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc . balancerWrapper = newCCBalancerWrapper ( cc , cc . dopts . balancerBuilder , cc . balancerBuildOpts )
2017-10-19 21:32:06 +03:00
}
2017-12-19 02:35:42 +03:00
2019-03-22 20:48:55 +03:00
cc . balancerWrapper . updateResolverState ( s )
cc . firstResolveEvent . Fire ( )
return nil
2017-10-19 21:32:06 +03:00
}
2017-12-12 23:45:05 +03:00
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
2017-11-23 00:59:20 +03:00
//
// Caller must hold cc.mu.
2017-10-19 21:32:06 +03:00
func ( cc * ClientConn ) switchBalancer ( name string ) {
2017-12-12 23:45:05 +03:00
if strings . ToLower ( cc . curBalancerName ) == strings . ToLower ( name ) {
2017-10-19 21:32:06 +03:00
return
}
2017-12-12 23:45:05 +03:00
grpclog . Infof ( "ClientConn switching balancer to %q" , name )
if cc . dopts . balancerBuilder != nil {
2017-12-19 02:35:42 +03:00
grpclog . Infoln ( "ignoring balancer switching: Balancer DialOption used instead" )
2017-10-19 21:32:06 +03:00
return
}
2017-11-23 00:59:20 +03:00
if cc . balancerWrapper != nil {
cc . balancerWrapper . close ( )
}
2017-10-19 21:32:06 +03:00
builder := balancer . Get ( name )
2018-09-12 21:15:32 +03:00
if channelz . IsOn ( ) {
if builder == nil {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel switches to new LB policy %q due to fallback from invalid balancer name" , PickFirstBalancerName ) ,
Severity : channelz . CtWarning ,
} )
} else {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel switches to new LB policy %q" , name ) ,
Severity : channelz . CtINFO ,
} )
}
}
2017-10-19 21:32:06 +03:00
if builder == nil {
2017-12-12 23:45:05 +03:00
grpclog . Infof ( "failed to get balancer builder for: %v, using pick_first instead" , name )
2017-10-19 21:32:06 +03:00
builder = newPickfirstBuilder ( )
}
2018-09-12 21:15:32 +03:00
2017-10-19 21:32:06 +03:00
cc . curBalancerName = builder . Name ( )
cc . balancerWrapper = newCCBalancerWrapper ( cc , builder , cc . balancerBuildOpts )
}
// handleSubConnStateChange forwards a SubConn connectivity change to the
// current balancer wrapper while holding cc.mu. It does nothing once cc has
// been closed (cc.conns is nil).
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	cc.mu.Lock()
	if cc.conns != nil {
		// TODO(bar switching) send updates to all balancer wrappers when balancer
		// gracefully switching is supported.
		cc.balancerWrapper.handleSubConnStateChange(sc, s)
	}
	cc.mu.Unlock()
}
2017-08-31 20:59:09 +03:00
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
2017-11-23 00:59:20 +03:00
//
// Caller needs to make sure len(addrs) > 0.
2018-09-25 23:17:25 +03:00
func ( cc * ClientConn ) newAddrConn ( addrs [ ] resolver . Address , opts balancer . NewSubConnOptions ) ( * addrConn , error ) {
2017-08-31 20:59:09 +03:00
ac := & addrConn {
2018-12-18 00:10:13 +03:00
cc : cc ,
addrs : addrs ,
scopts : opts ,
dopts : cc . dopts ,
czData : new ( channelzData ) ,
resetBackoff : make ( chan struct { } ) ,
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
ac . ctx , ac . cancel = context . WithCancel ( cc . ctx )
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
return nil , ErrClientConnClosing
2017-08-21 22:27:04 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
ac . channelzID = channelz . RegisterSubChannel ( ac , cc . channelzID , "" )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel(id:%d) created" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
}
2017-08-31 20:59:09 +03:00
cc . conns [ ac ] = struct { } { }
cc . mu . Unlock ( )
return ac , nil
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func ( cc * ClientConn ) removeAddrConn ( ac * addrConn , err error ) {
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
2017-08-21 22:27:04 +03:00
return
}
2017-08-31 20:59:09 +03:00
delete ( cc . conns , ac )
cc . mu . Unlock ( )
ac . tearDown ( err )
2017-08-21 22:27:04 +03:00
}
2018-08-07 02:02:47 +03:00
func ( cc * ClientConn ) channelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : cc . GetState ( ) ,
2018-04-23 21:22:25 +03:00
Target : cc . target ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & cc . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & cc . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & cc . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & cc . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
2018-07-24 02:19:11 +03:00
// Target returns the target string of the ClientConn (the value held in
// cc.target, which is also what channelz reports as the channel's target).
//
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string { return cc.target }
2018-04-23 21:22:25 +03:00
func ( cc * ClientConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsStarted , 1 )
atomic . StoreInt64 ( & cc . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-10-09 01:58:54 +03:00
// connect starts creating a transport.
2017-10-02 19:22:57 +03:00
// It does nothing if the ac is not IDLE.
2017-08-31 20:59:09 +03:00
// TODO(bar) Move this to the addrConn section.
2017-10-24 00:06:33 +03:00
func ( ac * addrConn ) connect ( ) error {
2017-08-31 20:59:09 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return errConnClosing
2017-08-21 22:27:04 +03:00
}
2017-10-02 19:22:57 +03:00
if ac . state != connectivity . Idle {
ac . mu . Unlock ( )
return nil
2015-09-24 05:09:37 +03:00
}
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . Connecting )
2017-10-02 19:22:57 +03:00
ac . mu . Unlock ( )
2017-08-31 20:59:09 +03:00
2017-10-24 00:06:33 +03:00
// Start a goroutine connecting to the server asynchronously.
2018-12-18 00:10:13 +03:00
go ac . resetTransport ( )
2016-05-17 01:31:00 +03:00
return nil
2015-09-24 05:09:37 +03:00
}
2017-08-31 20:59:09 +03:00
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// It checks whether current connected address of ac is in the new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
func ( ac * addrConn ) tryUpdateAddrs ( addrs [ ] resolver . Address ) bool {
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
grpclog . Infof ( "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v" , ac . curAddr , addrs )
if ac . state == connectivity . Shutdown {
ac . addrs = addrs
return true
}
2018-12-18 00:10:13 +03:00
// Unless we're busy reconnecting already, let's reconnect from the top of
// the list.
if ac . state != connectivity . Ready {
return false
}
2017-08-31 20:59:09 +03:00
var curAddrFound bool
for _ , a := range addrs {
if reflect . DeepEqual ( ac . curAddr , a ) {
curAddrFound = true
break
}
}
grpclog . Infof ( "addrConn: tryUpdateAddrs curAddrFound: %v" , curAddrFound )
if curAddrFound {
ac . addrs = addrs
}
return curAddrFound
}
2017-05-20 02:02:02 +03:00
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
2018-02-21 21:14:52 +03:00
// the service, we return it.
2017-05-20 02:02:02 +03:00
// Otherwise, we return an empty MethodConfig.
2017-05-15 23:51:11 +03:00
func ( cc * ClientConn ) GetMethodConfig ( method string ) MethodConfig {
2017-05-20 02:02:02 +03:00
// TODO: Avoid the locking here.
2016-12-20 03:31:00 +03:00
cc . mu . RLock ( )
defer cc . mu . RUnlock ( )
2019-04-03 20:50:28 +03:00
if cc . sc == nil {
return MethodConfig { }
}
2017-05-15 23:51:11 +03:00
m , ok := cc . sc . Methods [ method ]
2017-04-04 01:03:24 +03:00
if ! ok {
i := strings . LastIndex ( method , "/" )
2018-04-15 14:07:56 +03:00
m = cc . sc . Methods [ method [ : i + 1 ] ]
2017-04-04 01:03:24 +03:00
}
2017-05-15 23:51:11 +03:00
return m
2016-12-20 03:31:00 +03:00
}
2018-11-01 20:49:35 +03:00
func ( cc * ClientConn ) healthCheckConfig ( ) * healthCheckConfig {
cc . mu . RLock ( )
defer cc . mu . RUnlock ( )
2019-04-03 20:50:28 +03:00
if cc . sc == nil {
return nil
}
2018-11-01 20:49:35 +03:00
return cc . sc . healthCheckConfig
}
2018-07-11 20:18:09 +03:00
func ( cc * ClientConn ) getTransport ( ctx context . Context , failfast bool , method string ) ( transport . ClientTransport , func ( balancer . DoneInfo ) , error ) {
t , done , err := cc . blockingpicker . pick ( ctx , failfast , balancer . PickOptions {
FullMethodName : method ,
} )
2016-05-07 01:47:09 +03:00
if err != nil {
2017-10-02 19:22:57 +03:00
return nil , nil , toRPCErr ( err )
2015-09-24 05:09:37 +03:00
}
2017-10-02 19:22:57 +03:00
return t , done , nil
2015-09-24 05:09:37 +03:00
}
2019-04-03 20:50:28 +03:00
// applyServiceConfig installs sc as cc's service config and rebuilds the retry
// throttler from sc.retryThrottling. When throttling is not configured, a nil
// *retryThrottler is stored.
// NOTE(review): cc.sc is read under cc.mu elsewhere; presumably callers hold
// cc.mu here — confirm at call sites.
func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
	if sc == nil {
		// should never reach here.
		return errors.New("got nil pointer for service config")
	}
	cc.sc = sc

	var throttler *retryThrottler // nil means "no throttling"
	if rtp := sc.retryThrottling; rtp != nil {
		throttler = &retryThrottler{
			tokens: rtp.MaxTokens,
			max:    rtp.MaxTokens,
			thresh: rtp.MaxTokens / 2,
			ratio:  rtp.TokenRatio,
		}
	}
	// Always store a *retryThrottler (possibly nil) so the atomic.Value keeps
	// a single concrete type.
	cc.retryThrottler.Store(throttler)
	return nil
}
2017-11-29 00:16:53 +03:00
func ( cc * ClientConn ) resolveNow ( o resolver . ResolveNowOption ) {
2018-06-12 22:50:43 +03:00
cc . mu . RLock ( )
2017-11-29 00:16:53 +03:00
r := cc . resolverWrapper
2018-06-12 22:50:43 +03:00
cc . mu . RUnlock ( )
2017-11-29 00:16:53 +03:00
if r == nil {
return
}
go r . resolveNow ( o )
}
2018-08-27 23:21:48 +03:00
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	// Snapshot the addrConns under cc.mu and release it before touching any
	// addrConn. addrConn code takes ac.mu first and cc.mu second (tryAllAddrs
	// RLocks cc.mu under ac.mu; updateConnectivityState, called with ac.mu
	// held, reports to cc via handleSubConnStateChange, which locks cc.mu).
	// Calling ac.resetConnectBackoff (which locks ac.mu) while holding cc.mu
	// would invert that order and can deadlock. Copying into a slice (rather
	// than keeping the map) avoids iterating cc.conns without the lock.
	cc.mu.Lock()
	conns := make([]*addrConn, 0, len(cc.conns))
	for ac := range cc.conns {
		conns = append(conns, ac)
	}
	cc.mu.Unlock()

	for _, ac := range conns {
		ac.resetConnectBackoff()
	}
}
2016-05-18 03:18:54 +03:00
// Close tears down the ClientConn and all underlying connections.
//
// While holding cc.mu it marks cc as closed (cc.conns = nil), moves the
// channel to SHUTDOWN and detaches the resolver and balancer wrappers. After
// releasing the lock it closes the picker, both wrappers and every addrConn
// that was tracked, then removes the channel's channelz entry.
// A second call returns ErrClientConnClosing.
2016-05-07 01:47:09 +03:00
func ( cc * ClientConn ) Close ( ) error {
2018-03-03 00:39:55 +03:00
// Deferred so cc.cancel runs on every return path, including the
// already-closed one. cc.ctx is the parent of every addrConn's ctx.
defer cc . cancel ( )
2016-07-29 20:16:56 +03:00
2015-08-01 00:16:02 +03:00
cc . mu . Lock ( )
2016-05-11 05:29:44 +03:00
if cc . conns == nil {
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
return ErrClientConnClosing
}
2016-05-11 05:29:44 +03:00
// A nil conns map is the "closed" marker that newAddrConn and
// removeAddrConn check before touching cc.conns.
conns := cc . conns
cc . conns = nil
2017-08-09 20:31:12 +03:00
cc . csMgr . updateState ( connectivity . Shutdown )
2017-10-19 21:32:06 +03:00
// Detach the wrappers under the lock; they are closed below, after cc.mu
// has been released.
rWrapper := cc . resolverWrapper
cc . resolverWrapper = nil
bWrapper := cc . balancerWrapper
cc . balancerWrapper = nil
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
2018-04-23 21:22:25 +03:00
2017-10-02 19:22:57 +03:00
cc . blockingpicker . close ( )
2018-04-23 21:22:25 +03:00
2017-10-19 21:32:06 +03:00
if rWrapper != nil {
rWrapper . close ( )
2017-10-02 19:22:57 +03:00
}
2017-10-19 21:32:06 +03:00
if bWrapper != nil {
bWrapper . close ( )
2016-07-19 01:58:41 +03:00
}
2018-04-23 21:22:25 +03:00
2017-08-31 20:59:09 +03:00
// Tear down every addrConn that was tracked when cc was marked closed.
for ac := range conns {
2016-05-11 05:29:44 +03:00
ac . tearDown ( ErrClientConnClosing )
2016-05-07 01:47:09 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
ted := & channelz . TraceEventDesc {
Desc : "Channel Deleted" ,
Severity : channelz . CtINFO ,
}
// Nested channels also record the deletion on their parent's trace.
if cc . dopts . channelzParentID != 0 {
ted . Parent = & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested channel(id:%d) deleted" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
}
}
channelz . AddTraceEvent ( cc . channelzID , ted )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
2019-04-03 01:42:35 +03:00
// the entity being deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( cc . channelzID )
}
2016-05-07 01:47:09 +03:00
return nil
}
// addrConn is a network connection to a given address.
// It is created by newAddrConn, driven by resetTransport and torn down by
// tearDown.
type addrConn struct {
2016-07-29 20:16:56 +03:00
// ctx is derived from cc.ctx in newAddrConn; cancel is called by tearDown.
ctx context . Context
cancel context . CancelFunc
2017-12-01 20:55:42 +03:00
cc * ClientConn // Owning ClientConn.
dopts dialOptions // Copy of cc.dopts taken at creation.
acbw balancer . SubConn // Reported to cc on every connectivity state change.
2018-09-25 23:17:25 +03:00
scopts balancer . NewSubConnOptions // Options this subchannel was created with.
2016-05-07 01:47:09 +03:00
2018-11-01 20:49:35 +03:00
// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
// health checking may require server to report healthy to set ac to READY), and is reset
// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
// is received, transport is closed, ac has been torn down).
2018-09-21 01:45:40 +03:00
transport transport . ClientTransport // The current transport.
// mu guards transport, curAddr, addrs, state, backoffIdx and resetBackoff;
// this file accesses all of them with mu held.
mu sync . Mutex
curAddr resolver . Address // The current address.
addrs [ ] resolver . Address // All addresses that the resolver resolved to.
2018-09-12 21:15:32 +03:00
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity . State
2016-07-16 02:20:34 +03:00
2018-12-18 00:10:13 +03:00
backoffIdx int // Needs to be stateful for resetConnectBackoff.
2018-08-27 23:21:48 +03:00
// resetBackoff is closed (and replaced) by resetConnectBackoff to wake a
// resetTransport goroutine that is waiting out its backoff.
resetBackoff chan struct { }
2019-01-17 21:14:45 +03:00
channelzID int64 // channelz unique identification number.
czData * channelzData // Call counters, updated with sync/atomic.
2016-05-07 01:47:09 +03:00
}
2018-09-21 01:45:40 +03:00
// Note: this requires a lock on ac.mu.
2018-09-12 21:15:32 +03:00
func ( ac * addrConn ) updateConnectivityState ( s connectivity . State ) {
2018-12-18 00:10:13 +03:00
if ac . state == s {
return
}
updateMsg := fmt . Sprintf ( "Subchannel Connectivity change to %v" , s )
2018-09-12 21:15:32 +03:00
ac . state = s
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
2018-12-18 00:10:13 +03:00
Desc : updateMsg ,
2018-09-12 21:15:32 +03:00
Severity : channelz . CtINFO ,
} )
}
2018-12-18 00:10:13 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , s )
2018-09-12 21:15:32 +03:00
}
2017-04-11 00:33:51 +03:00
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func ( ac * addrConn ) adjustParams ( r transport . GoAwayReason ) {
switch r {
2017-11-07 00:45:11 +03:00
case transport . GoAwayTooManyPings :
2017-04-11 00:33:51 +03:00
v := 2 * ac . dopts . copts . KeepaliveParams . Time
ac . cc . mu . Lock ( )
if v > ac . cc . mkp . Time {
ac . cc . mkp . Time = v
}
ac . cc . mu . Unlock ( )
}
}
2018-12-18 00:10:13 +03:00
// resetTransport is ac's connect/reconnect loop; connect starts it on its own
// goroutine. Each iteration:
//   - after the first, asks the resolver to re-resolve;
//   - computes this round's backoff and a connect deadline of
//     now + max(minConnectTimeout, backoff);
//   - tries every address via tryAllAddrs.
// On failure, ac goes to TRANSIENT_FAILURE and waits out the backoff. The wait
// is cut short by resetConnectBackoff or by ac.ctx being canceled.
// On success, the transport is installed and ac goes READY (unless LB health
// checking owns the state). The loop then blocks until the transport
// disconnects, enters TRANSIENT_FAILURE and starts over. It returns once ac is
// SHUTDOWN.
func ( ac * addrConn ) resetTransport ( ) {
for i := 0 ; ; i ++ {
// Every attempt after the first asks the resolver for fresh addresses.
if i > 0 {
2018-09-21 01:45:40 +03:00
ac . cc . resolveNow ( resolver . ResolveNowOption { } )
}
2019-03-14 23:14:19 +03:00
ac . mu . Lock ( )
2019-03-19 20:28:26 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
2018-12-18 00:10:13 +03:00
// Snapshot the address list and this round's backoff while holding ac.mu.
addrs := ac . addrs
backoffFor := ac . dopts . bs . Backoff ( ac . backoffIdx )
2019-02-12 03:12:42 +03:00
// This will be the duration that dial gets to finish.
2019-03-20 22:58:29 +03:00
dialDuration := minConnectTimeout
if ac . dopts . minConnectTimeout != nil {
dialDuration = ac . dopts . minConnectTimeout ( )
}
2019-02-12 03:12:42 +03:00
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
2019-03-14 23:14:19 +03:00
// We can potentially spend all the time trying the first address, and
// if the server accepts the connection and then hangs, the following
// addresses will never be tried.
//
// The spec doesn't mention what should be done for multiple addresses.
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
2019-02-12 03:12:42 +03:00
connectDeadline := time . Now ( ) . Add ( dialDuration )
2018-12-18 00:10:13 +03:00
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
2019-03-19 20:28:26 +03:00
newTr , addr , reconnect , err := ac . tryAllAddrs ( addrs , connectDeadline )
if err != nil {
// After exhausting all addresses, the addrConn enters
// TRANSIENT_FAILURE.
2018-12-18 00:10:13 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
2019-03-19 20:28:26 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2018-09-21 01:45:40 +03:00
2019-03-19 20:28:26 +03:00
// Backoff.
b := ac . resetBackoff
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
// Wait out the backoff. If resetConnectBackoff closes b, retry right
// away (it already reset backoffIdx). backoffIdx is only advanced when
// the full backoff elapsed. Cancellation of ac.ctx ends the loop.
timer := time . NewTimer ( backoffFor )
select {
case <- timer . C :
ac . mu . Lock ( )
ac . backoffIdx ++
2019-03-14 23:14:19 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
case <- b :
timer . Stop ( )
case <- ac . ctx . Done ( ) :
timer . Stop ( )
2019-03-14 23:14:19 +03:00
return
}
2019-03-19 20:28:26 +03:00
continue
}
2018-09-21 01:45:40 +03:00
2019-03-19 20:28:26 +03:00
ac . mu . Lock ( )
// ac may have been torn down while we were connecting; discard the new
// transport in that case.
if ac . state == connectivity . Shutdown {
newTr . Close ( )
ac . mu . Unlock ( )
return
}
ac . curAddr = addr
ac . transport = newTr
ac . backoffIdx = 0
healthCheckConfig := ac . cc . healthCheckConfig ( )
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
hctx , hcancel := context . WithCancel ( ac . ctx )
// hctx bounds the health-check goroutine to this transport; hcancel is
// called below once the transport disconnects.
healthcheckManagingState := false
if ! ac . cc . dopts . disableHealthCheck && healthCheckConfig != nil && ac . scopts . HealthCheckEnabled {
if ac . cc . dopts . healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog . Error ( "the client side LB channel health check function has not been set." )
} else {
// TODO(deklerk) refactor to just return transport
go ac . startHealthCheck ( hctx , newTr , addr , healthCheckConfig . ServiceName )
healthcheckManagingState = true
2018-12-18 00:10:13 +03:00
}
}
2019-03-19 20:28:26 +03:00
// Without health checking, a connected transport means READY. Otherwise
// startHealthCheck's reports drive the connectivity state.
if ! healthcheckManagingState {
ac . updateConnectivityState ( connectivity . Ready )
}
ac . mu . Unlock ( )
// Block until the created transport is down. And when this happens,
// we restart from the top of the addr list.
<- reconnect . Done ( )
hcancel ( )
2018-12-18 00:10:13 +03:00
2019-03-19 20:28:26 +03:00
// Need to reconnect after a READY, the addrConn enters
// TRANSIENT_FAILURE.
//
// This will set addrConn to TRANSIENT_FAILURE for a very short period
// of time, and turns CONNECTING. It seems reasonable to skip this, but
// READY-CONNECTING is not a valid transition.
2018-12-18 00:10:13 +03:00
ac . mu . Lock ( )
2017-08-31 20:59:09 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
return
2017-08-31 20:59:09 +03:00
}
2018-12-18 00:10:13 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2017-08-21 22:27:04 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
}
}
2017-12-01 20:55:42 +03:00
2019-03-19 20:28:26 +03:00
// tryAllAddrs tries to create a connection to each address in order, and stops
// at the first successful one. In the successful case it returns the
// transport, the address and an Event; the Event fires when the returned
// transport disconnects.
//
// Before each attempt it does the following while holding ac.mu:
//   - moves ac to CONNECTING;
//   - clears ac.transport;
//   - refreshes the keepalive parameters from cc.mkp.
// Each failed attempt's error is passed to the picker via
// updateConnectionError. It returns errConnClosing if ac is SHUTDOWN, or a
// generic error once every address has failed.
func ( ac * addrConn ) tryAllAddrs ( addrs [ ] resolver . Address , connectDeadline time . Time ) ( transport . ClientTransport , resolver . Address , * grpcsync . Event , error ) {
for _ , addr := range addrs {
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
2018-12-18 00:10:13 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
return nil , resolver . Address { } , nil , errConnClosing
}
ac . updateConnectivityState ( connectivity . Connecting )
ac . transport = nil
// Pick up keepalive adjustments made by adjustParams (cc.mkp).
ac . cc . mu . RLock ( )
ac . dopts . copts . KeepaliveParams = ac . cc . mkp
ac . cc . mu . RUnlock ( )
// copts is a value copy, so the per-subchannel CredsBundle override
// below does not modify ac.dopts.
copts := ac . dopts . copts
if ac . scopts . CredsBundle != nil {
copts . CredsBundle = ac . scopts . CredsBundle
}
ac . mu . Unlock ( )
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel picks a new address %q to connect" , addr . Addr ) ,
Severity : channelz . CtINFO ,
} )
2018-09-21 01:45:40 +03:00
}
2019-03-19 20:28:26 +03:00
newTr , reconnect , err := ac . createTransport ( addr , copts , connectDeadline )
if err == nil {
return newTr , addr , reconnect , nil
}
ac . cc . blockingpicker . updateConnectionError ( err )
2018-09-21 01:45:40 +03:00
}
2019-03-19 20:28:26 +03:00
// Couldn't connect to any address.
return nil , resolver . Address { } , nil , fmt . Errorf ( "couldn't connect to any address" )
2018-09-21 01:45:40 +03:00
}
2019-03-19 20:28:26 +03:00
// createTransport creates a connection to addr. It returns the transport and a
// Event in the successful case. The Event fires when the returned transport
// disconnects.
func ( ac * addrConn ) createTransport ( addr resolver . Address , copts transport . ConnectOptions , connectDeadline time . Time ) ( transport . ClientTransport , * grpcsync . Event , error ) {
2019-03-14 23:14:19 +03:00
prefaceReceived := make ( chan struct { } )
2018-09-21 01:45:40 +03:00
onCloseCalled := make ( chan struct { } )
2019-03-19 20:28:26 +03:00
reconnect := grpcsync . NewEvent ( )
2018-09-21 01:45:40 +03:00
2018-12-18 00:10:13 +03:00
target := transport . TargetInfo {
Addr : addr . Addr ,
Metadata : addr . Metadata ,
Authority : ac . cc . authority ,
}
2018-10-19 00:31:34 +03:00
2018-09-21 01:45:40 +03:00
onGoAway := func ( r transport . GoAwayReason ) {
ac . mu . Lock ( )
ac . adjustParams ( r )
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
reconnect . Fire ( )
2018-09-21 01:45:40 +03:00
}
onClose := func ( ) {
close ( onCloseCalled )
2018-12-18 00:10:13 +03:00
reconnect . Fire ( )
2018-09-21 01:45:40 +03:00
}
onPrefaceReceipt := func ( ) {
close ( prefaceReceived )
}
connectCtx , cancel := context . WithDeadline ( ac . ctx , connectDeadline )
2018-11-13 00:30:41 +03:00
defer cancel ( )
2018-09-21 01:45:40 +03:00
if channelz . IsOn ( ) {
copts . ChannelzParentID = ac . channelzID
}
newTr , err := transport . NewClientTransport ( connectCtx , ac . cc . ctx , target , copts , onPrefaceReceipt , onGoAway , onClose )
if err != nil {
// newTr is either nil, or closed.
grpclog . Warningf ( "grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting..." , addr , err )
2019-03-19 20:28:26 +03:00
return nil , nil , err
2017-12-01 20:55:42 +03:00
}
2018-09-21 01:45:40 +03:00
2019-02-27 21:04:46 +03:00
if ac . dopts . reqHandshake == envconfig . RequireHandshakeOn {
select {
case <- time . After ( connectDeadline . Sub ( time . Now ( ) ) ) :
// We didn't get the preface in time.
newTr . Close ( )
grpclog . Warningf ( "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting..." , addr )
2019-03-19 20:28:26 +03:00
return nil , nil , errors . New ( "timed out waiting for server handshake" )
2019-02-27 21:04:46 +03:00
case <- prefaceReceived :
// We got the preface - huzzah! things are good.
case <- onCloseCalled :
// The transport has already closed - noop.
2019-03-19 20:28:26 +03:00
return nil , nil , errors . New ( "connection closed" )
2019-02-27 21:04:46 +03:00
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
}
2019-03-19 20:28:26 +03:00
return newTr , reconnect , nil
2018-09-21 01:45:40 +03:00
}
2018-11-01 20:49:35 +03:00
func ( ac * addrConn ) startHealthCheck ( ctx context . Context , newTr transport . ClientTransport , addr resolver . Address , serviceName string ) {
// Set up the health check helper functions
newStream := func ( ) ( interface { } , error ) {
return ac . newClientStream ( ctx , & StreamDesc { ServerStreams : true } , "/grpc.health.v1.Health/Watch" , newTr )
}
firstReady := true
reportHealth := func ( ok bool ) {
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
if ac . transport != newTr {
return
}
if ok {
if firstReady {
firstReady = false
ac . curAddr = addr
}
2018-12-18 00:10:13 +03:00
ac . updateConnectivityState ( connectivity . Ready )
2018-11-01 20:49:35 +03:00
} else {
2018-12-18 00:10:13 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2018-11-01 20:49:35 +03:00
}
}
2018-12-14 03:44:36 +03:00
err := ac . cc . dopts . healthCheckFunc ( ctx , newStream , reportHealth , serviceName )
2018-11-01 20:49:35 +03:00
if err != nil {
if status . Code ( err ) == codes . Unimplemented {
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel health check is unimplemented at server side, thus health check is disabled" ,
Severity : channelz . CtError ,
} )
}
grpclog . Error ( "Subchannel health check is unimplemented at server side, thus health check is disabled" )
} else {
grpclog . Errorf ( "HealthCheckFunc exits with unexpected error %v" , err )
}
}
}
2018-08-27 23:21:48 +03:00
func ( ac * addrConn ) resetConnectBackoff ( ) {
ac . mu . Lock ( )
close ( ac . resetBackoff )
2018-09-21 01:45:40 +03:00
ac . backoffIdx = 0
2018-08-27 23:21:48 +03:00
ac . resetBackoff = make ( chan struct { } )
ac . mu . Unlock ( )
}
2017-10-02 19:22:57 +03:00
// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
func ( ac * addrConn ) getReadyTransport ( ) ( transport . ClientTransport , bool ) {
ac . mu . Lock ( )
2018-09-21 01:45:40 +03:00
if ac . state == connectivity . Ready && ac . transport != nil {
2017-10-02 19:22:57 +03:00
t := ac . transport
ac . mu . Unlock ( )
return t , true
}
var idle bool
if ac . state == connectivity . Idle {
idle = true
}
ac . mu . Unlock ( )
// Trigger idle ac to connect.
if idle {
2017-10-24 00:06:33 +03:00
ac . connect ( )
2017-10-02 19:22:57 +03:00
}
return nil , false
}
2016-05-17 01:31:00 +03:00
// tearDown starts to tear down the addrConn.
2015-02-06 04:14:05 +03:00
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
2016-05-07 01:47:09 +03:00
// some edge cases (e.g., the caller opens and closes many addrConn's in a
2015-02-06 04:14:05 +03:00
// tight loop.
2016-07-29 22:57:38 +03:00
// tearDown doesn't remove ac from ac.cc.conns.
2016-05-07 01:47:09 +03:00
func ( ac * addrConn ) tearDown ( err error ) {
ac . mu . Lock ( )
2017-12-15 23:03:41 +03:00
if ac . state == connectivity . Shutdown {
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2017-12-15 23:03:41 +03:00
return
}
2018-11-01 20:49:35 +03:00
curTr := ac . transport
ac . transport = nil
2018-09-21 20:38:30 +03:00
// We have to set the state to Shutdown before anything else to prevent races
// between setting the state and logic that waits on context cancelation / etc.
2018-09-21 01:45:40 +03:00
ac . updateConnectivityState ( connectivity . Shutdown )
2018-09-21 20:38:30 +03:00
ac . cancel ( )
2017-10-20 01:16:16 +03:00
ac . curAddr = resolver . Address { }
2018-11-01 20:49:35 +03:00
if err == errConnDrain && curTr != nil {
2016-07-26 02:35:32 +03:00
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
2018-09-21 01:45:40 +03:00
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac . mu . Unlock ( )
2018-11-01 20:49:35 +03:00
curTr . GracefulClose ( )
2018-09-21 01:45:40 +03:00
ac . mu . Lock ( )
2016-07-26 02:35:32 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel Deleted" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchanel(id:%d) deleted" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity beng deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( ac . channelzID )
}
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2015-02-06 04:14:05 +03:00
}
2017-08-31 20:59:09 +03:00
// getState returns ac's current connectivity state, read under ac.mu.
func (ac *addrConn) getState() connectivity.State {
	ac.mu.Lock()
	s := ac.state
	ac.mu.Unlock()
	return s
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
2018-04-09 21:13:06 +03:00
func ( ac * addrConn ) ChannelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
ac . mu . Lock ( )
addr := ac . curAddr . Addr
ac . mu . Unlock ( )
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : ac . getState ( ) ,
2018-04-23 21:22:25 +03:00
Target : addr ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & ac . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & ac . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & ac . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & ac . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
func ( ac * addrConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsStarted , 1 )
atomic . StoreInt64 ( & ac . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-06-28 02:18:41 +03:00
// retryThrottler implements retry throttling as a token bucket.
// Each throttle call spends one token, with the count floored at 0.
// Each successfulRPC call refunds ratio tokens, with the count capped at max.
// Retries are disallowed while tokens <= thresh.
// A nil *retryThrottler never throttles.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC refunds ratio tokens to the pool, never exceeding max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	if refilled := rt.tokens + rt.ratio; refilled > rt.max {
		rt.tokens = rt.max
	} else {
		rt.tokens = refilled
	}
}
2018-08-07 02:02:47 +03:00
// channelzChannel exposes a *ClientConn's metrics through a ChannelzMetric
// method.
type channelzChannel struct{ cc *ClientConn }

// ChannelzMetric delegates to the wrapped ClientConn's channelzMetric.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { return c.cc.channelzMetric() }
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")