2015-02-06 04:14:05 +03:00
/ *
*
2017-06-08 15:42:19 +03:00
* Copyright 2014 gRPC authors .
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* http : //www.apache.org/licenses/LICENSE-2.0
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
2015-02-06 04:14:05 +03:00
*
* /
2015-02-09 03:35:38 +03:00
package grpc
2015-02-06 04:14:05 +03:00
import (
2018-11-02 01:43:42 +03:00
"context"
2015-02-19 14:57:41 +03:00
"errors"
2017-10-02 19:22:57 +03:00
"fmt"
2017-08-14 22:24:23 +03:00
"math"
2015-04-17 23:50:18 +03:00
"net"
2017-08-31 20:59:09 +03:00
"reflect"
2017-04-04 02:03:05 +03:00
"strings"
2015-02-06 04:14:05 +03:00
"sync"
2018-06-28 02:18:41 +03:00
"sync/atomic"
2015-02-06 04:14:05 +03:00
"time"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/balancer"
2019-10-04 22:59:43 +03:00
"google.golang.org/grpc/balancer/base"
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/codes"
2017-08-09 20:31:12 +03:00
"google.golang.org/grpc/connectivity"
2015-02-09 03:39:06 +03:00
"google.golang.org/grpc/credentials"
2018-06-14 02:07:37 +03:00
"google.golang.org/grpc/internal/backoff"
2018-06-19 03:59:08 +03:00
"google.golang.org/grpc/internal/channelz"
2018-11-10 00:53:47 +03:00
"google.golang.org/grpc/internal/grpcsync"
2020-02-12 04:51:29 +03:00
"google.golang.org/grpc/internal/grpcutil"
2018-07-11 21:22:45 +03:00
"google.golang.org/grpc/internal/transport"
2017-02-28 22:49:51 +03:00
"google.golang.org/grpc/keepalive"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/resolver"
2019-05-30 19:12:58 +03:00
"google.golang.org/grpc/serviceconfig"
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/status"
2019-10-22 23:01:54 +03:00
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
2015-02-06 04:14:05 +03:00
)
2018-03-07 21:36:17 +03:00
const (
// minConnectTimeout is the minimum time to give a connection to complete.
minConnectTimeout = 20 * time . Second
2018-06-05 19:54:12 +03:00
// grpclbName is the name of the grpclb balancer. It must match grpclbName
// in grpclb/grpclb.go.
grpclbName = "grpclb"
2018-03-07 21:36:17 +03:00
)
2015-02-19 14:57:41 +03:00
var (
2016-05-25 21:28:45 +03:00
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
//
// Deprecated: this error should not be relied upon by users; use the status
// code of Canceled instead.
ErrClientConnClosing = status . Error ( codes . Canceled , "grpc: the client connection is closing" )
2017-11-07 00:45:11 +03:00
// errConnDrain indicates that the connection has started draining and does
// not accept any new RPCs.
errConnDrain = errors . New ( "grpc: the connection is drained" )
// errConnClosing indicates that the connection is closing.
errConnClosing = errors . New ( "grpc: the connection is closing" )
2020-01-14 00:12:55 +03:00
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors . New ( "grpc: balancer is closed" )
2019-04-03 20:50:28 +03:00
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config. Unlike its neighbours it is a plain string, not an error
// value; DialContext formats it together with the parse error.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
2017-11-07 00:45:11 +03:00
)
2016-05-25 21:28:45 +03:00
2017-11-07 00:45:11 +03:00
// The following errors are returned from Dial and DialContext when the
// combination of security-related dial options is invalid.
var (
2016-05-25 21:28:45 +03:00
// errNoTransportSecurity indicates that no transport security has been
2016-03-17 02:40:16 +03:00
// set for the ClientConn. Users should either set one or explicitly
2015-08-28 03:21:52 +03:00
// call the WithInsecure DialOption to disable security.
2016-05-25 21:28:45 +03:00
errNoTransportSecurity = errors . New ( "grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)" )
2018-09-25 23:17:25 +03:00
// errTransportCredsAndBundle indicates that a creds bundle is used together
// with other individual Transport Credentials.
errTransportCredsAndBundle = errors . New ( "grpc: credentials.Bundle may not be used with individual TransportCredentials" )
2016-06-08 23:44:43 +03:00
// errTransportCredentialsMissing indicates that users want to transmit security
2019-01-23 20:59:48 +03:00
// information (e.g., OAuth2 token) which requires a secure connection over an insecure
2015-08-28 23:19:36 +03:00
// connection.
2016-06-08 23:44:43 +03:00
errTransportCredentialsMissing = errors . New ( "grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)" )
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors . New ( "grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)" )
2015-02-19 14:57:41 +03:00
)
2017-04-07 00:08:04 +03:00
const (
// defaultClientMaxReceiveMessageSize is 4 MiB (1024 * 1024 * 4 bytes).
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
2017-08-14 22:24:23 +03:00
// defaultClientMaxSendMessageSize is math.MaxInt32.
defaultClientMaxSendMessageSize = math . MaxInt32
2018-06-27 21:16:33 +03:00
// defaultWriteBufSize and defaultReadBufSize are the default write and
// read buffer sizes, 32 KiB each.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
2017-04-07 00:08:04 +03:00
)
2017-03-10 03:58:23 +03:00
2016-08-15 20:17:09 +03:00
// Dial creates a client connection to the given target.
//
// It is a thin wrapper that calls DialContext with context.Background(), so
// the dial is not bound to any caller-supplied cancellation or deadline.
2015-02-06 04:14:05 +03:00
func Dial ( target string , opts ... DialOption ) ( * ClientConn , error ) {
2016-08-15 20:17:09 +03:00
return DialContext ( context . Background ( ) , target , opts ... )
}
2018-04-13 01:11:22 +03:00
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
2018-01-19 00:10:52 +03:00
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
2016-08-24 22:56:16 +03:00
func DialContext ( ctx context . Context , target string , opts ... DialOption ) ( conn * ClientConn , err error ) {
2015-09-29 20:24:03 +03:00
// Start from the default dial options; the caller's options are applied on
// top of them below.
cc := & ClientConn {
2018-11-10 00:53:47 +03:00
target : target ,
csMgr : & connectivityStateManager { } ,
conns : make ( map [ * addrConn ] struct { } ) ,
dopts : defaultDialOptions ( ) ,
blockingpicker : newPickerWrapper ( ) ,
czData : new ( channelzData ) ,
firstResolveEvent : grpcsync . NewEvent ( ) ,
2015-09-29 20:24:03 +03:00
}
2018-06-28 02:18:41 +03:00
// Seed the atomic.Value with a typed nil *retryThrottler.
cc . retryThrottler . Store ( ( * retryThrottler ) ( nil ) )
2016-08-24 04:18:18 +03:00
// cc.ctx is derived from context.Background(), not from the caller's ctx,
// so cancelling ctx does not cancel cc.ctx.
cc . ctx , cc . cancel = context . WithCancel ( context . Background ( ) )
2017-04-04 01:03:24 +03:00
2016-12-20 03:31:00 +03:00
for _ , opt := range opts {
2018-07-20 03:33:42 +03:00
opt . apply ( & cc . dopts )
2016-12-20 03:31:00 +03:00
}
2017-10-02 19:22:57 +03:00
2019-04-15 21:13:34 +03:00
// Collapse the configured interceptors into cc.dopts.unaryInt and
// cc.dopts.streamInt.
chainUnaryClientInterceptors ( cc )
chainStreamClientInterceptors ( cc )
2019-04-03 01:42:35 +03:00
// Close cc on any error return. err is the named result and deferred
// functions run last-in-first-out, so this runs after the ctx.Done() defer
// registered further down and also sees an err set by it.
defer func ( ) {
if err != nil {
cc . Close ( )
}
} ( )
2018-04-09 21:13:06 +03:00
// Register the channel with channelz, as a nested channel when a parent ID
// was supplied through the dial options.
if channelz . IsOn ( ) {
if cc . dopts . channelzParentID != 0 {
2018-08-07 02:02:47 +03:00
cc . channelzID = channelz . RegisterChannel ( & channelzChannel { cc } , cc . dopts . channelzParentID , target )
2020-02-14 21:11:26 +03:00
channelz . AddTraceEvent ( cc . channelzID , 0 , & channelz . TraceEventDesc {
2018-09-12 21:15:32 +03:00
Desc : "Channel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested Channel(id:%d) created" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
} else {
2018-08-07 02:02:47 +03:00
cc . channelzID = channelz . RegisterChannel ( & channelzChannel { cc } , 0 , target )
2020-02-14 21:11:26 +03:00
channelz . Info ( cc . channelzID , "Channel Created" )
2018-04-09 21:13:06 +03:00
}
2018-09-12 21:15:32 +03:00
cc . csMgr . channelzID = cc . channelzID
2018-04-09 21:13:06 +03:00
}
2017-10-02 19:22:57 +03:00
// Validate the security-related options. A secure dial needs exactly one of
// TransportCredentials / CredsBundle; an insecure dial must have neither,
// and none of its per-RPC credentials may require transport security.
if ! cc . dopts . insecure {
2018-09-25 23:17:25 +03:00
if cc . dopts . copts . TransportCredentials == nil && cc . dopts . copts . CredsBundle == nil {
2017-10-02 19:22:57 +03:00
return nil , errNoTransportSecurity
}
2018-09-25 23:17:25 +03:00
if cc . dopts . copts . TransportCredentials != nil && cc . dopts . copts . CredsBundle != nil {
return nil , errTransportCredsAndBundle
}
2017-10-02 19:22:57 +03:00
} else {
2018-09-25 23:17:25 +03:00
if cc . dopts . copts . TransportCredentials != nil || cc . dopts . copts . CredsBundle != nil {
2017-10-02 19:22:57 +03:00
return nil , errCredentialsConflict
}
for _ , cd := range cc . dopts . copts . PerRPCCredentials {
if cd . RequireTransportSecurity ( ) {
return nil , errTransportCredentialsMissing
}
}
}
2019-04-03 20:50:28 +03:00
// Parse the default service config, if one was supplied as raw JSON, and
// fail the dial when it does not parse.
if cc . dopts . defaultServiceConfigRawJSON != nil {
2019-10-04 22:59:43 +03:00
scpr := parseServiceConfig ( * cc . dopts . defaultServiceConfigRawJSON )
if scpr . Err != nil {
return nil , fmt . Errorf ( "%s: %v" , invalidDefaultServiceConfigErrPrefix , scpr . Err )
2019-04-03 20:50:28 +03:00
}
2019-10-04 22:59:43 +03:00
// A failed type assertion leaves defaultServiceConfig nil.
cc . dopts . defaultServiceConfig , _ = scpr . Config . ( * ServiceConfig )
2019-04-03 20:50:28 +03:00
}
2017-04-11 00:33:51 +03:00
cc . mkp = cc . dopts . copts . KeepaliveParams
2017-03-24 21:29:02 +03:00
2017-04-18 02:08:50 +03:00
// Install the default dialer only when the caller did not provide one. The
// proxy wrapper is applied to the default dialer only.
if cc . dopts . copts . Dialer == nil {
2020-03-27 01:29:40 +03:00
cc . dopts . copts . Dialer = func ( ctx context . Context , addr string ) ( net . Conn , error ) {
network , addr := parseDialTarget ( addr )
return ( & net . Dialer { } ) . DialContext ( ctx , network , addr )
}
if cc . dopts . withProxy {
cc . dopts . copts . Dialer = newProxyDialer ( cc . dopts . copts . Dialer )
}
2017-04-18 02:08:50 +03:00
}
2017-03-24 21:29:02 +03:00
// The gRPC user agent is always present; a caller-supplied user agent is
// placed in front of it.
if cc . dopts . copts . UserAgent != "" {
cc . dopts . copts . UserAgent += " " + grpcUA
} else {
cc . dopts . copts . UserAgent = grpcUA
}
2016-12-20 03:31:00 +03:00
// A dial timeout option narrows ctx for the remainder of this function.
if cc . dopts . timeout > 0 {
var cancel context . CancelFunc
ctx , cancel = context . WithTimeout ( ctx , cc . dopts . timeout )
defer cancel ( )
}
2019-04-04 19:58:15 +03:00
// If ctx is already done when the function returns, rewrite the named
// results so that no ClientConn is returned and the context error is
// reported.
defer func ( ) {
select {
case <- ctx . Done ( ) :
2020-04-09 23:45:13 +03:00
switch {
case ctx . Err ( ) == err :
// err already is the context error; only clear conn.
conn = nil
case err == nil || ! cc . dopts . returnLastError :
// No other error, or the last error was not requested: report only
// the context error.
conn , err = nil , ctx . Err ( )
default :
// returnLastError is set and there is another error: report both.
conn , err = nil , fmt . Errorf ( "%v: %v" , ctx . Err ( ) , err )
}
2019-04-04 19:58:15 +03:00
default :
}
} ( )
2016-12-20 03:31:00 +03:00
2017-05-15 23:51:11 +03:00
scSet := false
2016-12-20 03:31:00 +03:00
if cc . dopts . scChan != nil {
2017-04-05 21:08:50 +03:00
// Try to get an initial service config.
// The default case makes this receive non-blocking.
2016-12-20 03:31:00 +03:00
select {
case sc , ok := <- cc . dopts . scChan :
if ok {
2019-04-03 20:50:28 +03:00
cc . sc = & sc
2017-05-15 23:51:11 +03:00
scSet = true
2016-12-20 03:31:00 +03:00
}
2017-04-05 21:08:50 +03:00
default :
2016-12-20 03:31:00 +03:00
}
2015-02-06 04:14:05 +03:00
}
2016-03-23 21:49:05 +03:00
if cc . dopts . bs == nil {
2019-10-04 02:47:13 +03:00
cc . dopts . bs = backoff . DefaultExponential
2016-03-23 21:49:05 +03:00
}
2020-01-15 19:54:42 +03:00
// Determine the resolver to use.
2020-02-12 04:51:29 +03:00
cc . parsedTarget = grpcutil . ParseTarget ( cc . target )
2020-02-14 21:11:26 +03:00
channelz . Infof ( cc . channelzID , "parsed scheme: %q" , cc . parsedTarget . Scheme )
2020-01-15 19:54:42 +03:00
resolverBuilder := cc . getResolver ( cc . parsedTarget . Scheme )
if resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
2020-02-14 21:11:26 +03:00
channelz . Infof ( cc . channelzID , "scheme %q not registered, fallback to default scheme" , cc . parsedTarget . Scheme )
2020-01-15 19:54:42 +03:00
cc . parsedTarget = resolver . Target {
Scheme : resolver . GetDefaultScheme ( ) ,
Endpoint : target ,
}
resolverBuilder = cc . getResolver ( cc . parsedTarget . Scheme )
if resolverBuilder == nil {
return nil , fmt . Errorf ( "could not get resolver for default scheme: %q" , cc . parsedTarget . Scheme )
2018-03-27 23:58:27 +03:00
}
}
2020-01-15 19:54:42 +03:00
2016-09-20 01:11:57 +03:00
// Pick the authority, in order of precedence: the server name from the
// transport credentials, the authority dial option (insecure dials only),
// then the endpoint of the parsed target.
creds := cc . dopts . copts . TransportCredentials
if creds != nil && creds . Info ( ) . ServerName != "" {
cc . authority = creds . Info ( ) . ServerName
2018-07-13 19:56:47 +03:00
} else if cc . dopts . insecure && cc . dopts . authority != "" {
cc . authority = cc . dopts . authority
2016-09-20 01:11:57 +03:00
} else {
2017-10-23 21:40:43 +03:00
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
cc . authority = cc . parsedTarget . Endpoint
2016-09-20 01:11:57 +03:00
}
2017-08-31 20:59:09 +03:00
2017-05-15 23:51:11 +03:00
if cc . dopts . scChan != nil && ! scSet {
2017-05-16 00:36:20 +03:00
// Blocking wait for the initial service config.
2017-05-15 23:51:11 +03:00
select {
case sc , ok := <- cc . dopts . scChan :
if ok {
2019-04-03 20:50:28 +03:00
cc . sc = & sc
2017-05-15 23:51:11 +03:00
}
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
}
2016-12-20 03:31:00 +03:00
// Keep consuming later service configs in the background; scWatcher returns
// when scChan is closed or cc.ctx is done.
if cc . dopts . scChan != nil {
go cc . scWatcher ( )
}
2017-03-21 21:35:53 +03:00
2017-10-19 21:32:06 +03:00
// The balancer build options receive a clone of the transport credentials,
// not the original value.
var credsClone credentials . TransportCredentials
if creds := cc . dopts . copts . TransportCredentials ; creds != nil {
credsClone = creds . Clone ( )
}
cc . balancerBuildOpts = balancer . BuildOptions {
2018-04-09 21:13:06 +03:00
DialCreds : credsClone ,
2018-09-25 23:17:25 +03:00
CredsBundle : cc . dopts . copts . CredsBundle ,
2018-04-09 21:13:06 +03:00
Dialer : cc . dopts . copts . Dialer ,
ChannelzParentID : cc . channelzID ,
2019-05-09 23:27:41 +03:00
Target : cc . parsedTarget ,
2017-10-19 21:32:06 +03:00
}
2017-10-02 19:22:57 +03:00
// Build the resolver.
2020-01-15 19:54:42 +03:00
rWrapper , err := newCCResolverWrapper ( cc , resolverBuilder )
2017-10-02 19:22:57 +03:00
if err != nil {
return nil , fmt . Errorf ( "failed to build resolver: %v" , err )
}
2018-11-10 02:31:07 +03:00
cc . mu . Lock ( )
cc . resolverWrapper = rWrapper
cc . mu . Unlock ( )
2020-01-15 19:54:42 +03:00
2017-08-31 20:59:09 +03:00
// A blocking dial blocks until the clientConn is ready.
if cc . dopts . block {
for {
s := cc . GetState ( )
if s == connectivity . Ready {
break
2018-08-27 20:28:41 +03:00
} else if cc . dopts . copts . FailOnNonTempDialError && s == connectivity . TransientFailure {
// Fail fast only when the recorded connection error declares itself
// non-temporary through a Temporary() bool method.
if err = cc . blockingpicker . connectionError ( ) ; err != nil {
2018-11-01 20:49:35 +03:00
terr , ok := err . ( interface {
Temporary ( ) bool
} )
2018-08-27 20:28:41 +03:00
if ok && ! terr . Temporary ( ) {
return nil , err
}
}
2017-08-31 20:59:09 +03:00
}
if ! cc . WaitForStateChange ( ctx , s ) {
// ctx got timeout or canceled.
2020-04-09 23:45:13 +03:00
if err = cc . blockingpicker . connectionError ( ) ; err != nil && cc . dopts . returnLastError {
return nil , err
}
2017-08-31 20:59:09 +03:00
return nil , ctx . Err ( )
}
}
}
2015-09-29 20:24:03 +03:00
return cc , nil
2015-09-23 22:18:41 +03:00
}
2019-04-15 21:13:34 +03:00
// chainUnaryClientInterceptors folds every unary client interceptor configured
// on cc into the single cc.dopts.unaryInt slot.
//
// cc.dopts.unaryInt, when set, is placed in front of cc.dopts.chainUnaryInts
// so that it runs before any chained interceptor. With no interceptors the
// slot ends up nil; with exactly one, that interceptor is stored as is;
// otherwise a wrapper is stored that starts the chain at the first
// interceptor and hands it an invoker for the rest.
func chainUnaryClientInterceptors(cc *ClientConn) {
	chain := cc.dopts.chainUnaryInts
	if first := cc.dopts.unaryInt; first != nil {
		chain = append([]UnaryClientInterceptor{first}, chain...)
	}

	var combined UnaryClientInterceptor
	switch len(chain) {
	case 0:
		// Nothing configured: combined stays nil.
	case 1:
		combined = chain[0]
	default:
		combined = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return chain[0](ctx, method, req, reply, cc, getChainUnaryInvoker(chain, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = combined
}
// getChainUnaryInvoker recursively builds the invoker that is handed to
// interceptors[curr].
//
// For the last interceptor this is finalInvoker itself. For any other
// position it is a function that runs interceptors[curr+1], passing it the
// invoker for the position after that.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
	next := curr + 1
	if next == len(interceptors) {
		return finalInvoker
	}
	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
		rest := getChainUnaryInvoker(interceptors, next, finalInvoker)
		return interceptors[next](ctx, method, req, reply, cc, rest, opts...)
	}
}
// chainStreamClientInterceptors folds every stream client interceptor
// configured on cc into the single cc.dopts.streamInt slot.
//
// cc.dopts.streamInt, when set, is placed in front of cc.dopts.chainStreamInts
// so that it runs before any chained interceptor. With no interceptors the
// slot ends up nil; with exactly one, that interceptor is stored as is;
// otherwise a wrapper is stored that starts the chain at the first
// interceptor and hands it a streamer for the rest.
func chainStreamClientInterceptors(cc *ClientConn) {
	chain := cc.dopts.chainStreamInts
	if first := cc.dopts.streamInt; first != nil {
		chain = append([]StreamClientInterceptor{first}, chain...)
	}

	var combined StreamClientInterceptor
	switch len(chain) {
	case 0:
		// Nothing configured: combined stays nil.
	case 1:
		combined = chain[0]
	default:
		combined = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
			return chain[0](ctx, desc, cc, method, getChainStreamer(chain, 0, streamer), opts...)
		}
	}
	cc.dopts.streamInt = combined
}
// getChainStreamer recursively builds the client stream constructor that is
// handed to interceptors[curr].
//
// For the last interceptor this is finalStreamer itself. For any other
// position it is a function that runs interceptors[curr+1], passing it the
// streamer for the position after that.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
	next := curr + 1
	if next == len(interceptors) {
		return finalStreamer
	}
	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
		rest := getChainStreamer(interceptors, next, finalStreamer)
		return interceptors[next](ctx, desc, cc, method, rest, opts...)
	}
}
2017-08-09 20:31:12 +03:00
// connectivityStateManager keeps the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
// mu is held by updateState, getState and getNotifyChan while they access
// state and notifyChan.
mu sync . Mutex
2017-08-09 20:31:12 +03:00
// state is the most recently recorded connectivity state.
state connectivity . State
2017-07-25 01:00:53 +03:00
// notifyChan is created on demand by getNotifyChan; updateState closes it
// and resets it to nil when the state changes.
notifyChan chan struct { }
2018-09-12 21:15:32 +03:00
// channelzID is the ID passed to channelz when logging state changes.
channelzID int64
2017-07-25 01:00:53 +03:00
}
2017-08-09 20:31:12 +03:00
// updateState updates the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// If there's a change it notifies goroutines waiting on state change to
// happen.
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) updateState ( state connectivity . State ) {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
2017-08-09 20:31:12 +03:00
if csm . state == connectivity . Shutdown {
2017-07-25 01:00:53 +03:00
return
}
if csm . state == state {
return
}
csm . state = state
2020-02-14 21:11:26 +03:00
channelz . Infof ( csm . channelzID , "Channel Connectivity change to %v" , state )
2017-07-25 01:00:53 +03:00
if csm . notifyChan != nil {
// There are other goroutines waiting on this channel.
close ( csm . notifyChan )
csm . notifyChan = nil
}
}
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) getState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
return csm . state
}
// getNotifyChan returns the channel that is closed on the next state change,
// creating it first if none exists yet.
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()

	ch := csm.notifyChan
	if ch == nil {
		ch = make(chan struct{})
		csm.notifyChan = ch
	}
	return ch
}
2020-01-25 01:49:31 +03:00
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
// Invoke performs a unary RPC and returns after the response is received
// into reply.
Invoke ( ctx context . Context , method string , args interface { } , reply interface { } , opts ... CallOption ) error
// NewStream begins a streaming RPC.
NewStream ( ctx context . Context , desc * StreamDesc , method string , opts ... CallOption ) ( ClientStream , error )
}
// Compile-time assertion that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = ( * ClientConn ) ( nil )
2019-10-15 19:18:24 +03:00
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
2015-02-06 04:14:05 +03:00
type ClientConn struct {
2016-07-29 20:16:56 +03:00
// ctx and cancel are created in DialContext from context.Background();
// ctx.Done() is watched by scWatcher and waitForResolvedAddrs.
ctx context . Context
cancel context . CancelFunc
2017-10-23 21:40:43 +03:00
// target is the target string as passed to DialContext.
target string
// parsedTarget is the parsed form of target, or the default scheme with the
// whole target as Endpoint when the parsed scheme has no registered resolver.
parsedTarget resolver . Target
// authority is chosen in DialContext from the transport credentials, the
// authority dial option, or parsedTarget.Endpoint.
authority string
// dopts holds the default dial options with the caller's options applied.
dopts dialOptions
// csMgr tracks the connectivity state reported by GetState and
// WaitForStateChange.
csMgr * connectivityStateManager
2015-09-24 05:09:37 +03:00
2017-10-19 21:32:06 +03:00
// balancerBuildOpts is populated in DialContext before the resolver is built.
balancerBuildOpts balancer . BuildOptions
blockingpicker * pickerWrapper
2017-08-31 20:59:09 +03:00
2018-11-10 02:31:07 +03:00
// NOTE(review): mu appears to guard the fields declared below it
// (resolverWrapper, sc, conns, ...) - confirm against all writers.
mu sync . RWMutex
resolverWrapper * ccResolverWrapper
2019-04-03 20:50:28 +03:00
// sc is the service config received on dopts.scChan, if any.
sc * ServiceConfig
2018-11-10 02:31:07 +03:00
// conns being nil is treated by updateResolverState as "ClientConn closed".
conns map [ * addrConn ] struct { }
2017-06-16 19:59:37 +03:00
// Keepalive parameter can be updated if a GoAway is received.
2017-10-19 21:32:06 +03:00
mkp keepalive . ClientParameters
curBalancerName string
balancerWrapper * ccBalancerWrapper
2018-06-28 02:18:41 +03:00
// retryThrottler is seeded in DialContext with a typed nil *retryThrottler.
retryThrottler atomic . Value
2018-04-09 21:13:06 +03:00
2018-11-10 00:53:47 +03:00
// firstResolveEvent is fired by updateResolverState and awaited by
// waitForResolvedAddrs.
firstResolveEvent * grpcsync . Event
2018-08-06 21:17:12 +03:00
channelzID int64 // channelz unique identification number
czData * channelzData
2015-09-24 05:09:37 +03:00
}
2017-08-09 20:31:12 +03:00
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
2017-07-25 01:00:53 +03:00
// ctx expires. A true value is returned in former case and false in latter.
2017-08-09 20:31:12 +03:00
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) WaitForStateChange ( ctx context . Context , sourceState connectivity . State ) bool {
2017-07-25 01:00:53 +03:00
ch := cc . csMgr . getNotifyChan ( )
if cc . csMgr . getState ( ) != sourceState {
return true
}
select {
case <- ctx . Done ( ) :
return false
case <- ch :
return true
}
}
2017-08-09 20:31:12 +03:00
// GetState returns the connectivity.State of ClientConn, as currently
// recorded by its connectivityStateManager.
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) GetState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
return cc . csMgr . getState ( )
}
2016-12-20 03:31:00 +03:00
func ( cc * ClientConn ) scWatcher ( ) {
for {
select {
case sc , ok := <- cc . dopts . scChan :
if ! ok {
return
}
cc . mu . Lock ( )
// TODO: load balance policy runtime change is ignored.
2019-01-23 20:59:48 +03:00
// We may revisit this decision in the future.
2019-04-03 20:50:28 +03:00
cc . sc = & sc
2016-12-20 03:31:00 +03:00
cc . mu . Unlock ( )
case <- cc . ctx . Done ( ) :
return
}
}
}
2018-11-10 00:53:47 +03:00
// waitForResolvedAddrs blocks until the first resolver update has been
// processed, ctx is done, or the ClientConn's own context is done.
//
// It returns nil once firstResolveEvent has fired, a status error derived from
// ctx.Err() when ctx ends first, and ErrClientConnClosing when cc.ctx ends
// first.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	resolved := cc.firstResolveEvent
	// This runs on the RPC path: once the resolver has reported, skip the
	// more expensive select below.
	if resolved.HasFired() {
		return nil
	}
	select {
	case <-resolved.Done():
		return nil
	case <-cc.ctx.Done():
		return ErrClientConnClosing
	case <-ctx.Done():
		return status.FromContextError(ctx.Err()).Err()
	}
}
2019-10-04 22:59:43 +03:00
// emptyServiceConfig is the result of parsing "{}"; it is populated by init.
var emptyServiceConfig *ServiceConfig

// init parses the empty JSON service config once and panics if that ever
// fails.
func init() {
	parsed := parseServiceConfig("{}")
	if err := parsed.Err; err != nil {
		panic(fmt.Sprintf("impossible error parsing empty service config: %v", err))
	}
	emptyServiceConfig = parsed.Config.(*ServiceConfig)
}
// maybeApplyDefaultServiceConfig applies a service config together with addrs
// when the resolver did not supply a usable one.
//
// The config is chosen in this order: the one already stored in cc.sc, the
// default from the dial options, then the empty service config.
// updateResolverState calls this with cc.mu held.
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
	var chosen *ServiceConfig
	switch {
	case cc.sc != nil:
		chosen = cc.sc
	case cc.dopts.defaultServiceConfig != nil:
		chosen = cc.dopts.defaultServiceConfig
	default:
		chosen = emptyServiceConfig
	}
	cc.applyServiceConfigAndBalancer(chosen, addrs)
}
// updateResolverState handles an update from the resolver: either a new state
// s or an error err.
//
// It selects and applies a service config, then forwards the resolver state
// and balancer config to the balancer wrapper. It returns
// balancer.ErrBadResolverState when err is non-nil or the resolver's service
// config is unusable, otherwise whatever updateClientConnState returns. It
// returns nil without doing anything when the ClientConn is already closed.
// firstResolveEvent is fired on every return path.
func ( cc * ClientConn ) updateResolverState ( s resolver . State , err error ) error {
defer cc . firstResolveEvent . Fire ( )
2017-10-19 21:32:06 +03:00
// cc.mu is unlocked explicitly on each return path below rather than via
// defer, because the balancer is called after the lock is released.
cc . mu . Lock ( )
2019-03-22 20:48:55 +03:00
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
2017-10-19 21:32:06 +03:00
if cc . conns == nil {
2019-10-04 22:59:43 +03:00
cc . mu . Unlock ( )
2019-03-22 20:48:55 +03:00
return nil
2017-11-29 00:16:53 +03:00
}
2019-10-04 22:59:43 +03:00
if err != nil {
// May need to apply the initial service config in case the resolver
// doesn't support service configs, or doesn't provide a service config
// with the new addresses.
cc . maybeApplyDefaultServiceConfig ( nil )
if cc . balancerWrapper != nil {
cc . balancerWrapper . resolverError ( err )
2019-04-03 20:50:28 +03:00
}
2019-10-04 22:59:43 +03:00
// No addresses are valid with err set; return early.
cc . mu . Unlock ( )
return balancer . ErrBadResolverState
2019-04-03 20:50:28 +03:00
}
2019-10-04 22:59:43 +03:00
var ret error
if cc . dopts . disableServiceConfig || s . ServiceConfig == nil {
cc . maybeApplyDefaultServiceConfig ( s . Addresses )
// TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design?
} else {
if sc , ok := s . ServiceConfig . Config . ( * ServiceConfig ) ; s . ServiceConfig . Err == nil && ok {
cc . applyServiceConfigAndBalancer ( sc , s . Addresses )
2017-12-12 23:45:05 +03:00
} else {
2019-10-04 22:59:43 +03:00
// The resolver's service config failed to parse or has the wrong type.
// With no balancer yet, install an error picker and report
// TransientFailure; with a balancer, fall through and still deliver
// the addresses, returning ErrBadResolverState afterwards.
ret = balancer . ErrBadResolverState
if cc . balancerWrapper == nil {
var err error
if s . ServiceConfig . Err != nil {
err = status . Errorf ( codes . Unavailable , "error parsing service config: %v" , s . ServiceConfig . Err )
} else {
err = status . Errorf ( codes . Unavailable , "illegal service config type: %T" , s . ServiceConfig . Config )
2019-05-30 19:12:58 +03:00
}
2019-10-04 22:59:43 +03:00
cc . blockingpicker . updatePicker ( base . NewErrPicker ( err ) )
cc . csMgr . updateState ( connectivity . TransientFailure )
cc . mu . Unlock ( )
return ret
2019-05-30 19:12:58 +03:00
}
2017-12-12 23:45:05 +03:00
}
2017-10-19 21:32:06 +03:00
}
2017-12-19 02:35:42 +03:00
2019-10-04 22:59:43 +03:00
// The LB config from the service config is only used when no balancer
// builder was set through the dial options.
var balCfg serviceconfig . LoadBalancingConfig
if cc . dopts . balancerBuilder == nil && cc . sc != nil && cc . sc . lbConfig != nil {
balCfg = cc . sc . lbConfig . cfg
}
// Snapshot the balancer name and wrapper while holding the lock; they are
// used after it is released.
cbn := cc . curBalancerName
bw := cc . balancerWrapper
cc . mu . Unlock ( )
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
// Removal is done in place by shifting the tail left, so the backing array
// of s.Addresses is modified; i only advances when the element is kept.
for i := 0 ; i < len ( s . Addresses ) ; {
if s . Addresses [ i ] . Type == resolver . GRPCLB {
copy ( s . Addresses [ i : ] , s . Addresses [ i + 1 : ] )
s . Addresses = s . Addresses [ : len ( s . Addresses ) - 1 ]
continue
}
i ++
}
}
uccsErr := bw . updateClientConnState ( & balancer . ClientConnState { ResolverState : s , BalancerConfig : balCfg } )
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
}
return ret
2017-10-19 21:32:06 +03:00
}
2017-12-12 23:45:05 +03:00
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
2017-11-23 00:59:20 +03:00
//
// Caller must hold cc.mu.
2017-10-19 21:32:06 +03:00
func ( cc * ClientConn ) switchBalancer ( name string ) {
2019-05-24 21:13:46 +03:00
if strings . EqualFold ( cc . curBalancerName , name ) {
2017-10-19 21:32:06 +03:00
return
}
2020-02-14 21:11:26 +03:00
channelz . Infof ( cc . channelzID , "ClientConn switching balancer to %q" , name )
2017-12-12 23:45:05 +03:00
if cc . dopts . balancerBuilder != nil {
2020-02-14 21:11:26 +03:00
channelz . Info ( cc . channelzID , "ignoring balancer switching: Balancer DialOption used instead" )
2017-10-19 21:32:06 +03:00
return
}
2017-11-23 00:59:20 +03:00
if cc . balancerWrapper != nil {
cc . balancerWrapper . close ( )
}
2017-10-19 21:32:06 +03:00
builder := balancer . Get ( name )
if builder == nil {
2020-02-14 21:11:26 +03:00
channelz . Warningf ( cc . channelzID , "Channel switches to new LB policy %q due to fallback from invalid balancer name" , PickFirstBalancerName )
channelz . Infof ( cc . channelzID , "failed to get balancer builder for: %v, using pick_first instead" , name )
2017-10-19 21:32:06 +03:00
builder = newPickfirstBuilder ( )
2020-02-14 21:11:26 +03:00
} else {
channelz . Infof ( cc . channelzID , "Channel switches to new LB policy %q" , name )
2017-10-19 21:32:06 +03:00
}
2018-09-12 21:15:32 +03:00
2017-10-19 21:32:06 +03:00
cc . curBalancerName = builder . Name ( )
cc . balancerWrapper = newCCBalancerWrapper ( cc , builder , cc . balancerBuildOpts )
}
2019-11-21 21:27:29 +03:00
func ( cc * ClientConn ) handleSubConnStateChange ( sc balancer . SubConn , s connectivity . State , err error ) {
2017-10-19 21:32:06 +03:00
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
return
}
// TODO(bar switching) send updates to all balancer wrappers when balancer
// gracefully switching is supported.
2019-11-21 21:27:29 +03:00
cc . balancerWrapper . handleSubConnStateChange ( sc , s , err )
2017-10-19 21:32:06 +03:00
cc . mu . Unlock ( )
}
2017-08-31 20:59:09 +03:00
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
2017-11-23 00:59:20 +03:00
//
// Caller needs to make sure len(addrs) > 0.
2018-09-25 23:17:25 +03:00
func ( cc * ClientConn ) newAddrConn ( addrs [ ] resolver . Address , opts balancer . NewSubConnOptions ) ( * addrConn , error ) {
2017-08-31 20:59:09 +03:00
ac := & addrConn {
2020-02-13 02:41:45 +03:00
state : connectivity . Idle ,
2018-12-18 00:10:13 +03:00
cc : cc ,
addrs : addrs ,
scopts : opts ,
dopts : cc . dopts ,
czData : new ( channelzData ) ,
resetBackoff : make ( chan struct { } ) ,
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
ac . ctx , ac . cancel = context . WithCancel ( cc . ctx )
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
return nil , ErrClientConnClosing
2017-08-21 22:27:04 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
ac . channelzID = channelz . RegisterSubChannel ( ac , cc . channelzID , "" )
2020-02-14 21:11:26 +03:00
channelz . AddTraceEvent ( ac . channelzID , 0 , & channelz . TraceEventDesc {
2018-09-12 21:15:32 +03:00
Desc : "Subchannel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel(id:%d) created" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
}
2017-08-31 20:59:09 +03:00
cc . conns [ ac ] = struct { } { }
cc . mu . Unlock ( )
return ac , nil
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func ( cc * ClientConn ) removeAddrConn ( ac * addrConn , err error ) {
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
2017-08-21 22:27:04 +03:00
return
}
2017-08-31 20:59:09 +03:00
delete ( cc . conns , ac )
cc . mu . Unlock ( )
ac . tearDown ( err )
2017-08-21 22:27:04 +03:00
}
2018-08-07 02:02:47 +03:00
func ( cc * ClientConn ) channelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : cc . GetState ( ) ,
2018-04-23 21:22:25 +03:00
Target : cc . target ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & cc . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & cc . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & cc . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & cc . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
2018-07-24 02:19:11 +03:00
// Target reports the target string this ClientConn was created with.
//
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	return cc.target
}
2018-04-23 21:22:25 +03:00
func ( cc * ClientConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsStarted , 1 )
atomic . StoreInt64 ( & cc . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-10-09 01:58:54 +03:00
// connect starts creating a transport.
2017-10-02 19:22:57 +03:00
// It does nothing if the ac is not IDLE.
2017-08-31 20:59:09 +03:00
// TODO(bar) Move this to the addrConn section.
2017-10-24 00:06:33 +03:00
func ( ac * addrConn ) connect ( ) error {
2017-08-31 20:59:09 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return errConnClosing
2017-08-21 22:27:04 +03:00
}
2017-10-02 19:22:57 +03:00
if ac . state != connectivity . Idle {
ac . mu . Unlock ( )
return nil
2015-09-24 05:09:37 +03:00
}
2019-06-11 02:58:43 +03:00
// Update connectivity state within the lock to prevent subsequent or
// concurrent calls from resetting the transport more than once.
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . Connecting , nil )
2017-10-02 19:22:57 +03:00
ac . mu . Unlock ( )
2017-08-31 20:59:09 +03:00
2017-10-24 00:06:33 +03:00
// Start a goroutine connecting to the server asynchronously.
2018-12-18 00:10:13 +03:00
go ac . resetTransport ( )
2016-05-17 01:31:00 +03:00
return nil
2015-09-24 05:09:37 +03:00
}
2017-08-31 20:59:09 +03:00
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
2019-05-30 23:13:47 +03:00
// If ac is Connecting, it returns false. The caller should tear down the ac and
// create a new one. Note that the backoff will be reset when this happens.
//
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
// addresses will be picked up by retry in the next iteration after backoff.
//
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
//
// If ac is Ready, it checks whether current connected address of ac is in the
// new addrs list.
2017-08-31 20:59:09 +03:00
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
func ( ac * addrConn ) tryUpdateAddrs ( addrs [ ] resolver . Address ) bool {
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
2020-02-14 21:11:26 +03:00
channelz . Infof ( ac . channelzID , "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v" , ac . curAddr , addrs )
2019-05-30 23:13:47 +03:00
if ac . state == connectivity . Shutdown ||
ac . state == connectivity . TransientFailure ||
ac . state == connectivity . Idle {
2017-08-31 20:59:09 +03:00
ac . addrs = addrs
return true
}
2019-05-30 23:13:47 +03:00
if ac . state == connectivity . Connecting {
2018-12-18 00:10:13 +03:00
return false
}
2019-05-30 23:13:47 +03:00
// ac.state is Ready, try to find the connected address.
2017-08-31 20:59:09 +03:00
var curAddrFound bool
for _ , a := range addrs {
if reflect . DeepEqual ( ac . curAddr , a ) {
curAddrFound = true
break
}
}
2020-02-14 21:11:26 +03:00
channelz . Infof ( ac . channelzID , "addrConn: tryUpdateAddrs curAddrFound: %v" , curAddrFound )
2017-08-31 20:59:09 +03:00
if curAddrFound {
ac . addrs = addrs
}
return curAddrFound
}
2017-05-20 02:02:02 +03:00
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
2018-02-21 21:14:52 +03:00
// the service, we return it.
2017-05-20 02:02:02 +03:00
// Otherwise, we return an empty MethodConfig.
2017-05-15 23:51:11 +03:00
func ( cc * ClientConn ) GetMethodConfig ( method string ) MethodConfig {
2017-05-20 02:02:02 +03:00
// TODO: Avoid the locking here.
2016-12-20 03:31:00 +03:00
cc . mu . RLock ( )
defer cc . mu . RUnlock ( )
2019-04-03 20:50:28 +03:00
if cc . sc == nil {
return MethodConfig { }
}
2017-05-15 23:51:11 +03:00
m , ok := cc . sc . Methods [ method ]
2017-04-04 01:03:24 +03:00
if ! ok {
i := strings . LastIndex ( method , "/" )
2018-04-15 14:07:56 +03:00
m = cc . sc . Methods [ method [ : i + 1 ] ]
2017-04-04 01:03:24 +03:00
}
2017-05-15 23:51:11 +03:00
return m
2016-12-20 03:31:00 +03:00
}
2018-11-01 20:49:35 +03:00
func ( cc * ClientConn ) healthCheckConfig ( ) * healthCheckConfig {
cc . mu . RLock ( )
defer cc . mu . RUnlock ( )
2019-04-03 20:50:28 +03:00
if cc . sc == nil {
return nil
}
2018-11-01 20:49:35 +03:00
return cc . sc . healthCheckConfig
}
2018-07-11 20:18:09 +03:00
func ( cc * ClientConn ) getTransport ( ctx context . Context , failfast bool , method string ) ( transport . ClientTransport , func ( balancer . DoneInfo ) , error ) {
2019-11-21 21:27:29 +03:00
t , done , err := cc . blockingpicker . pick ( ctx , failfast , balancer . PickInfo {
Ctx : ctx ,
2018-07-11 20:18:09 +03:00
FullMethodName : method ,
} )
2016-05-07 01:47:09 +03:00
if err != nil {
2017-10-02 19:22:57 +03:00
return nil , nil , toRPCErr ( err )
2015-09-24 05:09:37 +03:00
}
2017-10-02 19:22:57 +03:00
return t , done , nil
2015-09-24 05:09:37 +03:00
}
2019-10-04 22:59:43 +03:00
func ( cc * ClientConn ) applyServiceConfigAndBalancer ( sc * ServiceConfig , addrs [ ] resolver . Address ) {
2019-04-03 20:50:28 +03:00
if sc == nil {
// should never reach here.
2019-10-04 22:59:43 +03:00
return
2019-04-03 20:50:28 +03:00
}
cc . sc = sc
if cc . sc . retryThrottling != nil {
newThrottler := & retryThrottler {
tokens : cc . sc . retryThrottling . MaxTokens ,
max : cc . sc . retryThrottling . MaxTokens ,
thresh : cc . sc . retryThrottling . MaxTokens / 2 ,
ratio : cc . sc . retryThrottling . TokenRatio ,
}
cc . retryThrottler . Store ( newThrottler )
} else {
cc . retryThrottler . Store ( ( * retryThrottler ) ( nil ) )
}
2019-10-04 22:59:43 +03:00
if cc . dopts . balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc . sc != nil && cc . sc . lbConfig != nil {
newBalancerName = cc . sc . lbConfig . name
} else {
var isGRPCLB bool
for _ , a := range addrs {
if a . Type == resolver . GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc . sc != nil && cc . sc . LB != nil {
newBalancerName = * cc . sc . LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc . switchBalancer ( newBalancerName )
} else if cc . balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc . curBalancerName = cc . dopts . balancerBuilder . Name ( )
cc . balancerWrapper = newCCBalancerWrapper ( cc , cc . dopts . balancerBuilder , cc . balancerBuildOpts )
}
2019-04-03 20:50:28 +03:00
}
2019-11-13 02:23:46 +03:00
func ( cc * ClientConn ) resolveNow ( o resolver . ResolveNowOptions ) {
2018-06-12 22:50:43 +03:00
cc . mu . RLock ( )
2017-11-29 00:16:53 +03:00
r := cc . resolverWrapper
2018-06-12 22:50:43 +03:00
cc . mu . RUnlock ( )
2017-11-29 00:16:53 +03:00
if r == nil {
return
}
go r . resolveNow ( o )
}
2018-08-27 23:21:48 +03:00
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func ( cc * ClientConn ) ResetConnectBackoff ( ) {
cc . mu . Lock ( )
2019-09-27 20:51:22 +03:00
conns := cc . conns
cc . mu . Unlock ( )
for ac := range conns {
2018-08-27 23:21:48 +03:00
ac . resetConnectBackoff ( )
}
}
2016-05-18 03:18:54 +03:00
// Close tears down the ClientConn and all underlying connections.
2016-05-07 01:47:09 +03:00
func ( cc * ClientConn ) Close ( ) error {
2018-03-03 00:39:55 +03:00
defer cc . cancel ( )
2016-07-29 20:16:56 +03:00
2015-08-01 00:16:02 +03:00
cc . mu . Lock ( )
2016-05-11 05:29:44 +03:00
if cc . conns == nil {
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
return ErrClientConnClosing
}
2016-05-11 05:29:44 +03:00
conns := cc . conns
cc . conns = nil
2017-08-09 20:31:12 +03:00
cc . csMgr . updateState ( connectivity . Shutdown )
2017-10-19 21:32:06 +03:00
rWrapper := cc . resolverWrapper
cc . resolverWrapper = nil
bWrapper := cc . balancerWrapper
cc . balancerWrapper = nil
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
2018-04-23 21:22:25 +03:00
2017-10-02 19:22:57 +03:00
cc . blockingpicker . close ( )
2018-04-23 21:22:25 +03:00
2017-10-19 21:32:06 +03:00
if rWrapper != nil {
rWrapper . close ( )
2017-10-02 19:22:57 +03:00
}
2017-10-19 21:32:06 +03:00
if bWrapper != nil {
bWrapper . close ( )
2016-07-19 01:58:41 +03:00
}
2018-04-23 21:22:25 +03:00
2017-08-31 20:59:09 +03:00
for ac := range conns {
2016-05-11 05:29:44 +03:00
ac . tearDown ( ErrClientConnClosing )
2016-05-07 01:47:09 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
ted := & channelz . TraceEventDesc {
Desc : "Channel Deleted" ,
Severity : channelz . CtINFO ,
}
if cc . dopts . channelzParentID != 0 {
ted . Parent = & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested channel(id:%d) deleted" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
}
}
2020-02-14 21:11:26 +03:00
channelz . AddTraceEvent ( cc . channelzID , 0 , ted )
2018-09-12 21:15:32 +03:00
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
2019-04-03 01:42:35 +03:00
// the entity being deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( cc . channelzID )
}
2016-05-07 01:47:09 +03:00
return nil
}
// addrConn is a network connection to a given address.
type addrConn struct {
2016-07-29 20:16:56 +03:00
ctx context . Context
cancel context . CancelFunc
2017-12-01 20:55:42 +03:00
cc * ClientConn
dopts dialOptions
acbw balancer . SubConn
2018-09-25 23:17:25 +03:00
scopts balancer . NewSubConnOptions
2016-05-07 01:47:09 +03:00
2018-11-01 20:49:35 +03:00
// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
// health checking may require server to report healthy to set ac to READY), and is reset
// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
// is received, transport is closed, ac has been torn down).
2018-09-21 01:45:40 +03:00
transport transport . ClientTransport // The current transport.
mu sync . Mutex
curAddr resolver . Address // The current address.
addrs [ ] resolver . Address // All addresses that the resolver resolved to.
2018-09-12 21:15:32 +03:00
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity . State
2016-07-16 02:20:34 +03:00
2018-12-18 00:10:13 +03:00
backoffIdx int // Needs to be stateful for resetConnectBackoff.
2018-08-27 23:21:48 +03:00
resetBackoff chan struct { }
2019-01-17 21:14:45 +03:00
channelzID int64 // channelz unique identification number.
czData * channelzData
2016-05-07 01:47:09 +03:00
}
2018-09-21 01:45:40 +03:00
// Note: this requires a lock on ac.mu.
2019-11-21 21:27:29 +03:00
func ( ac * addrConn ) updateConnectivityState ( s connectivity . State , lastErr error ) {
2018-12-18 00:10:13 +03:00
if ac . state == s {
return
}
2018-09-12 21:15:32 +03:00
ac . state = s
2020-02-14 21:11:26 +03:00
channelz . Infof ( ac . channelzID , "Subchannel Connectivity change to %v" , s )
2019-11-21 21:27:29 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , s , lastErr )
2018-09-12 21:15:32 +03:00
}
2017-04-11 00:33:51 +03:00
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func ( ac * addrConn ) adjustParams ( r transport . GoAwayReason ) {
switch r {
2017-11-07 00:45:11 +03:00
case transport . GoAwayTooManyPings :
2017-04-11 00:33:51 +03:00
v := 2 * ac . dopts . copts . KeepaliveParams . Time
ac . cc . mu . Lock ( )
if v > ac . cc . mkp . Time {
ac . cc . mkp . Time = v
}
ac . cc . mu . Unlock ( )
}
}
2018-12-18 00:10:13 +03:00
func ( ac * addrConn ) resetTransport ( ) {
for i := 0 ; ; i ++ {
if i > 0 {
2019-11-13 02:23:46 +03:00
ac . cc . resolveNow ( resolver . ResolveNowOptions { } )
2018-09-21 01:45:40 +03:00
}
2019-03-14 23:14:19 +03:00
ac . mu . Lock ( )
2019-03-19 20:28:26 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
2018-12-18 00:10:13 +03:00
addrs := ac . addrs
backoffFor := ac . dopts . bs . Backoff ( ac . backoffIdx )
2019-02-12 03:12:42 +03:00
// This will be the duration that dial gets to finish.
2019-03-20 22:58:29 +03:00
dialDuration := minConnectTimeout
if ac . dopts . minConnectTimeout != nil {
dialDuration = ac . dopts . minConnectTimeout ( )
}
2019-02-12 03:12:42 +03:00
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
2019-03-14 23:14:19 +03:00
// We can potentially spend all the time trying the first address, and
// if the server accepts the connection and then hangs, the following
// addresses will never be tried.
//
// The spec doesn't mention what should be done for multiple addresses.
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
2019-02-12 03:12:42 +03:00
connectDeadline := time . Now ( ) . Add ( dialDuration )
2019-05-30 23:13:47 +03:00
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . Connecting , nil )
2019-05-30 23:13:47 +03:00
ac . transport = nil
2018-12-18 00:10:13 +03:00
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
2019-03-19 20:28:26 +03:00
newTr , addr , reconnect , err := ac . tryAllAddrs ( addrs , connectDeadline )
if err != nil {
// After exhausting all addresses, the addrConn enters
// TRANSIENT_FAILURE.
2018-12-18 00:10:13 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure , err )
2018-09-21 01:45:40 +03:00
2019-03-19 20:28:26 +03:00
// Backoff.
b := ac . resetBackoff
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
timer := time . NewTimer ( backoffFor )
select {
case <- timer . C :
ac . mu . Lock ( )
ac . backoffIdx ++
2019-03-14 23:14:19 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
case <- b :
timer . Stop ( )
case <- ac . ctx . Done ( ) :
timer . Stop ( )
2019-03-14 23:14:19 +03:00
return
}
2019-03-19 20:28:26 +03:00
continue
}
2018-09-21 01:45:40 +03:00
2019-03-19 20:28:26 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2019-07-23 02:07:55 +03:00
newTr . Close ( )
2019-03-19 20:28:26 +03:00
return
}
ac . curAddr = addr
ac . transport = newTr
ac . backoffIdx = 0
hctx , hcancel := context . WithCancel ( ac . ctx )
2019-06-26 21:15:17 +03:00
ac . startHealthCheck ( hctx )
2019-03-19 20:28:26 +03:00
ac . mu . Unlock ( )
// Block until the created transport is down. And when this happens,
// we restart from the top of the addr list.
<- reconnect . Done ( )
hcancel ( )
2019-07-23 02:07:55 +03:00
// restart connecting - the top of the loop will set state to
// CONNECTING. This is against the current connectivity semantics doc,
// however it allows for graceful behavior for RPCs not yet dispatched
// - unfortunate timing would otherwise lead to the RPC failing even
// though the TRANSIENT_FAILURE state (called for by the doc) would be
// instantaneous.
2019-03-19 20:28:26 +03:00
//
2019-07-23 02:07:55 +03:00
// Ideally we should transition to Idle here and block until there is
// RPC activity that leads to the balancer requesting a reconnect of
// the associated SubConn.
2019-03-19 20:28:26 +03:00
}
}
2017-12-01 20:55:42 +03:00
2019-03-19 20:28:26 +03:00
// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
// first successful one. It returns the transport, the address and a Event in
// the successful case. The Event fires when the returned transport disconnects.
func ( ac * addrConn ) tryAllAddrs ( addrs [ ] resolver . Address , connectDeadline time . Time ) ( transport . ClientTransport , resolver . Address , * grpcsync . Event , error ) {
2019-11-21 21:27:29 +03:00
var firstConnErr error
2019-03-19 20:28:26 +03:00
for _ , addr := range addrs {
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
2018-12-18 00:10:13 +03:00
ac . mu . Unlock ( )
2019-03-19 20:28:26 +03:00
return nil , resolver . Address { } , nil , errConnClosing
}
ac . cc . mu . RLock ( )
ac . dopts . copts . KeepaliveParams = ac . cc . mkp
ac . cc . mu . RUnlock ( )
copts := ac . dopts . copts
if ac . scopts . CredsBundle != nil {
copts . CredsBundle = ac . scopts . CredsBundle
}
ac . mu . Unlock ( )
2020-02-14 21:11:26 +03:00
channelz . Infof ( ac . channelzID , "Subchannel picks a new address %q to connect" , addr . Addr )
2019-03-19 20:28:26 +03:00
newTr , reconnect , err := ac . createTransport ( addr , copts , connectDeadline )
if err == nil {
return newTr , addr , reconnect , nil
}
2019-11-21 21:27:29 +03:00
if firstConnErr == nil {
firstConnErr = err
}
2019-03-19 20:28:26 +03:00
ac . cc . blockingpicker . updateConnectionError ( err )
2018-09-21 01:45:40 +03:00
}
2019-03-19 20:28:26 +03:00
// Couldn't connect to any address.
2019-11-21 21:27:29 +03:00
return nil , resolver . Address { } , nil , firstConnErr
2018-09-21 01:45:40 +03:00
}
2019-03-19 20:28:26 +03:00
// createTransport creates a connection to addr. It returns the transport and a
// Event in the successful case. The Event fires when the returned transport
// disconnects.
func ( ac * addrConn ) createTransport ( addr resolver . Address , copts transport . ConnectOptions , connectDeadline time . Time ) ( transport . ClientTransport , * grpcsync . Event , error ) {
2019-03-14 23:14:19 +03:00
prefaceReceived := make ( chan struct { } )
2018-09-21 01:45:40 +03:00
onCloseCalled := make ( chan struct { } )
2019-03-19 20:28:26 +03:00
reconnect := grpcsync . NewEvent ( )
2018-09-21 01:45:40 +03:00
2019-10-08 23:59:02 +03:00
authority := ac . cc . authority
// addr.ServerName takes precedent over ClientConn authority, if present.
if addr . ServerName != "" {
authority = addr . ServerName
}
2018-12-18 00:10:13 +03:00
target := transport . TargetInfo {
Addr : addr . Addr ,
Metadata : addr . Metadata ,
2019-10-08 23:59:02 +03:00
Authority : authority ,
2018-12-18 00:10:13 +03:00
}
2018-10-19 00:31:34 +03:00
2019-07-23 02:07:55 +03:00
once := sync . Once { }
2018-09-21 01:45:40 +03:00
onGoAway := func ( r transport . GoAwayReason ) {
ac . mu . Lock ( )
ac . adjustParams ( r )
2019-07-23 02:07:55 +03:00
once . Do ( func ( ) {
if ac . state == connectivity . Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . Connecting , nil )
2019-07-23 02:07:55 +03:00
}
} )
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2018-12-18 00:10:13 +03:00
reconnect . Fire ( )
2018-09-21 01:45:40 +03:00
}
onClose := func ( ) {
2019-07-23 02:07:55 +03:00
ac . mu . Lock ( )
once . Do ( func ( ) {
if ac . state == connectivity . Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . Connecting , nil )
2019-07-23 02:07:55 +03:00
}
} )
ac . mu . Unlock ( )
2018-09-21 01:45:40 +03:00
close ( onCloseCalled )
2018-12-18 00:10:13 +03:00
reconnect . Fire ( )
2018-09-21 01:45:40 +03:00
}
onPrefaceReceipt := func ( ) {
close ( prefaceReceived )
}
connectCtx , cancel := context . WithDeadline ( ac . ctx , connectDeadline )
2018-11-13 00:30:41 +03:00
defer cancel ( )
2018-09-21 01:45:40 +03:00
if channelz . IsOn ( ) {
copts . ChannelzParentID = ac . channelzID
}
newTr , err := transport . NewClientTransport ( connectCtx , ac . cc . ctx , target , copts , onPrefaceReceipt , onGoAway , onClose )
if err != nil {
// newTr is either nil, or closed.
2020-02-14 21:11:26 +03:00
channelz . Warningf ( ac . channelzID , "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting..." , addr , err )
2019-03-19 20:28:26 +03:00
return nil , nil , err
2017-12-01 20:55:42 +03:00
}
2018-09-21 01:45:40 +03:00
2019-07-12 23:37:27 +03:00
select {
2019-11-09 01:46:57 +03:00
case <- time . After ( time . Until ( connectDeadline ) ) :
2019-07-12 23:37:27 +03:00
// We didn't get the preface in time.
newTr . Close ( )
2020-02-14 21:11:26 +03:00
channelz . Warningf ( ac . channelzID , "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting..." , addr )
2019-07-12 23:37:27 +03:00
return nil , nil , errors . New ( "timed out waiting for server handshake" )
case <- prefaceReceived :
// We got the preface - huzzah! things are good.
case <- onCloseCalled :
// The transport has already closed - noop.
return nil , nil , errors . New ( "connection closed" )
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
2019-02-27 21:04:46 +03:00
}
2019-03-19 20:28:26 +03:00
return newTr , reconnect , nil
2018-09-21 01:45:40 +03:00
}
2019-06-26 21:15:17 +03:00
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func ( ac * addrConn ) startHealthCheck ( ctx context . Context ) {
var healthcheckManagingState bool
defer func ( ) {
if ! healthcheckManagingState {
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . Ready , nil )
2019-06-26 21:15:17 +03:00
}
} ( )
if ac . cc . dopts . disableHealthCheck {
return
2018-11-01 20:49:35 +03:00
}
2019-06-26 21:15:17 +03:00
healthCheckConfig := ac . cc . healthCheckConfig ( )
if healthCheckConfig == nil {
return
}
if ! ac . scopts . HealthCheckEnabled {
return
}
healthCheckFunc := ac . cc . dopts . healthCheckFunc
if healthCheckFunc == nil {
// The health package is not imported to set health check function.
//
// TODO: add a link to the health check doc in the error message.
2020-02-14 21:11:26 +03:00
channelz . Error ( ac . channelzID , "Health check is requested but health check function is not set." )
2019-06-26 21:15:17 +03:00
return
}
healthcheckManagingState = true
// Set up the health check helper functions.
currentTr := ac . transport
newStream := func ( method string ) ( interface { } , error ) {
ac . mu . Lock ( )
if ac . transport != currentTr {
ac . mu . Unlock ( )
return nil , status . Error ( codes . Canceled , "the provided transport is no longer valid to use" )
}
ac . mu . Unlock ( )
return newNonRetryClientStream ( ctx , & StreamDesc { ServerStreams : true } , method , currentTr , ac )
}
2019-11-21 21:27:29 +03:00
setConnectivityState := func ( s connectivity . State , lastErr error ) {
2018-11-01 20:49:35 +03:00
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
2019-06-26 21:15:17 +03:00
if ac . transport != currentTr {
2018-11-01 20:49:35 +03:00
return
}
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( s , lastErr )
2018-11-01 20:49:35 +03:00
}
2019-06-26 21:15:17 +03:00
// Start the health checking stream.
go func ( ) {
err := ac . cc . dopts . healthCheckFunc ( ctx , newStream , setConnectivityState , healthCheckConfig . ServiceName )
if err != nil {
if status . Code ( err ) == codes . Unimplemented {
2020-02-14 21:11:26 +03:00
channelz . Error ( ac . channelzID , "Subchannel health check is unimplemented at server side, thus health check is disabled" )
2019-06-26 21:15:17 +03:00
} else {
2020-02-14 21:11:26 +03:00
channelz . Errorf ( ac . channelzID , "HealthCheckFunc exits with unexpected error %v" , err )
2018-11-01 20:49:35 +03:00
}
}
2019-06-26 21:15:17 +03:00
} ( )
2018-11-01 20:49:35 +03:00
}
2018-08-27 23:21:48 +03:00
func ( ac * addrConn ) resetConnectBackoff ( ) {
ac . mu . Lock ( )
close ( ac . resetBackoff )
2018-09-21 01:45:40 +03:00
ac . backoffIdx = 0
2018-08-27 23:21:48 +03:00
ac . resetBackoff = make ( chan struct { } )
ac . mu . Unlock ( )
}
2017-10-02 19:22:57 +03:00
// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
func ( ac * addrConn ) getReadyTransport ( ) ( transport . ClientTransport , bool ) {
ac . mu . Lock ( )
2018-09-21 01:45:40 +03:00
if ac . state == connectivity . Ready && ac . transport != nil {
2017-10-02 19:22:57 +03:00
t := ac . transport
ac . mu . Unlock ( )
return t , true
}
var idle bool
if ac . state == connectivity . Idle {
idle = true
}
ac . mu . Unlock ( )
// Trigger idle ac to connect.
if idle {
2017-10-24 00:06:33 +03:00
ac . connect ( )
2017-10-02 19:22:57 +03:00
}
return nil , false
}
2016-05-17 01:31:00 +03:00
// tearDown starts to tear down the addrConn.
2015-02-06 04:14:05 +03:00
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
2016-05-07 01:47:09 +03:00
// some edge cases (e.g., the caller opens and closes many addrConn's in a
2015-02-06 04:14:05 +03:00
// tight loop.
2016-07-29 22:57:38 +03:00
// tearDown doesn't remove ac from ac.cc.conns.
2016-05-07 01:47:09 +03:00
func ( ac * addrConn ) tearDown ( err error ) {
ac . mu . Lock ( )
2017-12-15 23:03:41 +03:00
if ac . state == connectivity . Shutdown {
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2017-12-15 23:03:41 +03:00
return
}
2018-11-01 20:49:35 +03:00
curTr := ac . transport
ac . transport = nil
2018-09-21 20:38:30 +03:00
// We have to set the state to Shutdown before anything else to prevent races
2019-10-01 01:30:07 +03:00
// between setting the state and logic that waits on context cancellation / etc.
2019-11-21 21:27:29 +03:00
ac . updateConnectivityState ( connectivity . Shutdown , nil )
2018-09-21 20:38:30 +03:00
ac . cancel ( )
2017-10-20 01:16:16 +03:00
ac . curAddr = resolver . Address { }
2018-11-01 20:49:35 +03:00
if err == errConnDrain && curTr != nil {
2016-07-26 02:35:32 +03:00
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
2018-09-21 01:45:40 +03:00
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac . mu . Unlock ( )
2018-11-01 20:49:35 +03:00
curTr . GracefulClose ( )
2018-09-21 01:45:40 +03:00
ac . mu . Lock ( )
2016-07-26 02:35:32 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2020-02-14 21:11:26 +03:00
channelz . AddTraceEvent ( ac . channelzID , 0 , & channelz . TraceEventDesc {
2018-09-12 21:15:32 +03:00
Desc : "Subchannel Deleted" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchanel(id:%d) deleted" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
2019-10-01 01:30:07 +03:00
// the entity being deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( ac . channelzID )
}
2018-09-21 01:45:40 +03:00
ac . mu . Unlock ( )
2015-02-06 04:14:05 +03:00
}
2017-08-31 20:59:09 +03:00
// getState reports ac's connectivity state as read while holding ac.mu.
func (ac *addrConn) getState() connectivity.State {
	ac.mu.Lock()
	current := ac.state
	ac.mu.Unlock()
	return current
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
2018-04-09 21:13:06 +03:00
func ( ac * addrConn ) ChannelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
ac . mu . Lock ( )
addr := ac . curAddr . Addr
ac . mu . Unlock ( )
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : ac . getState ( ) ,
2018-04-23 21:22:25 +03:00
Target : addr ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & ac . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & ac . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & ac . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & ac . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
func ( ac * addrConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsStarted , 1 )
atomic . StoreInt64 ( & ac . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-06-28 02:18:41 +03:00
// retryThrottler is a token counter that decides whether retries should be
// throttled. Every throttle call spends one token (never dropping below zero),
// every successfulRPC call earns ratio tokens (never exceeding max), and
// retries are throttled while the token count is at or below thresh.
//
// A nil *retryThrottler is valid: it never throttles and ignores successes.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}

	rt.mu.Lock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		// The pool is floored at zero.
		remaining = 0
	}
	rt.tokens = remaining
	throttled := remaining <= rt.thresh
	rt.mu.Unlock()

	return throttled
}

// successfulRPC credits the pool with ratio tokens, capping the total at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}

	rt.mu.Lock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
	rt.mu.Unlock()
}
2018-08-07 02:02:47 +03:00
// channelzChannel wraps a ClientConn and exposes its metrics through an
// exported ChannelzMetric method.
type channelzChannel struct {
	cc *ClientConn
}

// ChannelzMetric returns whatever the wrapped ClientConn's channelzMetric
// method reports.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	metric := c.cc.channelzMetric()
	return metric
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
// ErrClientConnTimeout is the error value meant to signal that a ClientConn
// could not establish its underlying connections within the specified timeout.
//
// Deprecated: grpc never returns this error, and users should not reference
// it.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
2020-01-14 23:44:21 +03:00
func ( cc * ClientConn ) getResolver ( scheme string ) resolver . Builder {
for _ , rb := range cc . dopts . resolvers {
2020-03-12 23:41:38 +03:00
if scheme == rb . Scheme ( ) {
2020-01-14 23:44:21 +03:00
return rb
}
}
2020-03-12 23:41:38 +03:00
return resolver . Get ( scheme )
2020-01-14 23:44:21 +03:00
}