resolver: add State fields to support error handling (#2951)

This commit is contained in:
Doug Fawley 2019-10-04 12:59:43 -07:00 коммит произвёл GitHub
Родитель aa4eae656c
Коммит ed563a02ea
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
24 изменённых файлов: 662 добавлений и 271 удалений

Просмотреть файл

@ -305,14 +305,23 @@ type ClientConnState struct {
BalancerConfig serviceconfig.LoadBalancingConfig
}
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")
// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
UpdateClientConnState(ClientConnState)
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)

Просмотреть файл

@ -53,6 +53,8 @@ func (bb *baseBuilder) Name() string {
return bb.name
}
var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
@ -70,7 +72,11 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented")
}
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
func (b *baseBalancer) ResolverError(error) {
// Ignore
}
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
@ -101,6 +107,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// The entry will be deleted in HandleSubConnStateChange.
}
}
return nil
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker

Просмотреть файл

@ -161,6 +161,8 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
return lb
}
var _ balancer.V2Balancer = (*lbBalancer)(nil) // Assert that we implement V2Balancer
type lbBalancer struct {
cc *lbCacheClientConn
target string
@ -410,7 +412,12 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}
func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
func (lb *lbBalancer) ResolverError(error) {
// Ignore resolver errors. GRPCLB is not selected unless the resolver
// works at least once.
}
func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
}
@ -418,8 +425,8 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
lb.handleServiceConfig(gc)
addrs := ccs.ResolverState.Addresses
if len(addrs) <= 0 {
return
if len(addrs) == 0 {
return balancer.ErrBadResolverState
}
var remoteBalancerAddrs, backendAddrs []resolver.Address
@ -433,9 +440,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
}
if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
if len(remoteBalancerAddrs) == 0 {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return
return balancer.ErrBadResolverState
}
// First time receiving resolved addresses, create a cc to remote
// balancers.
@ -457,6 +464,7 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
return nil
}
func (lb *lbBalancer) Close() {

Просмотреть файл

@ -44,7 +44,6 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
@ -853,12 +852,6 @@ func TestFallback(t *testing.T) {
}
func TestGRPCLBPickFirst(t *testing.T) {
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
svcCfg, err := serviceconfig.Parse(pfc)
if err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, err)
}
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
@ -908,6 +901,11 @@ func TestGRPCLBPickFirst(t *testing.T) {
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
// Start with sub policy pick_first.
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
scpr := r.CC.ParseServiceConfig(pfc)
if scpr.Err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, scpr.Err)
}
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
@ -915,7 +913,7 @@ func TestGRPCLBPickFirst(t *testing.T) {
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: svcCfg,
ServiceConfig: scpr,
})
result = ""
@ -954,9 +952,9 @@ func TestGRPCLBPickFirst(t *testing.T) {
}
// Switch sub policy to roundrobin.
grpclbServiceConfigEmpty, err := serviceconfig.Parse(`{}`)
if err != nil {
t.Fatalf("Error parsing config %q: %v", grpclbServiceConfigEmpty, err)
grpclbServiceConfigEmpty := r.CC.ParseServiceConfig(`{}`)
if grpclbServiceConfigEmpty.Err != nil {
t.Fatalf("Error parsing config %q: %v", `{}`, grpclbServiceConfigEmpty.Err)
}
r.UpdateState(resolver.State{

Просмотреть файл

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
@ -86,10 +87,10 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
ccUpdateCh chan *balancer.ClientConnState
done chan struct{}
done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
@ -99,8 +100,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
@ -115,34 +115,20 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
if ccb.done.HasFired() {
break
}
ccb.balancerMu.Lock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
}
select {
case <-ccb.done:
if ccb.done.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
@ -153,14 +139,12 @@ func (ccb *ccBalancerWrapper) watcher() {
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
default:
}
ccb.cc.firstResolveEvent.Fire()
}
}
func (ccb *ccBalancerWrapper) close() {
close(ccb.done)
ccb.done.Fire()
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@ -180,24 +164,22 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
return ub.UpdateClientConnState(*ccs)
}
select {
case <-ccb.ccUpdateCh:
default:
ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
return nil
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ccb.balancerMu.Lock()
ub.ResolverError(err)
ccb.balancerMu.Unlock()
}
ccb.ccUpdateCh <- ccs
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {

Просмотреть файл

@ -0,0 +1,92 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"fmt"
"testing"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
var _ balancer.V2Balancer = &funcBalancer{}
type funcBalancer struct {
updateClientConnState func(s balancer.ClientConnState) error
}
func (*funcBalancer) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {
panic("unimplemented") // v1 API
}
func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) {
panic("unimplemented") // v1 API
}
func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return b.updateClientConnState(s)
}
func (*funcBalancer) ResolverError(error) {
panic("unimplemented") // resolver never reports error
}
func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("unimplemented") // we never have sub-conns
}
func (*funcBalancer) Close() {}
type funcBalancerBuilder struct {
name string
instance *funcBalancer
}
func (b *funcBalancerBuilder) Build(balancer.ClientConn, balancer.BuildOptions) balancer.Balancer {
return b.instance
}
func (b *funcBalancerBuilder) Name() string { return b.name }
// TestBalancerErrorResolverPolling injects balancer errors and verifies
// ResolveNow is called on the resolver with the appropriate backoff strategy
// being consulted between ResolveNow calls.
func (s) TestBalancerErrorResolverPolling(t *testing.T) {
// The test balancer will return ErrBadResolverState iff the
// ClientConnState contains no addresses.
fb := &funcBalancer{
updateClientConnState: func(s balancer.ClientConnState) error {
if len(s.ResolverState.Addresses) == 0 {
return balancer.ErrBadResolverState
}
return nil
},
}
const balName = "BalancerErrorResolverPolling"
balancer.Register(&funcBalancerBuilder{name: balName, instance: fb})
testResolverErrorPolling(t,
func(r *manual.Resolver) {
// No addresses so the balancer will fail.
r.CC.UpdateState(resolver.State{})
}, func(r *manual.Resolver) {
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine. Include some address so the balancer
// will be happy.
go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "x"}}})
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)))
}

Просмотреть файл

@ -149,12 +149,12 @@ func (s) TestSwitchBalancer(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
@ -181,7 +181,7 @@ func (s) TestBalancerDialOption(t *testing.T) {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
@ -337,7 +337,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
}
defer cc.Close()
sc := parseCfg(`{"loadBalancingPolicy": "round_robin"}`)
sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
var isRoundRobin bool
@ -433,7 +433,7 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}
sc := parseCfg(`{"loadBalancingPolicy": "round_robin"}`)
sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
var isRoundRobin bool
for i := 0; i < 200; i++ {
@ -510,16 +510,16 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin, and check against server[1] and server[2].
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}
func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
scpr := r.CC.ParseServiceConfig(s)
if scpr.Err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
}
return c
return scpr
}

Просмотреть файл

@ -31,6 +31,7 @@ import (
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
@ -186,11 +187,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
if err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
cc.dopts.defaultServiceConfig = sc
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
@ -530,58 +531,104 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
}
}
func (cc *ClientConn) updateResolverState(s resolver.State) error {
var emptyServiceConfig *ServiceConfig
func init() {
cfg := parseServiceConfig("{}")
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
emptyServiceConfig = cfg.Config.(*ServiceConfig)
}
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
cc.applyServiceConfigAndBalancer(cc.sc, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
} else {
cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
}
}
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
defer cc.firstResolveEvent.Fire()
cc.mu.Lock()
defer cc.mu.Unlock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
cc.mu.Unlock()
return nil
}
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
if err != nil {
// May need to apply the initial service config in case the resolver
// doesn't support service configs, or doesn't provide a service config
// with the new addresses.
cc.maybeApplyDefaultServiceConfig(nil)
if cc.balancerWrapper != nil {
cc.balancerWrapper.resolverError(err)
}
// No addresses are valid with err set; return early.
cc.mu.Unlock()
return balancer.ErrBadResolverState
}
var ret error
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
cc.maybeApplyDefaultServiceConfig(s.Addresses)
// TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
cc.applyServiceConfigAndBalancer(sc, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
var err error
if s.ServiceConfig.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
return ret
}
}
} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
cc.applyServiceConfig(sc)
}
var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
balCfg = cc.sc.lbConfig.cfg
} else {
var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}
cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
return nil
cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
}
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
}
return ret
}
// switchBalancer starts the switching from current balancer to the balancer
@ -829,10 +876,10 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}
func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
return fmt.Errorf("got nil pointer for service config")
return
}
cc.sc = sc
@ -848,7 +895,35 @@ func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
cc.retryThrottler.Store((*retryThrottler)(nil))
}
return nil
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {

Просмотреть файл

@ -780,7 +780,7 @@ func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
// SwitchBalancer before NewAddress. There was no balancer created, this
// makes sure we don't call close on nil balancerWrapper.
r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
}
@ -796,7 +796,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
}
// Send a new service config while closing the ClientConn.
go cc.Close()
go r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
go r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
}
}
@ -889,7 +889,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{
r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1181,29 +1181,29 @@ func testInvalidDefaultServiceConfig(t *testing.T) {
}
}
func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r resolver.Resolver, addr string, js string) {
func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r *manual.Resolver, addr string, js string) {
cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
// Resolver service config gets ignored since resolver service config is disabled.
r.(*manual.Resolver).UpdateState(resolver.State{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: parseCfg("{}"),
ServiceConfig: parseCfg(r, "{}"),
})
if !verifyWaitForReadyEqualsTrue(cc) {
t.Fatal("default service config failed to be applied after 1s")
}
}
func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r resolver.Resolver, addr string, js string) {
func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
r.(*manual.Resolver).UpdateState(resolver.State{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
})
if !verifyWaitForReadyEqualsTrue(cc) {
@ -1211,15 +1211,14 @@ func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T
}
}
func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r resolver.Resolver, addr string, js string) {
func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
r.(*manual.Resolver).UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: nil,
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
})
if !verifyWaitForReadyEqualsTrue(cc) {
t.Fatal("default service config failed to be applied after 1s")

Просмотреть файл

@ -39,9 +39,6 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
// ParseServiceConfig is a function to parse JSON service configs into
// opaque data structures.
ParseServiceConfig func(sc string) (interface{}, error)
// StatusRawProto is exported by status/status.go. This func returns a
// pointer to the wrapped Status proto for a given status.Status without a
// call to proto.Clone(). The returned Status proto should not be mutated by
@ -50,6 +47,9 @@ var (
// NewRequestInfoContext creates a new context based on the argument context attaching
// the passed in RequestInfo to the new context.
NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
// ParseServiceConfigForTesting is for creating a fake
// ClientConn for resolver testing only
ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
)
// HealthChecker defines the signature of the client-side LB channel health checking function.

Просмотреть файл

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
func TestMain(m *testing.M) {
@ -89,6 +90,14 @@ func (t *testClientConn) getSc() (string, int) {
return t.sc, t.s
}
func (t *testClientConn) ParseServiceConfig(string) *serviceconfig.ParseResult {
panic("not implemented")
}
func (t *testClientConn) ReportError(error) {
panic("not implemented")
}
type testResolver struct {
// A write to this channel is made when this resolver receives a resolution
// request. Tests can rely on reading from this channel to be notified about

Просмотреть файл

@ -30,17 +30,22 @@ import (
// NewBuilderWithScheme creates a new test resolver builder with the given scheme.
func NewBuilderWithScheme(scheme string) *Resolver {
return &Resolver{
scheme: scheme,
ResolveNowCallback: func(resolver.ResolveNowOption) {},
scheme: scheme,
}
}
// Resolver is also a resolver builder.
// It's build() function always returns itself.
type Resolver struct {
scheme string
// ResolveNowCallback is called when the ResolveNow method is called on the
// resolver. Must not be nil. Must not be changed after the resolver may
// be built.
ResolveNowCallback func(resolver.ResolveNowOption)
scheme string
// Fields actually belong to the resolver.
cc resolver.ClientConn
CC resolver.ClientConn
bootstrapState *resolver.State
}
@ -52,7 +57,7 @@ func (r *Resolver) InitialState(s resolver.State) {
// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
r.cc = cc
r.CC = cc
if r.bootstrapState != nil {
r.UpdateState(*r.bootstrapState)
}
@ -65,14 +70,16 @@ func (r *Resolver) Scheme() string {
}
// ResolveNow is a noop for Resolver.
func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
func (r *Resolver) ResolveNow(o resolver.ResolveNowOption) {
r.ResolveNowCallback(o)
}
// Close is a noop for Resolver.
func (*Resolver) Close() {}
// UpdateState calls cc.UpdateState.
// UpdateState calls CC.UpdateState.
func (r *Resolver) UpdateState(s resolver.State) {
r.cc.UpdateState(s)
r.CC.UpdateState(s)
}
// GenerateAndRegisterManualResolver generates a random scheme and a Resolver

Просмотреть файл

@ -104,12 +104,13 @@ type BuildOption struct {
// State contains the current Resolver state relevant to the ClientConn.
type State struct {
Addresses []Address // Resolved addresses for the target
// ServiceConfig is the parsed service config; obtained from
// serviceconfig.Parse.
ServiceConfig serviceconfig.Config
// Addresses is the latest set of resolved addresses for the target.
Addresses []Address
// TODO: add Err error
// ServiceConfig contains the result from parsing the latest service
// config. If it is nil, it indicates no service config is present or the
// resolver does not provide service configs.
ServiceConfig *serviceconfig.ParseResult
}
// ClientConn contains the callbacks for resolver to notify any updates
@ -122,6 +123,10 @@ type State struct {
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State)
// ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff.
ReportError(error)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
@ -133,6 +138,9 @@ type ClientConn interface {
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
// Target represents a target for gRPC, as specified in:

Просмотреть файл

@ -21,11 +21,18 @@ package grpc
import (
"fmt"
"strings"
"sync/atomic"
"sync"
"time"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
internalbackoff "google.golang.org/grpc/internal/backoff"
)
// ccResolverWrapper is a wrapper on top of cc for resolvers.
@ -33,8 +40,11 @@ import (
type ccResolverWrapper struct {
cc *ClientConn
resolver resolver.Resolver
done uint32 // accessed atomically; set to 1 when closed.
done *grpcsync.Event
curState resolver.State
mu sync.Mutex // protects polling
polling chan struct{}
}
// split2 returns the values from strings.SplitN(s, sep, 2).
@ -77,7 +87,10 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
}
ccr := &ccResolverWrapper{cc: cc}
ccr := &ccResolverWrapper{
cc: cc,
done: grpcsync.NewEvent(),
}
var err error
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
@ -88,33 +101,94 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
}
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
ccr.resolver.ResolveNow(o)
ccr.mu.Lock()
if !ccr.done.HasFired() {
ccr.resolver.ResolveNow(o)
}
ccr.mu.Unlock()
}
func (ccr *ccResolverWrapper) close() {
ccr.mu.Lock()
ccr.resolver.Close()
atomic.StoreUint32(&ccr.done, 1)
ccr.done.Fire()
ccr.mu.Unlock()
}
func (ccr *ccResolverWrapper) isDone() bool {
return atomic.LoadUint32(&ccr.done) == 1
var resolverBackoff = internalbackoff.Exponential{Config: backoff.Config{MaxDelay: 2 * time.Minute}}.Backoff
// poll begins or ends asynchronous polling of the resolver based on whether
// err is ErrBadResolverState.
func (ccr *ccResolverWrapper) poll(err error) {
ccr.mu.Lock()
defer ccr.mu.Unlock()
if err != balancer.ErrBadResolverState {
// stop polling
if ccr.polling != nil {
close(ccr.polling)
ccr.polling = nil
}
return
}
if ccr.polling != nil {
// already polling
return
}
p := make(chan struct{})
ccr.polling = p
go func() {
for i := 0; ; i++ {
ccr.resolveNow(resolver.ResolveNowOption{})
t := time.NewTimer(resolverBackoff(i))
select {
case <-p:
t.Stop()
return
case <-ccr.done.Done():
// Resolver has been closed.
t.Stop()
return
case <-t.C:
select {
case <-p:
return
default:
}
// Timer expired; re-resolve.
}
}
}()
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
ccr.cc.updateResolverState(s)
ccr.curState = s
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}
func (ccr *ccResolverWrapper) ReportError(err error) {
if ccr.done.HasFired() {
return
}
grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
if channelz.IsOn() {
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Resolver reported error: %v", err),
Severity: channelz.CtWarning,
})
}
ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
}
// NewAddress is called by the resolver implementation to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
@ -122,31 +196,49 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
}
ccr.curState.Addresses = addrs
ccr.cc.updateResolverState(ccr.curState)
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
c, err := parseServiceConfig(sc)
if err != nil {
scpr := parseServiceConfig(sc)
if scpr.Err != nil {
grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
if channelz.IsOn() {
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
Severity: channelz.CtWarning,
})
}
ccr.poll(balancer.ErrBadResolverState)
return
}
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: c})
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
}
ccr.curState.ServiceConfig = c
ccr.cc.updateResolverState(ccr.curState)
ccr.curState.ServiceConfig = scpr
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
return parseServiceConfig(scJSON)
}
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
var updates []string
oldSC, oldOK := ccr.curState.ServiceConfig.(*ServiceConfig)
newSC, newOK := s.ServiceConfig.(*ServiceConfig)
var oldSC, newSC *ServiceConfig
var oldOK, newOK bool
if ccr.curState.ServiceConfig != nil {
oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
}
if s.ServiceConfig != nil {
newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
}
if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
updates = append(updates, "service config updated")
}

Просмотреть файл

@ -19,12 +19,18 @@
package grpc
import (
"context"
"errors"
"fmt"
"net"
"strings"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
)
func (s) TestParseTarget(t *testing.T) {
@ -114,3 +120,106 @@ func (s) TestDialParseTargetUnknownScheme(t *testing.T) {
}
}
}
func testResolverErrorPolling(t *testing.T, badUpdate func(*manual.Resolver), goodUpdate func(*manual.Resolver), dopts ...DialOption) {
defer func(o func(int) time.Duration) { resolverBackoff = o }(resolverBackoff)
boIter := make(chan int)
resolverBackoff = func(v int) time.Duration {
boIter <- v
return 0
}
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
rn := make(chan struct{})
defer func() { close(rn) }()
r.ResolveNowCallback = func(resolver.ResolveNowOption) { rn <- struct{}{} }
cc, err := Dial(r.Scheme()+":///test.server", append([]DialOption{WithInsecure()}, dopts...)...)
if err != nil {
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
}
defer cc.Close()
badUpdate(r)
panicAfter := time.AfterFunc(5*time.Second, func() { panic("timed out polling resolver") })
defer panicAfter.Stop()
// Ensure ResolveNow is called, then Backoff with the right parameter, several times
for i := 0; i < 7; i++ {
<-rn
if v := <-boIter; v != i {
t.Errorf("Backoff call %v uses value %v", i, v)
}
}
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine.
goodUpdate(r)
// Wait awhile to ensure ResolveNow and Backoff stop being called when the
// state is OK (i.e. polling was cancelled).
for {
t := time.NewTimer(50 * time.Millisecond)
select {
case <-rn:
// ClientConn is still calling ResolveNow
<-boIter
time.Sleep(5 * time.Millisecond)
continue
case <-t.C:
// ClientConn stopped calling ResolveNow; success
}
break
}
}
// TestResolverErrorPolling injects resolver errors and verifies ResolveNow is
// called with the appropriate backoff strategy being consulted between
// ResolveNow calls.
func (s) TestResolverErrorPolling(t *testing.T) {
testResolverErrorPolling(t, func(r *manual.Resolver) {
r.CC.ReportError(errors.New("res err"))
}, func(r *manual.Resolver) {
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine.
go r.CC.UpdateState(resolver.State{})
})
}
// TestServiceConfigErrorPolling injects a service config error and verifies
// ResolveNow is called with the appropriate backoff strategy being consulted
// between ResolveNow calls.
func (s) TestServiceConfigErrorPolling(t *testing.T) {
testResolverErrorPolling(t, func(r *manual.Resolver) {
badsc := r.CC.ParseServiceConfig("bad config")
r.UpdateState(resolver.State{ServiceConfig: badsc})
}, func(r *manual.Resolver) {
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine.
go r.CC.UpdateState(resolver.State{})
})
}
func (s) TestServiceConfigErrorRPC(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
if err != nil {
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
}
defer cc.Close()
badsc := r.CC.ParseServiceConfig("bad config")
r.UpdateState(resolver.State{ServiceConfig: badsc})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var dummy int
const wantMsg = "error parsing service config"
const wantCode = codes.Unavailable
if err := cc.Invoke(ctx, "/foo/bar", &dummy, &dummy); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
t.Fatalf("cc.Invoke(_, _, _, _) = %v; want status.Code()==%v, status.Message() contains %q", err, wantCode, wantMsg)
}
}

Просмотреть файл

@ -261,20 +261,17 @@ type jsonSC struct {
}
func init() {
internal.ParseServiceConfig = func(sc string) (interface{}, error) {
return parseServiceConfig(sc)
}
internal.ParseServiceConfigForTesting = parseServiceConfig
}
func parseServiceConfig(js string) (*ServiceConfig, error) {
func parseServiceConfig(js string) *serviceconfig.ParseResult {
if len(js) == 0 {
return nil, fmt.Errorf("no JSON service config provided")
return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
}
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return nil, err
return &serviceconfig.ParseResult{Err: err}
}
sc := ServiceConfig{
LB: rsc.LoadBalancingPolicy,
@ -288,7 +285,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
if len(lbcfg) != 1 {
err := fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
grpclog.Warningf(err.Error())
return nil, err
return &serviceconfig.ParseResult{Err: err}
}
var name string
var jsonCfg json.RawMessage
@ -303,7 +300,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
var err error
sc.lbConfig.cfg, err = parser.ParseConfig(jsonCfg)
if err != nil {
return nil, fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)
return &serviceconfig.ParseResult{Err: fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)}
}
} else if string(jsonCfg) != "{}" {
grpclog.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg))
@ -316,12 +313,12 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
// case.
err := fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
grpclog.Warningf(err.Error())
return nil, err
return &serviceconfig.ParseResult{Err: err}
}
}
if rsc.MethodConfig == nil {
return &sc, nil
return &serviceconfig.ParseResult{Config: &sc}
}
for _, m := range *rsc.MethodConfig {
if m.Name == nil {
@ -330,7 +327,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
d, err := parseDuration(m.Timeout)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return nil, err
return &serviceconfig.ParseResult{Err: err}
}
mc := MethodConfig{
@ -339,7 +336,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
}
if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return nil, err
return &serviceconfig.ParseResult{Err: err}
}
if m.MaxRequestMessageBytes != nil {
if *m.MaxRequestMessageBytes > int64(maxInt) {
@ -364,13 +361,13 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
if sc.retryThrottling != nil {
if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
return nil, fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)
return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
}
if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
return nil, fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)
return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
}
}
return &sc, nil
return &serviceconfig.ParseResult{Config: &sc}
}
func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {

Просмотреть файл

@ -37,13 +37,16 @@ type parseTestCase struct {
}
func runParseTests(t *testing.T, testCases []parseTestCase) {
t.Helper()
for _, c := range testCases {
sc, err := parseServiceConfig(c.scjs)
scpr := parseServiceConfig(c.scjs)
var sc *ServiceConfig
sc, _ = scpr.Config.(*ServiceConfig)
if !c.wantErr {
c.wantSC.rawJSONString = c.scjs
}
if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
if c.wantErr != (scpr.Err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, scpr.Err, c.wantSC, c.wantErr)
}
}
}

Просмотреть файл

@ -22,27 +22,20 @@
// This package is EXPERIMENTAL.
package serviceconfig
import (
"google.golang.org/grpc/internal"
)
// Config represents an opaque data structure holding a service config.
type Config interface {
isConfig()
isServiceConfig()
}
// LoadBalancingConfig represents an opaque data structure holding a load
// balancer config.
// balancing config.
type LoadBalancingConfig interface {
isLoadBalancingConfig()
}
// Parse parses the JSON service config provided into an internal form or
// returns an error if the config is invalid.
func Parse(ServiceConfigJSON string) (Config, error) {
c, err := internal.ParseServiceConfig(ServiceConfigJSON)
if err != nil {
return nil, err
}
return c.(Config), err
// ParseResult contains a service config or an error. Exactly one must be
// non-nil.
type ParseResult struct {
Config Config
Err error
}

Просмотреть файл

@ -230,7 +230,7 @@ func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
t.Fatal(err)
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
@ -1423,7 +1423,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
t.Fatal(err)
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
@ -1567,7 +1567,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
}); err != nil {
t.Fatal(err)
}
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid)
@ -1584,7 +1584,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
t.Fatal(err)
}
newSC := parseCfg(`{
newSC := parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1882,7 +1882,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
t.Fatal(err)
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {

Просмотреть файл

@ -1461,7 +1461,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1500,7 +1500,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(`{
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1548,7 +1548,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1590,7 +1590,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1637,7 +1637,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1684,7 +1684,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1747,7 +1747,17 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
t.Fatal(err)
}
sc := parseCfg(`{
// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te1 := testServiceConfigSetup(t, e)
defer te1.tearDown()
te1.resolverScheme = r.Scheme()
te1.nonBlockingDial = true
te1.startServer(&testServer{security: e.security})
cc1 := te1.clientConn()
addrs := []resolver.Address{{Addr: te1.srvAddr}}
sc := parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -1765,17 +1775,6 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
}
]
}`)
// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te1 := testServiceConfigSetup(t, e)
defer te1.tearDown()
te1.resolverScheme = r.Scheme()
te1.nonBlockingDial = true
te1.startServer(&testServer{security: e.security})
cc1 := te1.clientConn()
addrs := []resolver.Address{{Addr: te1.srvAddr}}
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
tc := testpb.NewTestServiceClient(cc1)
@ -2000,7 +1999,7 @@ func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: te.srvAddr}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -5312,7 +5311,7 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
}
func (ss *stubServer) newServiceConfig(sc string) {
ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: parseCfg(sc)})
ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: parseCfg(ss.r, sc)})
}
func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error {
@ -7230,7 +7229,7 @@ func (s) TestRPCWaitsForResolver(t *testing.T) {
time.Sleep(time.Second)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: te.srvAddr}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"methodConfig": [
{
"name": [
@ -7477,12 +7476,12 @@ func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string
}
}
func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
g := r.CC.ParseServiceConfig(s)
if g.Err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
}
return c
return g
}
type methodTestCreds struct{}

Просмотреть файл

@ -194,7 +194,7 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -262,7 +262,7 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -306,7 +306,7 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) {
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -398,7 +398,7 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -457,7 +457,7 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
defer deferFunc()
tc := testpb.NewTestServiceClient(cc)
sc := parseCfg(`{
sc := parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -552,7 +552,7 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -630,7 +630,7 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
// test ends).
sc := parseCfg(`{
sc := parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "delay"
}
@ -708,7 +708,7 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
// test ends).
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "delay"
}
@ -756,7 +756,7 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -795,7 +795,7 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "foo"
}
@ -888,7 +888,7 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "channelzSuccess"
}
@ -944,7 +944,7 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseCfg(`{
ServiceConfig: parseCfg(r, `{
"healthCheckConfig": {
"serviceName": "channelzFailure"
}

Просмотреть файл

@ -112,6 +112,8 @@ type edsBalancerInterface interface {
Close()
}
var _ balancer.V2Balancer = (*xdsBalancer)(nil) // Assert that we implement V2Balancer
// xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer,
// or fallback LB).
type xdsBalancer struct {
@ -400,11 +402,16 @@ func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
}
}
func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) {
func (x *xdsBalancer) ResolverError(error) {
// Ignore for now
}
func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
select {
case x.grpcUpdate <- &s:
case <-x.ctx.Done():
}
return nil
}
type cdsResp struct {

Просмотреть файл

@ -25,12 +25,8 @@ package resolver
import (
"fmt"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
const (
@ -54,11 +50,6 @@ const (
xdsScheme = "xds-experimental"
)
var (
parseOnce sync.Once
parsedSC serviceconfig.Config
)
// NewBuilder creates a new implementation of the resolver.Builder interface
// for the xDS resolver.
func NewBuilder() resolver.Builder {
@ -69,23 +60,17 @@ type xdsBuilder struct{}
// Build helps implement the resolver.Builder interface.
func (b *xdsBuilder) Build(t resolver.Target, cc resolver.ClientConn, o resolver.BuildOption) (resolver.Resolver, error) {
parseOnce.Do(func() {
// The xds balancer must have been registered at this point for the service
// config to be parsed properly.
psc, err := internal.ParseServiceConfig(jsonSC)
if err != nil {
panic(fmt.Sprintf("service config %s parsing failed: %v", jsonSC, err))
}
// The xds balancer must have been registered at this point for the service
// config to be parsed properly.
scpr := cc.ParseServiceConfig(jsonSC)
var ok bool
if parsedSC, ok = psc.(*grpc.ServiceConfig); !ok {
panic(fmt.Sprintf("service config type is [%T], want [grpc.ServiceConfig]", psc))
}
})
if scpr.Err != nil {
panic(fmt.Sprintf("error parsing service config %q: %v", jsonSC, scpr.Err))
}
// We return a resolver which bacically does nothing. The hard-coded service
// config returned here picks the xds balancer.
cc.UpdateState(resolver.State{ServiceConfig: parsedSC})
cc.UpdateState(resolver.State{ServiceConfig: scpr})
return &xdsResolver{}, nil
}

Просмотреть файл

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
@ -49,7 +50,7 @@ func init() {
}
]
}`)},
errCh: make(chan error),
errCh: make(chan error, 1),
}
balancer.Register(fbb)
}
@ -67,8 +68,13 @@ func (t *testClientConn) UpdateState(s resolver.State) {
close(t.done)
}
func (*testClientConn) NewAddress(addresses []resolver.Address) { panic("unimplemented") }
func (*testClientConn) NewServiceConfig(serviceConfig string) { panic("unimplemented") }
func (*testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
return internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC)
}
func (*testClientConn) NewAddress([]resolver.Address) { panic("unimplemented") }
func (*testClientConn) NewServiceConfig(string) { panic("unimplemented") }
func (*testClientConn) ReportError(error) { panic("unimplemented") }
// TestXDSRsolverSchemeAndAddresses creates a new xds resolver, verifies that
// it returns an empty address list and the appropriate xds-experimental
@ -93,6 +99,8 @@ func TestXDSRsolverSchemeAndAddresses(t *testing.T) {
}
}
var _ balancer.V2Balancer = (*fakeBalancer)(nil)
// fakeBalancer is used to verify that the xds_resolver returns the expected
// serice config.
type fakeBalancer struct {
@ -106,31 +114,35 @@ func (*fakeBalancer) HandleSubConnStateChange(_ balancer.SubConn, _ connectivity
func (*fakeBalancer) HandleResolvedAddrs(_ []resolver.Address, _ error) {
panic("unimplemented")
}
func (*fakeBalancer) ResolverError(error) {
panic("unimplemented")
}
// UpdateClientConnState verifies that the received LBConfig matches the
// provided one, and if not, sends an error on the provided channel.
func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
gotLBConfig, ok := ccs.BalancerConfig.(*wrappedLBConfig)
if !ok {
f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig of type %T, want %T", ccs.BalancerConfig, &wrappedLBConfig{})
return
return balancer.ErrBadResolverState
}
var gotCfg, wantCfg xdsinternal.LBConfig
if err := wantCfg.UnmarshalJSON(f.wantLBConfig.lbCfg); err != nil {
f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(f.wantLBConfig.lbCfg))
return
return balancer.ErrBadResolverState
}
if err := gotCfg.UnmarshalJSON(gotLBConfig.lbCfg); err != nil {
f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(gotLBConfig.lbCfg))
return
return balancer.ErrBadResolverState
}
if !reflect.DeepEqual(gotCfg, wantCfg) {
f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig %v, want %v", gotCfg, wantCfg)
return
return balancer.ErrBadResolverState
}
f.errCh <- nil
return nil
}
func (*fakeBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {