2015-02-06 04:14:05 +03:00
/ *
*
2017-06-08 15:42:19 +03:00
* Copyright 2014 gRPC authors .
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* http : //www.apache.org/licenses/LICENSE-2.0
2015-02-06 04:14:05 +03:00
*
2017-06-08 15:42:19 +03:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
2015-02-06 04:14:05 +03:00
*
* /
2015-02-09 03:35:38 +03:00
package grpc
2015-02-06 04:14:05 +03:00
import (
2015-02-19 14:57:41 +03:00
"errors"
2017-10-02 19:22:57 +03:00
"fmt"
2017-08-14 22:24:23 +03:00
"math"
2015-04-17 23:50:18 +03:00
"net"
2017-08-31 20:59:09 +03:00
"reflect"
2017-04-04 02:03:05 +03:00
"strings"
2015-02-06 04:14:05 +03:00
"sync"
2018-06-28 02:18:41 +03:00
"sync/atomic"
2015-02-06 04:14:05 +03:00
"time"
2015-02-19 09:15:13 +03:00
"golang.org/x/net/context"
2015-09-23 22:18:41 +03:00
"golang.org/x/net/trace"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/balancer"
2017-10-19 21:32:06 +03:00
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/codes"
2017-08-09 20:31:12 +03:00
"google.golang.org/grpc/connectivity"
2015-02-09 03:39:06 +03:00
"google.golang.org/grpc/credentials"
2015-05-09 12:43:59 +03:00
"google.golang.org/grpc/grpclog"
2018-06-14 02:07:37 +03:00
"google.golang.org/grpc/internal/backoff"
2018-06-19 03:59:08 +03:00
"google.golang.org/grpc/internal/channelz"
2018-07-11 21:22:45 +03:00
"google.golang.org/grpc/internal/transport"
2017-02-28 22:49:51 +03:00
"google.golang.org/grpc/keepalive"
2017-08-31 20:59:09 +03:00
"google.golang.org/grpc/resolver"
2017-10-20 22:03:44 +03:00
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
"google.golang.org/grpc/status"
2015-02-06 04:14:05 +03:00
)
2018-03-07 21:36:17 +03:00
const (
	// minConnectTimeout is the minimum time to give a connection attempt to
	// complete.
	minConnectTimeout = time.Second * 20
	// grpclbName must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
2015-02-19 14:57:41 +03:00
var (
2016-05-25 21:28:45 +03:00
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
//
// Deprecated: this error should not be relied upon by users; use the status
// code of Canceled instead.
ErrClientConnClosing = status . Error ( codes . Canceled , "grpc: the client connection is closing" )
2017-11-07 00:45:11 +03:00
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors . New ( "grpc: the connection is drained" )
// errConnClosing indicates that the connection is closing.
errConnClosing = errors . New ( "grpc: the connection is closing" )
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors . New ( "grpc: balancer is closed" )
2018-03-07 21:36:17 +03:00
// We use an accessor so that minConnectTimeout can be
// atomically read and updated while testing.
getMinConnectTimeout = func ( ) time . Duration {
return minConnectTimeout
}
2017-11-07 00:45:11 +03:00
)
2016-05-25 21:28:45 +03:00
2017-11-07 00:45:11 +03:00
// Errors returned from Dial and DialContext when the configured security
// options are missing or contradictory.
var (
	// errNoTransportSecurity: no transport security was configured for the
	// ClientConn. Users should either set one or explicitly call the
	// WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredentialsMissing: the user wants to transmit security
	// information (e.g., an oauth2 token) that requires a secure connection,
	// but the connection is insecure.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict: grpc.WithTransportCredentials() and
	// grpc.WithInsecure() were both supplied for the same connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
2017-04-07 00:08:04 +03:00
const (
	// Default client-side limits on the size of a single received / sent
	// message: 4 MiB for receive, math.MaxInt32 for send.
	defaultClientMaxReceiveMessageSize = 4 << 20
	defaultClientMaxSendMessageSize    = math.MaxInt32

	// Default write and read buffer sizes, 32 KiB each.
	// NOTE(review): presumably these size the transport's frame I/O buffers
	// (the previous comment mentioned "sending frames") - confirm at the use
	// sites.
	defaultWriteBufSize = 32 << 10
	defaultReadBufSize  = 32 << 10
)
2017-03-10 03:58:23 +03:00
2016-08-15 20:17:09 +03:00
// Dial creates a client connection to the given target.
2015-02-06 04:14:05 +03:00
func Dial ( target string , opts ... DialOption ) ( * ClientConn , error ) {
2016-08-15 20:17:09 +03:00
return DialContext ( context . Background ( ) , target , opts ... )
}
2018-04-13 01:11:22 +03:00
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
2018-01-19 00:10:52 +03:00
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
2016-08-24 22:56:16 +03:00
func DialContext ( ctx context . Context , target string , opts ... DialOption ) ( conn * ClientConn , err error ) {
2015-09-29 20:24:03 +03:00
cc := & ClientConn {
2018-06-27 21:16:33 +03:00
target : target ,
csMgr : & connectivityStateManager { } ,
conns : make ( map [ * addrConn ] struct { } ) ,
dopts : defaultDialOptions ( ) ,
2017-10-02 19:22:57 +03:00
blockingpicker : newPickerWrapper ( ) ,
2018-08-06 21:17:12 +03:00
czData : new ( channelzData ) ,
2015-09-29 20:24:03 +03:00
}
2018-06-28 02:18:41 +03:00
cc . retryThrottler . Store ( ( * retryThrottler ) ( nil ) )
2016-08-24 04:18:18 +03:00
cc . ctx , cc . cancel = context . WithCancel ( context . Background ( ) )
2017-04-04 01:03:24 +03:00
2016-12-20 03:31:00 +03:00
for _ , opt := range opts {
2018-07-20 03:33:42 +03:00
opt . apply ( & cc . dopts )
2016-12-20 03:31:00 +03:00
}
2017-10-02 19:22:57 +03:00
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
if cc . dopts . channelzParentID != 0 {
2018-08-07 02:02:47 +03:00
cc . channelzID = channelz . RegisterChannel ( & channelzChannel { cc } , cc . dopts . channelzParentID , target )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : "Channel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested Channel(id:%d) created" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
} else {
2018-08-07 02:02:47 +03:00
cc . channelzID = channelz . RegisterChannel ( & channelzChannel { cc } , 0 , target )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : "Channel Created" ,
Severity : channelz . CtINFO ,
} )
2018-04-09 21:13:06 +03:00
}
2018-09-12 21:15:32 +03:00
cc . csMgr . channelzID = cc . channelzID
2018-04-09 21:13:06 +03:00
}
2017-10-02 19:22:57 +03:00
if ! cc . dopts . insecure {
if cc . dopts . copts . TransportCredentials == nil {
return nil , errNoTransportSecurity
}
} else {
if cc . dopts . copts . TransportCredentials != nil {
return nil , errCredentialsConflict
}
for _ , cd := range cc . dopts . copts . PerRPCCredentials {
if cd . RequireTransportSecurity ( ) {
return nil , errTransportCredentialsMissing
}
}
}
2017-04-11 00:33:51 +03:00
cc . mkp = cc . dopts . copts . KeepaliveParams
2017-03-24 21:29:02 +03:00
2017-04-18 02:08:50 +03:00
if cc . dopts . copts . Dialer == nil {
cc . dopts . copts . Dialer = newProxyDialer (
func ( ctx context . Context , addr string ) ( net . Conn , error ) {
2018-04-09 21:12:34 +03:00
network , addr := parseDialTarget ( addr )
return dialContext ( ctx , network , addr )
2017-04-18 02:08:50 +03:00
} ,
)
}
2017-03-24 21:29:02 +03:00
if cc . dopts . copts . UserAgent != "" {
cc . dopts . copts . UserAgent += " " + grpcUA
} else {
cc . dopts . copts . UserAgent = grpcUA
}
2016-12-20 03:31:00 +03:00
if cc . dopts . timeout > 0 {
var cancel context . CancelFunc
ctx , cancel = context . WithTimeout ( ctx , cc . dopts . timeout )
defer cancel ( )
}
2016-08-24 22:56:16 +03:00
defer func ( ) {
2016-08-24 04:18:18 +03:00
select {
case <- ctx . Done ( ) :
2016-08-25 22:18:19 +03:00
conn , err = nil , ctx . Err ( )
2016-08-24 22:56:16 +03:00
default :
2016-08-24 04:18:18 +03:00
}
2016-08-25 22:18:19 +03:00
if err != nil {
cc . Close ( )
}
2016-08-24 04:18:18 +03:00
} ( )
2017-05-15 23:51:11 +03:00
scSet := false
2016-12-20 03:31:00 +03:00
if cc . dopts . scChan != nil {
2017-04-05 21:08:50 +03:00
// Try to get an initial service config.
2016-12-20 03:31:00 +03:00
select {
case sc , ok := <- cc . dopts . scChan :
if ok {
cc . sc = sc
2017-05-15 23:51:11 +03:00
scSet = true
2016-12-20 03:31:00 +03:00
}
2017-04-05 21:08:50 +03:00
default :
2016-12-20 03:31:00 +03:00
}
2015-02-06 04:14:05 +03:00
}
2016-03-23 21:49:05 +03:00
if cc . dopts . bs == nil {
2018-06-14 02:07:37 +03:00
cc . dopts . bs = backoff . Exponential {
MaxDelay : DefaultBackoffConfig . MaxDelay ,
}
2016-03-23 21:49:05 +03:00
}
2018-03-27 23:58:27 +03:00
if cc . dopts . resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
cc . parsedTarget = parseTarget ( cc . target )
grpclog . Infof ( "parsed scheme: %q" , cc . parsedTarget . Scheme )
cc . dopts . resolverBuilder = resolver . Get ( cc . parsedTarget . Scheme )
if cc . dopts . resolverBuilder == nil {
// If resolver builder is still nil, the parse target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original unparsed target.
grpclog . Infof ( "scheme %q not registered, fallback to default scheme" , cc . parsedTarget . Scheme )
cc . parsedTarget = resolver . Target {
Scheme : resolver . GetDefaultScheme ( ) ,
Endpoint : target ,
}
cc . dopts . resolverBuilder = resolver . Get ( cc . parsedTarget . Scheme )
}
} else {
cc . parsedTarget = resolver . Target { Endpoint : target }
}
2016-09-20 01:11:57 +03:00
creds := cc . dopts . copts . TransportCredentials
if creds != nil && creds . Info ( ) . ServerName != "" {
cc . authority = creds . Info ( ) . ServerName
2018-07-13 19:56:47 +03:00
} else if cc . dopts . insecure && cc . dopts . authority != "" {
cc . authority = cc . dopts . authority
2016-09-20 01:11:57 +03:00
} else {
2017-10-23 21:40:43 +03:00
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
cc . authority = cc . parsedTarget . Endpoint
2016-09-20 01:11:57 +03:00
}
2017-08-31 20:59:09 +03:00
2017-05-15 23:51:11 +03:00
if cc . dopts . scChan != nil && ! scSet {
2017-05-16 00:36:20 +03:00
// Blocking wait for the initial service config.
2017-05-15 23:51:11 +03:00
select {
case sc , ok := <- cc . dopts . scChan :
if ok {
cc . sc = sc
}
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
}
2016-12-20 03:31:00 +03:00
if cc . dopts . scChan != nil {
go cc . scWatcher ( )
}
2017-03-21 21:35:53 +03:00
2017-10-19 21:32:06 +03:00
var credsClone credentials . TransportCredentials
if creds := cc . dopts . copts . TransportCredentials ; creds != nil {
credsClone = creds . Clone ( )
}
cc . balancerBuildOpts = balancer . BuildOptions {
2018-04-09 21:13:06 +03:00
DialCreds : credsClone ,
Dialer : cc . dopts . copts . Dialer ,
ChannelzParentID : cc . channelzID ,
2017-10-19 21:32:06 +03:00
}
2017-10-02 19:22:57 +03:00
// Build the resolver.
cc . resolverWrapper , err = newCCResolverWrapper ( cc )
if err != nil {
return nil , fmt . Errorf ( "failed to build resolver: %v" , err )
}
2017-11-29 00:16:53 +03:00
// Start the resolver wrapper goroutine after resolverWrapper is created.
//
// If the goroutine is started before resolverWrapper is ready, the
// following may happen: The goroutine sends updates to cc. cc forwards
// those to balancer. Balancer creates new addrConn. addrConn fails to
// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
// resolverWrapper.
cc . resolverWrapper . start ( )
2017-10-02 19:22:57 +03:00
2017-08-31 20:59:09 +03:00
// A blocking dial blocks until the clientConn is ready.
if cc . dopts . block {
for {
s := cc . GetState ( )
if s == connectivity . Ready {
break
2018-08-27 20:28:41 +03:00
} else if cc . dopts . copts . FailOnNonTempDialError && s == connectivity . TransientFailure {
if err = cc . blockingpicker . connectionError ( ) ; err != nil {
terr , ok := err . ( interface { Temporary ( ) bool } )
if ok && ! terr . Temporary ( ) {
return nil , err
}
}
2017-08-31 20:59:09 +03:00
}
if ! cc . WaitForStateChange ( ctx , s ) {
// ctx got timeout or canceled.
return nil , ctx . Err ( )
}
}
}
2015-09-29 20:24:03 +03:00
return cc , nil
2015-09-23 22:18:41 +03:00
}
2017-08-09 20:31:12 +03:00
// connectivityStateManager keeps the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync . Mutex
2017-08-09 20:31:12 +03:00
state connectivity . State
2017-07-25 01:00:53 +03:00
notifyChan chan struct { }
2018-09-12 21:15:32 +03:00
channelzID int64
2017-07-25 01:00:53 +03:00
}
2017-08-09 20:31:12 +03:00
// updateState updates the connectivity.State of ClientConn.
2017-07-25 01:00:53 +03:00
// If there's a change it notifies goroutines waiting on state change to
// happen.
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) updateState ( state connectivity . State ) {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
2017-08-09 20:31:12 +03:00
if csm . state == connectivity . Shutdown {
2017-07-25 01:00:53 +03:00
return
}
if csm . state == state {
return
}
csm . state = state
2018-09-12 21:15:32 +03:00
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( csm . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel Connectivity change to %v" , state ) ,
Severity : channelz . CtINFO ,
} )
}
2017-07-25 01:00:53 +03:00
if csm . notifyChan != nil {
// There are other goroutines waiting on this channel.
close ( csm . notifyChan )
csm . notifyChan = nil
}
}
2017-08-09 20:31:12 +03:00
func ( csm * connectivityStateManager ) getState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
csm . mu . Lock ( )
defer csm . mu . Unlock ( )
return csm . state
}
// getNotifyChan returns the channel that updateState closes on the next state
// change, creating it first if no goroutine has asked for it yet.
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()

	ch := csm.notifyChan
	if ch == nil {
		ch = make(chan struct{})
		csm.notifyChan = ch
	}
	return ch
}
2016-05-18 21:18:10 +03:00
// ClientConn represents a client connection to an RPC server.
2015-02-06 04:14:05 +03:00
type ClientConn struct {
2016-07-29 20:16:56 +03:00
ctx context . Context
cancel context . CancelFunc
2017-10-23 21:40:43 +03:00
target string
parsedTarget resolver . Target
authority string
dopts dialOptions
csMgr * connectivityStateManager
2015-09-24 05:09:37 +03:00
2017-10-19 21:32:06 +03:00
balancerBuildOpts balancer . BuildOptions
resolverWrapper * ccResolverWrapper
blockingpicker * pickerWrapper
2017-08-31 20:59:09 +03:00
2016-05-07 01:47:09 +03:00
mu sync . RWMutex
2016-12-20 03:31:00 +03:00
sc ServiceConfig
2017-10-19 22:09:19 +03:00
scRaw string
2017-08-31 20:59:09 +03:00
conns map [ * addrConn ] struct { }
2017-06-16 19:59:37 +03:00
// Keepalive parameter can be updated if a GoAway is received.
2017-10-19 21:32:06 +03:00
mkp keepalive . ClientParameters
curBalancerName string
2017-12-12 23:45:05 +03:00
preBalancerName string // previous balancer name.
2017-10-19 21:32:06 +03:00
curAddresses [ ] resolver . Address
balancerWrapper * ccBalancerWrapper
2018-06-28 02:18:41 +03:00
retryThrottler atomic . Value
2018-04-09 21:13:06 +03:00
2018-08-06 21:17:12 +03:00
channelzID int64 // channelz unique identification number
czData * channelzData
2015-09-24 05:09:37 +03:00
}
2017-08-09 20:31:12 +03:00
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
2017-07-25 01:00:53 +03:00
// ctx expires. A true value is returned in former case and false in latter.
2017-08-09 20:31:12 +03:00
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) WaitForStateChange ( ctx context . Context , sourceState connectivity . State ) bool {
2017-07-25 01:00:53 +03:00
ch := cc . csMgr . getNotifyChan ( )
if cc . csMgr . getState ( ) != sourceState {
return true
}
select {
case <- ctx . Done ( ) :
return false
case <- ch :
return true
}
}
2017-08-09 20:31:12 +03:00
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func ( cc * ClientConn ) GetState ( ) connectivity . State {
2017-07-25 01:00:53 +03:00
return cc . csMgr . getState ( )
}
2016-12-20 03:31:00 +03:00
func ( cc * ClientConn ) scWatcher ( ) {
for {
select {
case sc , ok := <- cc . dopts . scChan :
if ! ok {
return
}
cc . mu . Lock ( )
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc . sc = sc
2017-10-19 22:09:19 +03:00
cc . scRaw = ""
2016-12-20 03:31:00 +03:00
cc . mu . Unlock ( )
case <- cc . ctx . Done ( ) :
return
}
}
}
2017-10-19 21:32:06 +03:00
func ( cc * ClientConn ) handleResolvedAddrs ( addrs [ ] resolver . Address , err error ) {
cc . mu . Lock ( )
defer cc . mu . Unlock ( )
if cc . conns == nil {
2017-11-29 00:16:53 +03:00
// cc was closed.
return
}
if reflect . DeepEqual ( cc . curAddresses , addrs ) {
2017-10-19 21:32:06 +03:00
return
}
2017-12-12 23:45:05 +03:00
cc . curAddresses = addrs
2017-12-19 02:35:42 +03:00
if cc . dopts . balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
2017-12-12 23:45:05 +03:00
var isGRPCLB bool
for _ , a := range addrs {
if a . Type == resolver . GRPCLB {
isGRPCLB = true
break
}
2017-11-29 00:16:53 +03:00
}
2017-12-12 23:45:05 +03:00
var newBalancerName string
if isGRPCLB {
newBalancerName = grpclbName
} else {
// Address list doesn't contain grpclb address. Try to pick a
// non-grpclb balancer.
newBalancerName = cc . curBalancerName
// If current balancer is grpclb, switch to the previous one.
if newBalancerName == grpclbName {
newBalancerName = cc . preBalancerName
}
// The following could be true in two cases:
// - the first time handling resolved addresses
// (curBalancerName="")
// - the first time handling non-grpclb addresses
// (curBalancerName="grpclb", preBalancerName="")
if newBalancerName == "" {
2017-12-19 02:35:42 +03:00
newBalancerName = PickFirstBalancerName
2017-12-12 23:45:05 +03:00
}
}
cc . switchBalancer ( newBalancerName )
2017-12-19 02:35:42 +03:00
} else if cc . balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc . balancerWrapper = newCCBalancerWrapper ( cc , cc . dopts . balancerBuilder , cc . balancerBuildOpts )
2017-10-19 21:32:06 +03:00
}
2017-12-19 02:35:42 +03:00
2017-10-19 21:32:06 +03:00
cc . balancerWrapper . handleResolvedAddrs ( addrs , nil )
}
2017-12-12 23:45:05 +03:00
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
2017-11-23 00:59:20 +03:00
//
// Caller must hold cc.mu.
2017-10-19 21:32:06 +03:00
func ( cc * ClientConn ) switchBalancer ( name string ) {
if cc . conns == nil {
return
}
2017-12-12 23:45:05 +03:00
if strings . ToLower ( cc . curBalancerName ) == strings . ToLower ( name ) {
2017-10-19 21:32:06 +03:00
return
}
2017-12-12 23:45:05 +03:00
grpclog . Infof ( "ClientConn switching balancer to %q" , name )
if cc . dopts . balancerBuilder != nil {
2017-12-19 02:35:42 +03:00
grpclog . Infoln ( "ignoring balancer switching: Balancer DialOption used instead" )
2017-10-19 21:32:06 +03:00
return
}
// TODO(bar switching) change this to two steps: drain and close.
// Keep track of sc in wrapper.
2017-11-23 00:59:20 +03:00
if cc . balancerWrapper != nil {
cc . balancerWrapper . close ( )
}
2017-10-19 21:32:06 +03:00
builder := balancer . Get ( name )
2018-09-12 21:15:32 +03:00
// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
// we reuse previous one?
if channelz . IsOn ( ) {
if builder == nil {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel switches to new LB policy %q due to fallback from invalid balancer name" , PickFirstBalancerName ) ,
Severity : channelz . CtWarning ,
} )
} else {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Channel switches to new LB policy %q" , name ) ,
Severity : channelz . CtINFO ,
} )
}
}
2017-10-19 21:32:06 +03:00
if builder == nil {
2017-12-12 23:45:05 +03:00
grpclog . Infof ( "failed to get balancer builder for: %v, using pick_first instead" , name )
2017-10-19 21:32:06 +03:00
builder = newPickfirstBuilder ( )
}
2018-09-12 21:15:32 +03:00
2017-12-12 23:45:05 +03:00
cc . preBalancerName = cc . curBalancerName
2017-10-19 21:32:06 +03:00
cc . curBalancerName = builder . Name ( )
cc . balancerWrapper = newCCBalancerWrapper ( cc , builder , cc . balancerBuildOpts )
}
// handleSubConnStateChange forwards a SubConn's new connectivity state to the
// current balancer wrapper. The update is dropped if the ClientConn has
// already been closed (cc.conns == nil).
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if cc.conns != nil {
		// TODO(bar switching) send updates to all balancer wrappers when balancer
		// gracefully switching is supported.
		cc.balancerWrapper.handleSubConnStateChange(sc, s)
	}
}
2017-08-31 20:59:09 +03:00
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
2017-11-23 00:59:20 +03:00
//
// Caller needs to make sure len(addrs) > 0.
2017-08-31 20:59:09 +03:00
func ( cc * ClientConn ) newAddrConn ( addrs [ ] resolver . Address ) ( * addrConn , error ) {
ac := & addrConn {
2018-08-27 23:21:48 +03:00
cc : cc ,
addrs : addrs ,
dopts : cc . dopts ,
czData : new ( channelzData ) ,
resetBackoff : make ( chan struct { } ) ,
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
ac . ctx , ac . cancel = context . WithCancel ( cc . ctx )
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
return nil , ErrClientConnClosing
2017-08-21 22:27:04 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
ac . channelzID = channelz . RegisterSubChannel ( ac , cc . channelzID , "" )
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel Created" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel(id:%d) created" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
2018-04-09 21:13:06 +03:00
}
2017-08-31 20:59:09 +03:00
cc . conns [ ac ] = struct { } { }
cc . mu . Unlock ( )
return ac , nil
2017-08-21 22:27:04 +03:00
}
2017-08-31 20:59:09 +03:00
// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func ( cc * ClientConn ) removeAddrConn ( ac * addrConn , err error ) {
cc . mu . Lock ( )
if cc . conns == nil {
cc . mu . Unlock ( )
2017-08-21 22:27:04 +03:00
return
}
2017-08-31 20:59:09 +03:00
delete ( cc . conns , ac )
cc . mu . Unlock ( )
ac . tearDown ( err )
2017-08-21 22:27:04 +03:00
}
2018-08-07 02:02:47 +03:00
func ( cc * ClientConn ) channelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : cc . GetState ( ) ,
2018-04-23 21:22:25 +03:00
Target : cc . target ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & cc . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & cc . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & cc . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & cc . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
2018-07-24 02:19:11 +03:00
// Target reports the target string this ClientConn was dialed with.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	target := cc.target
	return target
}
2018-04-23 21:22:25 +03:00
func ( cc * ClientConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsStarted , 1 )
atomic . StoreInt64 ( & cc . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( cc * ClientConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & cc . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2017-08-31 20:59:09 +03:00
// connect starts to creating transport and also starts the transport monitor
// goroutine for this ac.
2017-10-02 19:22:57 +03:00
// It does nothing if the ac is not IDLE.
2017-08-31 20:59:09 +03:00
// TODO(bar) Move this to the addrConn section.
// This was part of resetAddrConn, keep it here to make the diff look clean.
2017-10-24 00:06:33 +03:00
func ( ac * addrConn ) connect ( ) error {
2017-08-31 20:59:09 +03:00
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return errConnClosing
2017-08-21 22:27:04 +03:00
}
2017-10-02 19:22:57 +03:00
if ac . state != connectivity . Idle {
ac . mu . Unlock ( )
return nil
2015-09-24 05:09:37 +03:00
}
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . Connecting )
2017-10-19 21:32:06 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , ac . state )
2017-10-02 19:22:57 +03:00
ac . mu . Unlock ( )
2017-08-31 20:59:09 +03:00
2017-10-24 00:06:33 +03:00
// Start a goroutine connecting to the server asynchronously.
2018-08-02 01:40:56 +03:00
go func ( ) {
if err := ac . resetTransport ( ) ; err != nil {
grpclog . Warningf ( "Failed to dial %s: %v; please retry." , ac . addrs [ 0 ] . Addr , err )
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac . tearDown ( err )
}
return
}
ac . transportMonitor ( )
} ( )
2016-05-17 01:31:00 +03:00
return nil
2015-09-24 05:09:37 +03:00
}
2017-08-31 20:59:09 +03:00
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// It checks whether current connected address of ac is in the new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
func ( ac * addrConn ) tryUpdateAddrs ( addrs [ ] resolver . Address ) bool {
ac . mu . Lock ( )
defer ac . mu . Unlock ( )
grpclog . Infof ( "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v" , ac . curAddr , addrs )
if ac . state == connectivity . Shutdown {
ac . addrs = addrs
return true
}
var curAddrFound bool
for _ , a := range addrs {
if reflect . DeepEqual ( ac . curAddr , a ) {
curAddrFound = true
break
}
}
grpclog . Infof ( "addrConn: tryUpdateAddrs curAddrFound: %v" , curAddrFound )
if curAddrFound {
ac . addrs = addrs
2018-08-02 01:40:56 +03:00
ac . reconnectIdx = 0 // Start reconnecting from beginning in the new list.
2017-08-31 20:59:09 +03:00
}
return curAddrFound
}
2017-05-20 02:02:02 +03:00
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
2018-02-21 21:14:52 +03:00
// the service, we return it.
2017-05-20 02:02:02 +03:00
// Otherwise, we return an empty MethodConfig.
2017-05-15 23:51:11 +03:00
func ( cc * ClientConn ) GetMethodConfig ( method string ) MethodConfig {
2017-05-20 02:02:02 +03:00
// TODO: Avoid the locking here.
2016-12-20 03:31:00 +03:00
cc . mu . RLock ( )
defer cc . mu . RUnlock ( )
2017-05-15 23:51:11 +03:00
m , ok := cc . sc . Methods [ method ]
2017-04-04 01:03:24 +03:00
if ! ok {
i := strings . LastIndex ( method , "/" )
2018-04-15 14:07:56 +03:00
m = cc . sc . Methods [ method [ : i + 1 ] ]
2017-04-04 01:03:24 +03:00
}
2017-05-15 23:51:11 +03:00
return m
2016-12-20 03:31:00 +03:00
}
2018-07-11 20:18:09 +03:00
func ( cc * ClientConn ) getTransport ( ctx context . Context , failfast bool , method string ) ( transport . ClientTransport , func ( balancer . DoneInfo ) , error ) {
t , done , err := cc . blockingpicker . pick ( ctx , failfast , balancer . PickOptions {
FullMethodName : method ,
} )
2016-05-07 01:47:09 +03:00
if err != nil {
2017-10-02 19:22:57 +03:00
return nil , nil , toRPCErr ( err )
2015-09-24 05:09:37 +03:00
}
2017-10-02 19:22:57 +03:00
return t , done , nil
2015-09-24 05:09:37 +03:00
}
2017-10-19 22:09:19 +03:00
// handleServiceConfig parses the service config string in JSON format to Go native
// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
func ( cc * ClientConn ) handleServiceConfig ( js string ) error {
2018-05-08 20:28:26 +03:00
if cc . dopts . disableServiceConfig {
return nil
}
2018-09-12 21:15:32 +03:00
if cc . scRaw == js {
return nil
}
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( cc . channelzID , & channelz . TraceEventDesc {
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
// for human consumption.
Desc : fmt . Sprintf ( "Channel has a new service config \"%s\"" , js ) ,
Severity : channelz . CtINFO ,
} )
}
2017-10-19 22:09:19 +03:00
sc , err := parseServiceConfig ( js )
if err != nil {
return err
}
cc . mu . Lock ( )
cc . scRaw = js
cc . sc = sc
2018-06-28 02:18:41 +03:00
if sc . retryThrottling != nil {
newThrottler := & retryThrottler {
tokens : sc . retryThrottling . MaxTokens ,
max : sc . retryThrottling . MaxTokens ,
thresh : sc . retryThrottling . MaxTokens / 2 ,
ratio : sc . retryThrottling . TokenRatio ,
}
cc . retryThrottler . Store ( newThrottler )
} else {
cc . retryThrottler . Store ( ( * retryThrottler ) ( nil ) )
}
2017-12-12 23:45:05 +03:00
if sc . LB != nil && * sc . LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
if cc . curBalancerName == grpclbName {
// If current balancer is grpclb, there's at least one grpclb
// balancer address in the resolved list. Don't switch the balancer,
// but change the previous balancer name, so if a new resolved
// address list doesn't contain grpclb address, balancer will be
// switched to *sc.LB.
cc . preBalancerName = * sc . LB
} else {
cc . switchBalancer ( * sc . LB )
cc . balancerWrapper . handleResolvedAddrs ( cc . curAddresses , nil )
}
2017-11-17 22:11:05 +03:00
}
2018-06-28 02:18:41 +03:00
2017-10-19 22:09:19 +03:00
cc . mu . Unlock ( )
return nil
}
2017-11-29 00:16:53 +03:00
func ( cc * ClientConn ) resolveNow ( o resolver . ResolveNowOption ) {
2018-06-12 22:50:43 +03:00
cc . mu . RLock ( )
2017-11-29 00:16:53 +03:00
r := cc . resolverWrapper
2018-06-12 22:50:43 +03:00
cc . mu . RUnlock ( )
2017-11-29 00:16:53 +03:00
if r == nil {
return
}
go r . resolveNow ( o )
}
2018-08-27 23:21:48 +03:00
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	// Snapshot the subchannels under cc.mu and release it before touching any
	// of them. resetConnectBackoff takes ac.mu, while other code paths call
	// cc.resolveNow (which takes cc.mu) with ac.mu already held; holding cc.mu
	// across the loop would invert that lock order and could deadlock.
	cc.mu.Lock()
	acs := make([]*addrConn, 0, len(cc.conns))
	for ac := range cc.conns {
		acs = append(acs, ac)
	}
	cc.mu.Unlock()

	for _, ac := range acs {
		ac.resetConnectBackoff()
	}
}
2016-05-18 03:18:54 +03:00
// Close tears down the ClientConn and all underlying connections.
2016-05-07 01:47:09 +03:00
func ( cc * ClientConn ) Close ( ) error {
2018-03-03 00:39:55 +03:00
defer cc . cancel ( )
2016-07-29 20:16:56 +03:00
2015-08-01 00:16:02 +03:00
cc . mu . Lock ( )
2016-05-11 05:29:44 +03:00
if cc . conns == nil {
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
return ErrClientConnClosing
}
2016-05-11 05:29:44 +03:00
conns := cc . conns
cc . conns = nil
2017-08-09 20:31:12 +03:00
cc . csMgr . updateState ( connectivity . Shutdown )
2017-10-19 21:32:06 +03:00
rWrapper := cc . resolverWrapper
cc . resolverWrapper = nil
bWrapper := cc . balancerWrapper
cc . balancerWrapper = nil
2016-05-07 01:47:09 +03:00
cc . mu . Unlock ( )
2018-04-23 21:22:25 +03:00
2017-10-02 19:22:57 +03:00
cc . blockingpicker . close ( )
2018-04-23 21:22:25 +03:00
2017-10-19 21:32:06 +03:00
if rWrapper != nil {
rWrapper . close ( )
2017-10-02 19:22:57 +03:00
}
2017-10-19 21:32:06 +03:00
if bWrapper != nil {
bWrapper . close ( )
2016-07-19 01:58:41 +03:00
}
2018-04-23 21:22:25 +03:00
2017-08-31 20:59:09 +03:00
for ac := range conns {
2016-05-11 05:29:44 +03:00
ac . tearDown ( ErrClientConnClosing )
2016-05-07 01:47:09 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
ted := & channelz . TraceEventDesc {
Desc : "Channel Deleted" ,
Severity : channelz . CtINFO ,
}
if cc . dopts . channelzParentID != 0 {
ted . Parent = & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Nested channel(id:%d) deleted" , cc . channelzID ) ,
Severity : channelz . CtINFO ,
}
}
channelz . AddTraceEvent ( cc . channelzID , ted )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity beng deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( cc . channelzID )
}
2016-05-07 01:47:09 +03:00
return nil
}
// addrConn is a network connection to a given address.
//
// It owns at most one transport at a time and carries the bookkeeping that
// resetTransport, createTransport and transportMonitor share while connecting
// and reconnecting.
type addrConn struct {
2016-07-29 20:16:56 +03:00
// ctx is the parent of every dial context and is watched by the backoff
// wait in createTransport.
// NOTE(review): cancel is presumably the CancelFunc of ctx; tearDown calls it
// first — confirm where both are created.
ctx context . Context
cancel context . CancelFunc
2017-12-01 20:55:42 +03:00
// cc is the ClientConn this addrConn reports its state changes to.
cc * ClientConn
2018-08-02 01:40:56 +03:00
// addrs is the list of candidate addresses; createTransport tries them in
// order on a copy taken by resetTransport.
addrs [ ] resolver . Address
2017-12-01 20:55:42 +03:00
dopts dialOptions
// events, when non-nil, receives the output of printf; tearDown finishes it
// and sets it to nil.
events trace . EventLog
// acbw is the balancer.SubConn passed along with every state change.
acbw balancer . SubConn
2016-05-07 01:47:09 +03:00
2018-08-02 01:40:56 +03:00
// NOTE(review): mu appears to guard the fields below (and addrs); the
// methods in this file take it before touching them — confirm.
mu sync . Mutex
// curAddr is the address of the current transport. It is reset to the zero
// Address when the transport is lost or the addrConn is torn down.
curAddr resolver . Address
reconnectIdx int // The index in addrs list to start reconnecting from.
2018-09-12 21:15:32 +03:00
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity . State
2016-05-07 01:47:09 +03:00
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
2018-08-02 01:40:56 +03:00
ready chan struct { }
// transport is the current client transport; resetTransport sets it to nil
// before reconnecting.
transport transport . ClientTransport
2016-07-16 02:20:34 +03:00
2018-08-02 01:40:56 +03:00
// The reason this addrConn is torn down.
tearDownErr error
2017-12-01 20:55:42 +03:00
2018-08-02 01:40:56 +03:00
// connectRetryNum is the retry count saved by createTransport so that a
// later resetTransport can continue with the same backoff parameters.
connectRetryNum int
2017-12-01 20:55:42 +03:00
// backoffDeadline is the time until which resetTransport needs to
2018-08-02 01:40:56 +03:00
// wait before increasing connectRetryNum count.
2017-12-01 20:55:42 +03:00
backoffDeadline time . Time
// connectDeadline is the time by which all connection
// negotiations must complete.
connectDeadline time . Time
2018-04-09 21:13:06 +03:00
2018-08-27 23:21:48 +03:00
// resetBackoff is closed and replaced by resetConnectBackoff to cut short a
// backoff wait in createTransport.
resetBackoff chan struct { }
2018-08-06 21:17:12 +03:00
channelzID int64 // channelz unique identification number
// czData holds the call counters, which are read and updated atomically.
czData * channelzData
2016-05-07 01:47:09 +03:00
}
2018-09-12 21:15:32 +03:00
// updateConnectivityState records s as ac's connectivity state and, when
// channelz is on, adds a trace event describing the change.
// The callers in this file invoke it with ac.mu held.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
	ac.state = s
	if !channelz.IsOn() {
		return
	}
	event := &channelz.TraceEventDesc{
		Desc:     fmt.Sprintf("Subchannel Connectivity change to %v", s),
		Severity: channelz.CtINFO,
	}
	channelz.AddTraceEvent(ac.channelzID, event)
}
2017-04-11 00:33:51 +03:00
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func ( ac * addrConn ) adjustParams ( r transport . GoAwayReason ) {
switch r {
2017-11-07 00:45:11 +03:00
case transport . GoAwayTooManyPings :
2017-04-11 00:33:51 +03:00
v := 2 * ac . dopts . copts . KeepaliveParams . Time
ac . cc . mu . Lock ( )
if v > ac . cc . mkp . Time {
ac . cc . mkp . Time = v
}
ac . cc . mu . Unlock ( )
}
}
2016-05-07 01:47:09 +03:00
// printf writes a formatted entry to ac's event log. It does nothing once the
// log has been released (ac.events is nil), i.e. after ac has been closed.
// REQUIRES ac.mu is held.
func (ac *addrConn) printf(format string, a ...interface{}) {
	if ac.events == nil {
		return
	}
	ac.events.Printf(format, a...)
}
2018-08-02 01:40:56 +03:00
// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
// In case that doesn't happen, transportMonitor will kill the newly created
// transport after connectDeadline has expired.
// In case there was an error on the transport before the settings frame was
// received, resetTransport resumes connecting to backends after the one that
// was previously connected to. In case end of the list is reached, resetTransport
// backs off until the original deadline.
// If the DialOption WithWaitForHandshake was set, resetTransport returns
// successfully only after server settings are received.
2018-08-01 00:10:13 +03:00
//
// It returns nil once createTransport reports a connection, errConnClosing
// when ac is found in the Shutdown state, and otherwise whatever error
// createTransport returns.
//
2018-08-02 01:40:56 +03:00
// TODO(bar) make sure all state transitions are valid.
func ( ac * addrConn ) resetTransport ( ) error {
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
2018-08-01 00:10:13 +03:00
ac . mu . Unlock ( )
2018-08-02 01:40:56 +03:00
return errConnClosing
}
// Release anything waiting on the previous ready channel and forget the
// old transport before a new one is created.
if ac . ready != nil {
close ( ac . ready )
ac . ready = nil
}
ac . transport = nil
ridx := ac . reconnectIdx
ac . mu . Unlock ( )
// Pick up the ClientConn-wide keepalive parameters, which adjustParams may
// have raised after a GoAway.
ac . cc . mu . RLock ( )
ac . dopts . copts . KeepaliveParams = ac . cc . mkp
ac . cc . mu . RUnlock ( )
var backoffDeadline , connectDeadline time . Time
2018-08-27 23:21:48 +03:00
var resetBackoff chan struct { }
// Each iteration makes one pass over the address list through
// createTransport; the loop only ends on success or on an error.
2018-08-02 01:40:56 +03:00
for connectRetryNum := 0 ; ; connectRetryNum ++ {
2018-08-01 00:10:13 +03:00
ac . mu . Lock ( )
2018-08-02 01:40:56 +03:00
if ac . backoffDeadline . IsZero ( ) {
// This means either a successful HTTP2 connection was established
// or this is the first time this addrConn is trying to establish a
// connection.
backoffFor := ac . dopts . bs . Backoff ( connectRetryNum ) // time.Duration.
2018-08-27 23:21:48 +03:00
// Capture the channel under the lock; resetConnectBackoff replaces it.
resetBackoff = ac . resetBackoff
2018-08-02 01:40:56 +03:00
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout ( )
if backoffFor > dialDuration {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time . Now ( )
backoffDeadline = start . Add ( backoffFor )
connectDeadline = start . Add ( dialDuration )
ridx = 0 // Start connecting from the beginning.
} else {
// Continue trying to connect with the same deadlines.
// The saved values are consumed here: they are copied into the locals
// (including the loop counter) and the fields are cleared again.
connectRetryNum = ac . connectRetryNum
backoffDeadline = ac . backoffDeadline
connectDeadline = ac . connectDeadline
ac . backoffDeadline = time . Time { }
ac . connectDeadline = time . Time { }
ac . connectRetryNum = 0
}
2017-08-31 20:59:09 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2018-08-02 01:40:56 +03:00
return errConnClosing
2017-08-31 20:59:09 +03:00
}
ac . printf ( "connecting" )
2017-10-24 00:06:33 +03:00
// Only report CONNECTING when it is an actual change of state.
if ac . state != connectivity . Connecting {
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . Connecting )
2017-10-24 00:06:33 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , ac . state )
}
2018-08-02 01:40:56 +03:00
// copy ac.addrs in case of race
addrsIter := make ( [ ] resolver . Address , len ( ac . addrs ) )
copy ( addrsIter , ac . addrs )
2017-10-02 19:22:57 +03:00
copts := ac . dopts . copts
2017-08-21 22:27:04 +03:00
ac . mu . Unlock ( )
2018-08-27 23:21:48 +03:00
// createTransport returns (false, nil) after it has waited out the backoff
// without connecting, which sends us around the loop again.
connected , err := ac . createTransport ( connectRetryNum , ridx , backoffDeadline , connectDeadline , addrsIter , copts , resetBackoff )
2018-08-02 01:40:56 +03:00
if err != nil {
return err
}
if connected {
return nil
2017-12-01 20:55:42 +03:00
}
}
}
// createTransport creates a connection to one of the backends in addrs.
2018-08-02 01:40:56 +03:00
// It returns true if a connection was established.
//
// Addresses are tried in order starting at index ridx; every dial is bounded
// by connectDeadline. When none of them yields a transport, ac is moved to
// TransientFailure and createTransport waits until backoffDeadline, until
// resetBackoff is closed, or until ac.ctx is done.
//
// Results:
//   - (true, nil): a transport was created and ac is READY.
//   - (false, nil): the backoff wait ended; the caller is expected to retry.
//   - (false, errConnClosing): ac was found in the Shutdown state.
//   - (false, ac.ctx.Err()): ac.ctx was done during the backoff wait.
2018-08-27 23:21:48 +03:00
func ( ac * addrConn ) createTransport ( connectRetryNum , ridx int , backoffDeadline , connectDeadline time . Time , addrs [ ] resolver . Address , copts transport . ConnectOptions , resetBackoff chan struct { } ) ( bool , error ) {
2018-08-02 01:40:56 +03:00
for i := ridx ; i < len ( addrs ) ; i ++ {
addr := addrs [ i ]
2018-09-12 21:15:32 +03:00
if channelz . IsOn ( ) {
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchannel picks a new address %q to connect" , addr . Addr ) ,
Severity : channelz . CtINFO ,
} )
}
2018-08-02 01:40:56 +03:00
target := transport . TargetInfo {
Addr : addr . Addr ,
Metadata : addr . Metadata ,
Authority : ac . cc . authority ,
}
// done is closed by onPrefaceReceipt, the callback handed to
// NewClientTransport below.
done := make ( chan struct { } )
onPrefaceReceipt := func ( ) {
2017-08-21 22:27:04 +03:00
ac . mu . Lock ( )
2018-08-02 01:40:56 +03:00
close ( done )
if ! ac . backoffDeadline . IsZero ( ) {
// If we haven't already started reconnecting to
// other backends.
// Note, this can happen when writer notices an error
// and triggers resetTransport while at the same time
// reader receives the preface and invokes this closure.
ac . backoffDeadline = time . Time { }
ac . connectDeadline = time . Time { }
ac . connectRetryNum = 0
}
2017-08-21 22:27:04 +03:00
ac . mu . Unlock ( )
2017-12-01 20:55:42 +03:00
}
2018-08-02 01:40:56 +03:00
// Do not cancel in the success path because of
// this issue in Go1.6: https://github.com/golang/go/issues/15078.
connectCtx , cancel := context . WithDeadline ( ac . ctx , connectDeadline )
if channelz . IsOn ( ) {
copts . ChannelzParentID = ac . channelzID
}
newTr , err := transport . NewClientTransport ( connectCtx , ac . cc . ctx , target , copts , onPrefaceReceipt )
if err != nil {
cancel ( )
ac . cc . blockingpicker . updateConnectionError ( err )
2016-05-07 01:47:09 +03:00
ac . mu . Lock ( )
2018-08-02 01:40:56 +03:00
if ac . state == connectivity . Shutdown {
// ac.tearDown(...) has been invoked.
2016-05-07 01:47:09 +03:00
ac . mu . Unlock ( )
2018-08-02 01:40:56 +03:00
return false , errConnClosing
2015-09-25 23:21:25 +03:00
}
2018-08-02 01:40:56 +03:00
ac . mu . Unlock ( )
grpclog . Warningf ( "grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting..." , addr , err )
// Move on to the next address in the list.
continue
}
2017-12-01 20:55:42 +03:00
if ac . dopts . waitForHandshake {
select {
2018-08-02 01:40:56 +03:00
case <- done :
case <- connectCtx . Done ( ) :
// Didn't receive server preface, must kill this new transport now.
grpclog . Warningf ( "grpc: addrConn.createTransport failed to receive server preface before deadline." )
2017-12-01 20:55:42 +03:00
newTr . Close ( )
2018-08-02 01:40:56 +03:00
continue
case <- ac . ctx . Done ( ) :
// Nothing to do here; execution continues with the Shutdown check below.
2017-12-01 20:55:42 +03:00
}
2015-03-05 20:45:50 +03:00
}
2017-08-31 20:59:09 +03:00
ac . mu . Lock ( )
2017-12-01 20:55:42 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2018-08-02 01:40:56 +03:00
// ac.tearDown(...) has been invoked.
newTr . Close ( )
return false , errConnClosing
2017-12-01 20:55:42 +03:00
}
2018-08-02 01:40:56 +03:00
ac . printf ( "ready" )
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . Ready )
2017-10-19 21:32:06 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , ac . state )
2018-08-02 01:40:56 +03:00
ac . transport = newTr
ac . curAddr = addr
if ac . ready != nil {
close ( ac . ready )
ac . ready = nil
}
select {
case <- done :
// If the server has responded back with preface already,
// don't set the reconnect parameters.
default :
// Preface not seen yet: save the parameters so that a later
// resetTransport continues with the same deadlines.
ac . connectRetryNum = connectRetryNum
ac . backoffDeadline = backoffDeadline
ac . connectDeadline = connectDeadline
ac . reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
}
2017-08-31 20:59:09 +03:00
ac . mu . Unlock ( )
2018-08-02 01:40:56 +03:00
return true , nil
2017-12-01 20:55:42 +03:00
}
// Every address from ridx onward failed: report TransientFailure, ask for a
// re-resolution and wait out the backoff.
ac . mu . Lock ( )
2018-04-10 22:48:04 +03:00
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
2018-08-02 01:40:56 +03:00
return false , errConnClosing
2018-04-10 22:48:04 +03:00
}
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2017-12-01 20:55:42 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , ac . state )
ac . cc . resolveNow ( resolver . ResolveNowOption { } )
if ac . ready != nil {
close ( ac . ready )
ac . ready = nil
}
ac . mu . Unlock ( )
timer := time . NewTimer ( backoffDeadline . Sub ( time . Now ( ) ) )
select {
case <- timer . C :
2018-08-27 23:21:48 +03:00
case <- resetBackoff :
// resetConnectBackoff closed the channel: stop waiting and retry now.
timer . Stop ( )
2017-12-01 20:55:42 +03:00
case <- ac . ctx . Done ( ) :
2017-08-21 22:27:04 +03:00
timer . Stop ( )
2018-08-02 01:40:56 +03:00
return false , ac . ctx . Err ( )
}
return false , nil
}
2018-08-27 23:21:48 +03:00
// resetConnectBackoff closes the current resetBackoff channel, which releases
// a createTransport call that is waiting out a backoff, then installs a fresh
// channel and zeroes the saved retry count.
func (ac *addrConn) resetConnectBackoff() {
	ac.mu.Lock()
	close(ac.resetBackoff)
	ac.resetBackoff, ac.connectRetryNum = make(chan struct{}), 0
	ac.mu.Unlock()
}
2018-08-02 01:40:56 +03:00
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
//
// Each loop iteration watches the current transport until it reports a
// GoAway or an error, or until connectDeadline passes without the server
// preface having been received. It then moves ac to TransientFailure and calls
// resetTransport to obtain a replacement. The loop ends when ac is in the
// Shutdown state or when resetTransport fails.
func ( ac * addrConn ) transportMonitor ( ) {
for {
var timer * time . Timer
// cdeadline stays nil unless a connect deadline is pending, so its select
// case below can only fire when a timer was armed.
var cdeadline <- chan time . Time
ac . mu . Lock ( )
t := ac . transport
if ! ac . connectDeadline . IsZero ( ) {
timer = time . NewTimer ( ac . connectDeadline . Sub ( time . Now ( ) ) )
cdeadline = timer . C
}
ac . mu . Unlock ( )
// Block until we receive a goaway or an error occurs.
select {
case <- t . GoAway ( ) :
done := t . Error ( )
cleanup := t . Close
// Since this transport will be orphaned (won't have a transportMonitor)
// we need to launch a goroutine to keep track of clientConn.Close()
// happening since it might not be noticed by any other goroutine for a while.
go func ( ) {
<- done
cleanup ( )
} ( )
case <- t . Error ( ) :
// In case this is triggered because clientConn.Close()
// was called, we want to immediately close the transport
// since no other goroutine might notice it for a while.
t . Close ( )
case <- cdeadline :
ac . mu . Lock ( )
// This implies that client received server preface.
if ac . backoffDeadline . IsZero ( ) {
ac . mu . Unlock ( )
continue
}
ac . mu . Unlock ( )
// The timer has already fired; clear it so it is not stopped below.
timer = nil
// No server preface received until deadline.
// Kill the connection.
grpclog . Warningf ( "grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now." )
t . Close ( )
}
if timer != nil {
timer . Stop ( )
}
// If a GoAway happened, regardless of error, adjust our keepalive
// parameters as appropriate.
select {
case <- t . GoAway ( ) :
ac . adjustParams ( t . GetGoAwayReason ( ) )
default :
}
ac . mu . Lock ( )
if ac . state == connectivity . Shutdown {
ac . mu . Unlock ( )
return
}
// Set connectivity state to TransientFailure before calling
// resetTransport. Transition READY->CONNECTING is not valid.
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . TransientFailure )
2018-08-02 01:40:56 +03:00
ac . cc . handleSubConnStateChange ( ac . acbw , ac . state )
ac . cc . resolveNow ( resolver . ResolveNowOption { } )
// The transport is gone, so there is no current address any more.
ac . curAddr = resolver . Address { }
ac . mu . Unlock ( )
if err := ac . resetTransport ( ) ; err != nil {
ac . mu . Lock ( )
ac . printf ( "transport exiting: %v" , err )
ac . mu . Unlock ( )
grpclog . Warningf ( "grpc: addrConn.transportMonitor exits due to: %v" , err )
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac . tearDown ( err )
}
return
}
2015-02-06 04:14:05 +03:00
}
}
2017-10-02 19:22:57 +03:00
// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
func ( ac * addrConn ) getReadyTransport ( ) ( transport . ClientTransport , bool ) {
ac . mu . Lock ( )
2018-08-02 01:40:56 +03:00
if ac . state == connectivity . Ready {
2017-10-02 19:22:57 +03:00
t := ac . transport
ac . mu . Unlock ( )
return t , true
}
var idle bool
if ac . state == connectivity . Idle {
idle = true
}
ac . mu . Unlock ( )
// Trigger idle ac to connect.
if idle {
2017-10-24 00:06:33 +03:00
ac . connect ( )
2017-10-02 19:22:57 +03:00
}
return nil , false
}
2016-05-17 01:31:00 +03:00
// tearDown starts to tear down the addrConn.
2015-02-06 04:14:05 +03:00
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
2016-05-07 01:47:09 +03:00
// some edge cases (e.g., the caller opens and closes many addrConn's in a
2015-02-06 04:14:05 +03:00
// tight loop.
2016-07-29 22:57:38 +03:00
// tearDown doesn't remove ac from ac.cc.conns.
2016-05-07 01:47:09 +03:00
func ( ac * addrConn ) tearDown ( err error ) {
2016-07-29 20:16:56 +03:00
ac . cancel ( )
2016-05-07 01:47:09 +03:00
ac . mu . Lock ( )
2018-08-02 01:40:56 +03:00
defer ac . mu . Unlock ( )
2017-12-15 23:03:41 +03:00
if ac . state == connectivity . Shutdown {
return
}
2017-10-20 01:16:16 +03:00
ac . curAddr = resolver . Address { }
2016-07-26 02:35:32 +03:00
if err == errConnDrain && ac . transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
ac . transport . GracefulClose ( )
}
2018-09-12 21:15:32 +03:00
ac . updateConnectivityState ( connectivity . Shutdown )
2018-08-02 01:40:56 +03:00
ac . tearDownErr = err
ac . cc . handleSubConnStateChange ( ac . acbw , ac . state )
2016-05-07 01:47:09 +03:00
if ac . events != nil {
ac . events . Finish ( )
ac . events = nil
2015-03-05 00:00:47 +03:00
}
2016-05-07 01:47:09 +03:00
if ac . ready != nil {
close ( ac . ready )
ac . ready = nil
2015-03-04 06:04:29 +03:00
}
2018-04-09 21:13:06 +03:00
if channelz . IsOn ( ) {
2018-09-12 21:15:32 +03:00
channelz . AddTraceEvent ( ac . channelzID , & channelz . TraceEventDesc {
Desc : "Subchannel Deleted" ,
Severity : channelz . CtINFO ,
Parent : & channelz . TraceEventDesc {
Desc : fmt . Sprintf ( "Subchanel(id:%d) deleted" , ac . channelzID ) ,
Severity : channelz . CtINFO ,
} ,
} )
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity beng deleted, and thus prevent it from being deleted right away.
2018-04-09 21:13:06 +03:00
channelz . RemoveEntry ( ac . channelzID )
}
2015-02-06 04:14:05 +03:00
}
2017-08-31 20:59:09 +03:00
// getState returns ac's current connectivity state, read under ac.mu.
func (ac *addrConn) getState() connectivity.State {
	ac.mu.Lock()
	current := ac.state
	ac.mu.Unlock()
	return current
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
2018-04-09 21:13:06 +03:00
func ( ac * addrConn ) ChannelzMetric ( ) * channelz . ChannelInternalMetric {
2018-04-23 21:22:25 +03:00
ac . mu . Lock ( )
addr := ac . curAddr . Addr
ac . mu . Unlock ( )
return & channelz . ChannelInternalMetric {
2018-08-06 21:17:12 +03:00
State : ac . getState ( ) ,
2018-04-23 21:22:25 +03:00
Target : addr ,
2018-08-06 21:17:12 +03:00
CallsStarted : atomic . LoadInt64 ( & ac . czData . callsStarted ) ,
CallsSucceeded : atomic . LoadInt64 ( & ac . czData . callsSucceeded ) ,
CallsFailed : atomic . LoadInt64 ( & ac . czData . callsFailed ) ,
LastCallStartedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & ac . czData . lastCallStartedTime ) ) ,
2018-04-23 21:22:25 +03:00
}
}
func ( ac * addrConn ) incrCallsStarted ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsStarted , 1 )
atomic . StoreInt64 ( & ac . czData . lastCallStartedTime , time . Now ( ) . UnixNano ( ) )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsSucceeded ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsSucceeded , 1 )
2018-04-23 21:22:25 +03:00
}
func ( ac * addrConn ) incrCallsFailed ( ) {
2018-08-06 21:17:12 +03:00
atomic . AddInt64 ( & ac . czData . callsFailed , 1 )
2018-04-09 21:13:06 +03:00
}
2018-06-28 02:18:41 +03:00
// retryThrottler is the token bucket behind retry throttling. A nil
// *retryThrottler is valid and never throttles.
type retryThrottler struct {
	max    float64 // upper bound for tokens
	thresh float64 // retries are throttled while tokens <= thresh
	ratio  float64 // tokens added back by each successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never drops below zero.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capped at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
2018-08-07 02:02:47 +03:00
// channelzChannel wraps a ClientConn and exposes its channelz metric through
// the ChannelzMetric method.
type channelzChannel struct {
	cc *ClientConn
}

// ChannelzMetric returns the metric reported by the wrapped ClientConn.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	metric := c.cc.channelzMetric()
	return metric
}
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// NOTE(review): presumably kept only so that existing code referencing the
// exported name keeps compiling — confirm before removing.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors . New ( "grpc: timed out when dialing" )