bar: add ability to update resolver state atomically and pass directly to the balancer (#2693)

This commit is contained in:
Doug Fawley 2019-03-22 10:48:55 -07:00 коммит произвёл GitHub
Родитель bcfa7b30ac
Коммит 3910b873d3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
26 изменённых файлов: 420 добавлений и 394 удалений

Просмотреть файл

@ -248,18 +248,46 @@ type Balancer interface {
// that back to gRPC.
// Balancer should also generate and update Pickers when its internal state has
// been changed by the new state.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateSubConnState will be called instead.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// balancers.
// Balancer can create new SubConn or remove SubConn with the addresses.
// An empty address slice and a non-nil error will be passed if the resolver returns
// non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateResolverState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
ConnectivityState connectivity.State
// TODO: add last connection error
}
// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateResolverState method will be called instead
// of HandleResolvedAddrs and its UpdateSubConnState will be called instead of
// HandleSubConnStateChange.
type V2Balancer interface {
// UpdateResolverState is called by gRPC when the state of the resolver
// changes. The resolver.State carries the full snapshot of resolver
// output (addresses and service config) in a single atomic update,
// rather than the separate per-field callbacks of the v1 Balancer.
UpdateResolverState(resolver.State)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes. SubConnState wraps the connectivity state (and, per the TODO
// on SubConnState, may later carry the last connection error).
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//

Просмотреть файл

@ -67,14 +67,16 @@ type baseBalancer struct {
}
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
grpclog.Infof("base.baseBalancer: HandleResolvedAddrs called with error %v", err)
return
}
grpclog.Infoln("base.baseBalancer: got new resolved addresses: ", addrs)
panic("not implemented")
}
func (b *baseBalancer) UpdateResolverState(s resolver.State) {
// TODO: handle s.Err (log if not nil) once implemented.
// TODO: handle s.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new resolver state: ", s)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
for _, a := range s.Addresses {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
@ -120,6 +122,11 @@ func (b *baseBalancer) regeneratePicker() {
}
// HandleSubConnStateChange is the deprecated v1 entry point. Because
// baseBalancer also implements balancer.V2Balancer, gRPC invokes
// UpdateSubConnState instead, so this method must never be reached.
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not implemented")
}
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
oldS, ok := b.scStates[sc]
if !ok {

Просмотреть файл

@ -401,7 +401,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
// cc to remote balancers uses lb.manualResolver. Send the updated remote
// balancer addresses to it through manualResolver.
lb.manualResolver.NewAddress(remoteBalancerAddrs)
lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs

Просмотреть файл

@ -391,11 +391,11 @@ func TestGRPCLB(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
}}})
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
@ -442,11 +442,11 @@ func TestGRPCLBWeighted(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
}}})
sequences := []string{"00101", "00011"}
for _, seq := range sequences {
@ -512,11 +512,11 @@ func TestDropRequest(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
}}})
// Wait for the 1st, non-fail-fast RPC to succeed. This ensures both server
// connections are made, because the first one has Drop set to true.
@ -630,7 +630,7 @@ func TestBalancerDisconnects(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tests[0].lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
@ -638,7 +638,7 @@ func TestBalancerDisconnects(t *testing.T) {
Addr: tests[1].lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
}}})
var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
@ -711,7 +711,7 @@ func TestFallback(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: "",
Type: resolver.GRPCLB,
ServerName: lbServerName,
@ -719,7 +719,7 @@ func TestFallback(t *testing.T) {
Addr: beLis.Addr().String(),
Type: resolver.Backend,
ServerName: beServerName,
}})
}}})
var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
@ -729,7 +729,7 @@ func TestFallback(t *testing.T) {
t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
}
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
@ -737,7 +737,7 @@ func TestFallback(t *testing.T) {
Addr: beLis.Addr().String(),
Type: resolver.Backend,
ServerName: beServerName,
}})
}}})
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
@ -798,11 +798,11 @@ func TestGRPCLBPickFirst(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
}}})
var p peer.Peer
@ -909,11 +909,11 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
}
defer cc.Close()
r.NewAddress([]resolver.Address{{
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}})
}}})
runRPCs(cc)
time.Sleep(1 * time.Second)

Просмотреть файл

@ -84,14 +84,9 @@ func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) {
// Close is a no-op for Resolver: lbManualResolver holds no resources of
// its own to release here.
func (*lbManualResolver) Close() {}
// NewAddress calls cc.NewAddress.
func (r *lbManualResolver) NewAddress(addrs []resolver.Address) {
r.ccr.NewAddress(addrs)
}
// NewServiceConfig calls cc.NewServiceConfig.
func (r *lbManualResolver) NewServiceConfig(sc string) {
r.ccr.NewServiceConfig(sc)
// UpdateState calls cc.UpdateState, forwarding the resolver state snapshot
// to the wrapped ClientConn. It replaces the removed NewAddress and
// NewServiceConfig helpers with a single atomic update.
func (r *lbManualResolver) UpdateState(s resolver.State) {
r.ccr.UpdateState(s)
}
const subConnCacheTime = time.Second * 10

Просмотреть файл

@ -112,7 +112,7 @@ func TestOneBackend(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
@ -149,7 +149,7 @@ func TestBackendsRoundRobin(t *testing.T) {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
r.UpdateState(resolver.State{Addresses: resolvedAddrs})
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
@ -203,13 +203,13 @@ func TestAddressesRemoved(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
@ -279,7 +279,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
// The second RPC should succeed.
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -287,7 +287,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
}
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
@ -299,7 +299,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
}()
}
time.Sleep(50 * time.Millisecond)
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
wg.Wait()
}
@ -333,7 +333,7 @@ func TestOneServerDown(t *testing.T) {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
r.UpdateState(resolver.State{Addresses: resolvedAddrs})
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
@ -431,7 +431,7 @@ func TestAllServersDown(t *testing.T) {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
r.UpdateState(resolver.State{Addresses: resolvedAddrs})
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {

Просмотреть файл

@ -172,7 +172,11 @@ func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state con
grpclog.Infof("balancer group: balancer not found for sc state change")
return
}
b.HandleSubConnStateChange(sc, state)
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state})
} else {
b.HandleSubConnStateChange(sc, state)
}
}
// Address change: forward to balancer.
@ -184,7 +188,11 @@ func (bg *balancerGroup) handleResolvedAddrs(id string, addrs []resolver.Address
grpclog.Infof("balancer group: balancer with id %q not found", id)
return
}
b.HandleResolvedAddrs(addrs, nil)
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateResolverState(resolver.State{Addresses: addrs})
} else {
b.HandleResolvedAddrs(addrs, nil)
}
}
// TODO: handleServiceConfig()

Просмотреть файл

@ -82,20 +82,13 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
return b.c
}
// resolverUpdate contains the new resolved addresses or error if there's
// any.
type resolverUpdate struct {
// addrs is the latest set of resolved addresses.
addrs []resolver.Address
// err is the resolution error reported by the resolver, if any.
err error
}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolverUpdate
resolverUpdateCh chan *resolver.State
done chan struct{}
mu sync.Mutex
@ -106,7 +99,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolverUpdate, 1),
resolverUpdateCh: make(chan *resolver.State, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
}
@ -128,15 +121,23 @@ func (ccb *ccBalancerWrapper) watcher() {
return
default:
}
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
case t := <-ccb.resolverUpdateCh:
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.resolverUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateResolverState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.Addresses, nil)
}
case <-ccb.done:
}
@ -177,37 +178,23 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}
func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
if ccb.cc.curBalancerName != grpclbName {
var containsGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
containsGRPCLB = true
break
// Filter any grpclb addresses since we don't have the grpclb balancer.
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
}
if containsGRPCLB {
// The current balancer is not grpclb, but addresses contain grpclb
// address. This means we failed to switch to grpclb, most likely
// because grpclb is not registered. Filter out all grpclb addresses
// from addrs before sending to balancer.
tempAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
if a.Type != resolver.GRPCLB {
tempAddrs = append(tempAddrs, a)
}
}
addrs = tempAddrs
i++
}
}
select {
case <-ccb.resolverUpdateCh:
default:
}
ccb.resolverUpdateCh <- &resolverUpdate{
addrs: addrs,
err: err,
}
ccb.resolverUpdateCh <- &s
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {

Просмотреть файл

@ -141,18 +141,19 @@ func (s) TestSwitchBalancer(t *testing.T) {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin.
cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs})
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs})
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
@ -172,13 +173,14 @@ func (s) TestBalancerDialOption(t *testing.T) {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs})
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
@ -198,7 +200,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
// ClientConn will switch balancer to grpclb when receives an address of
// type GRPCLB.
r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -215,7 +217,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
// New update containing new backend and new grpclb. Should not switch
// balancer.
r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
for i := 0; i < 200; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
@ -231,7 +233,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
var isPickFirst bool
// Switch balancer to pickfirst.
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == PickFirstBalancerName
@ -257,7 +259,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -274,7 +276,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
// ClientConn will switch balancer to grpclb when receives an address of
// type GRPCLB.
r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -291,7 +293,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
// New update containing new backend and new grpclb. Should not switch
// balancer.
r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
for i := 0; i < 200; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
@ -306,7 +308,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
}
// Switch balancer back.
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == PickFirstBalancerName
@ -334,9 +336,9 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
}
defer cc.Close()
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
sc := `{"loadBalancingPolicy": "round_robin"}`
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
var isRoundRobin bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -353,7 +355,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
// ClientConn will switch balancer to grpclb when receives an address of
// type GRPCLB.
r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -369,7 +371,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
}
// Switch balancer back.
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isRoundRobin = cc.curBalancerName == "round_robin"
@ -397,7 +399,7 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -414,7 +416,8 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
// ClientConn will switch balancer to grpclb when receives an address of
// type GRPCLB.
r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
r.UpdateState(resolver.State{Addresses: addrs})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
@ -429,7 +432,8 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
sc := `{"loadBalancingPolicy": "round_robin"}`
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
var isRoundRobin bool
for i := 0; i < 200; i++ {
cc.mu.Lock()
@ -447,7 +451,7 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
}
// Switch balancer back.
r.NewAddress([]resolver.Address{{Addr: "backend"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isRoundRobin = cc.curBalancerName == "round_robin"
@ -485,7 +489,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
@ -496,15 +500,16 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
//
// If the filtering failed, servers[0] will be used for RPCs and the RPCs
// will succeed. The following checks will catch this and fail.
r.NewAddress([]resolver.Address{
addrs := []resolver.Address{
{Addr: servers[0].addr, Type: resolver.GRPCLB},
{Addr: servers[1].addr}, {Addr: servers[2].addr}})
{Addr: servers[1].addr}, {Addr: servers[2].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// Still check for pickfirst, but only with server[1] and server[2].
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin, anc check against server[1] and server[2].
cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
// Switch to roundrobin, and check against server[1] and server[2].
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs})
if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}

Просмотреть файл

@ -388,8 +388,6 @@ type ClientConn struct {
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
preBalancerName string // previous balancer name.
curAddresses []resolver.Address
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
@ -459,50 +457,57 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
}
}
func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.mu.Lock()
defer cc.mu.Unlock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
// cc was closed.
return
return nil
}
if reflect.DeepEqual(cc.curAddresses, addrs) {
return
}
if !cc.dopts.disableServiceConfig && cc.scRaw != s.ServiceConfig {
// New service config; apply it.
sc, err := parseServiceConfig(s.ServiceConfig)
if err != nil {
fmt.Println("error parsing config: ", err)
return err
}
cc.scRaw = s.ServiceConfig
cc.sc = sc
cc.curAddresses = addrs
cc.firstResolveEvent.Fire()
if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
tokens: cc.sc.retryThrottling.MaxTokens,
max: cc.sc.retryThrottling.MaxTokens,
thresh: cc.sc.retryThrottling.MaxTokens / 2,
ratio: cc.sc.retryThrottling.TokenRatio,
}
cc.retryThrottler.Store(newThrottler)
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}
}
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var isGRPCLB bool
for _, a := range addrs {
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
var newBalancerName string
// TODO: use new loadBalancerConfig field with appropriate priority.
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
// Address list doesn't contain grpclb address. Try to pick a
// non-grpclb balancer.
newBalancerName = cc.curBalancerName
// If current balancer is grpclb, switch to the previous one.
if newBalancerName == grpclbName {
newBalancerName = cc.preBalancerName
}
// The following could be true in two cases:
// - the first time handling resolved addresses
// (curBalancerName="")
// - the first time handling non-grpclb addresses
// (curBalancerName="grpclb", preBalancerName="")
if newBalancerName == "" {
newBalancerName = PickFirstBalancerName
}
newBalancerName = PickFirstBalancerName
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
@ -511,7 +516,9 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
cc.balancerWrapper.updateResolverState(s)
cc.firstResolveEvent.Fire()
return nil
}
// switchBalancer starts the switching from current balancer to the balancer
@ -523,10 +530,6 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
if cc.conns == nil {
return
}
if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
return
}
@ -536,15 +539,11 @@ func (cc *ClientConn) switchBalancer(name string) {
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
return
}
// TODO(bar switching) change this to two steps: drain and close.
// Keep track of sc in wrapper.
if cc.balancerWrapper != nil {
cc.balancerWrapper.close()
}
builder := balancer.Get(name)
// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
// we reuse previous one?
if channelz.IsOn() {
if builder == nil {
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
@ -563,7 +562,6 @@ func (cc *ClientConn) switchBalancer(name string) {
builder = newPickfirstBuilder()
}
cc.preBalancerName = cc.curBalancerName
cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
@ -750,68 +748,6 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}
// handleServiceConfig parses the service config string in JSON format to Go native
// struct ServiceConfig, and stores both the struct and the JSON string in ClientConn.
// It returns a non-nil error only when parsing fails; the other early exits
// (feature disabled, unchanged config, closed ClientConn) return nil.
func (cc *ClientConn) handleServiceConfig(js string) error {
// Service config handling was disabled via dial option; ignore the update.
if cc.dopts.disableServiceConfig {
return nil
}
// Identical to the currently applied raw config; nothing to do.
if cc.scRaw == js {
return nil
}
if channelz.IsOn() {
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
// for human consumption.
Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
Severity: channelz.CtINFO,
})
}
// Parse before taking cc.mu so the lock is only held for the state update.
sc, err := parseServiceConfig(js)
if err != nil {
return err
}
cc.mu.Lock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
cc.mu.Unlock()
return nil
}
cc.scRaw = js
cc.sc = sc
// Install (or clear) the retry throttler derived from the new config.
if sc.retryThrottling != nil {
newThrottler := &retryThrottler{
tokens: sc.retryThrottling.MaxTokens,
max: sc.retryThrottling.MaxTokens,
thresh: sc.retryThrottling.MaxTokens / 2,
ratio: sc.retryThrottling.TokenRatio,
}
cc.retryThrottler.Store(newThrottler)
} else {
// Typed nil keeps the concrete type stored in the atomic.Value consistent.
cc.retryThrottler.Store((*retryThrottler)(nil))
}
if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
if cc.curBalancerName == grpclbName {
// If current balancer is grpclb, there's at least one grpclb
// balancer address in the resolved list. Don't switch the balancer,
// but change the previous balancer name, so if a new resolved
// address list doesn't contain grpclb address, balancer will be
// switched to *sc.LB.
cc.preBalancerName = *sc.LB
} else {
cc.switchBalancer(*sc.LB)
// Re-deliver the current addresses so the new balancer starts with them.
cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
}
}
cc.mu.Unlock()
return nil
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
cc.mu.RLock()
r := cc.resolverWrapper

Просмотреть файл

@ -318,10 +318,10 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
}()
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialAddrs([]resolver.Address{
rb.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
})
}})
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
if err != nil {
t.Fatal(err)
@ -414,10 +414,10 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
}()
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialAddrs([]resolver.Address{
rb.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
})
}})
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
if err != nil {
t.Fatal(err)

Просмотреть файл

@ -89,7 +89,7 @@ func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{lis1Addr, lis2Addr})
r.InitialState(resolver.State{Addresses: []resolver.Address{lis1Addr, lis2Addr}})
client, err := Dial(r.Scheme()+":///test.server", WithInsecure())
if err != nil {
t.Fatalf("Dial failed. Err: %v", err)
@ -657,10 +657,10 @@ func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
}()
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialAddrs([]resolver.Address{
rb.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
})
}})
client, err := DialContext(ctx, "this-gets-overwritten",
WithInsecure(),
WithBalancerName(stateRecordingBalancerName),
@ -873,7 +873,7 @@ func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
// SwitchBalancer before NewAddress. There was no balancer created, this
// makes sure we don't call close on nil balancerWrapper.
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
r.UpdateState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) // This should not panic.
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
}
@ -889,7 +889,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
}
// Send a new service config while closing the ClientConn.
go cc.Close()
go r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
go r.UpdateState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) // This should not panic.
}
}
@ -904,7 +904,7 @@ func (s) TestResolverEmptyUpdateNotPanic(t *testing.T) {
defer cc.Close()
// This make sure we don't create addrConn with empty address list.
r.NewAddress([]resolver.Address{}) // This should not panic.
r.UpdateState(resolver.State{}) // This should not panic.
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
}
@ -982,7 +982,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
r.NewServiceConfig(`{
r.UpdateState(resolver.State{ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -994,7 +994,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
"waitForReady": true
}
]
}`)
}`})
time.Sleep(1 * time.Second)
m := cc.GetMethodConfig("/foo/Bar")
if m.WaitForReady != nil {
@ -1177,7 +1177,7 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {
{Addr: lis3.Addr().String()},
}
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialAddrs(addrsList)
rb.InitialState(resolver.State{Addresses: addrsList})
client, err := Dial("this-gets-overwritten",
WithInsecure(),

Просмотреть файл

@ -59,7 +59,7 @@ func main() {
}
defer conn.Close()
// Manually provide resolved addresses for the target.
r.NewAddress([]resolver.Address{{Addr: ":10001"}, {Addr: ":10002"}, {Addr: ":10003"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ":10001"}, {Addr: ":10002"}, {Addr: ":10003"}}})
c := pb.NewGreeterClient(conn)

Просмотреть файл

@ -115,7 +115,7 @@ func (r *exampleResolver) start() {
for i, s := range addrStrs {
addrs[i] = resolver.Address{Addr: s}
}
r.cc.NewAddress(addrs)
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOption) {}
func (*exampleResolver) Close() {}

Просмотреть файл

@ -123,7 +123,7 @@ func (r *exampleResolver) start() {
for i, s := range addrStrs {
addrs[i] = resolver.Address{Addr: s}
}
r.cc.NewAddress(addrs)
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOption) {}
func (*exampleResolver) Close() {}

Просмотреть файл

@ -60,7 +60,7 @@ func (s) TestOneBackendPickfirst(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
// The second RPC should succeed.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
@ -93,7 +93,7 @@ func (s) TestBackendsPickfirst(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
@ -136,7 +136,7 @@ func (s) TestNewAddressWhileBlockingPickfirst(t *testing.T) {
}()
}
time.Sleep(50 * time.Millisecond)
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
wg.Wait()
}
@ -198,7 +198,7 @@ func (s) TestOneServerDownPickfirst(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
@ -239,7 +239,7 @@ func (s) TestAllServersDownPickfirst(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
@ -282,7 +282,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
@ -297,7 +297,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
}
// Remove server[0].
r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
@ -312,7 +312,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
}
// Append server[0], nothing should change.
r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}}})
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
@ -321,7 +321,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
}
// Remove server[1].
r.NewAddress([]resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
break
@ -336,7 +336,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
}
// Remove server[2].
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break

Просмотреть файл

@ -54,6 +54,10 @@ type testClientConn struct {
s int
}
// UpdateState implements resolver.ClientConn for the test stub. This stub
// only exercises the deprecated NewAddress/NewServiceConfig paths, so
// UpdateState is intentionally unreachable and panics if ever called.
func (t *testClientConn) UpdateState(s resolver.State) {
	panic("unused")
}
func (t *testClientConn) NewAddress(addresses []resolver.Address) {
t.m1.Lock()
defer t.m1.Unlock()

Просмотреть файл

@ -41,20 +41,20 @@ type Resolver struct {
// Fields actually belong to the resolver.
cc resolver.ClientConn
bootstrapAddrs []resolver.Address
bootstrapState *resolver.State
}
// InitialAddrs adds resolved addresses to the resolver so that
// NewAddress doesn't need to be explicitly called after Dial.
func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
r.bootstrapAddrs = addrs
// InitialState adds initial state to the resolver so that UpdateState doesn't
// need to be explicitly called after Dial. The stored state is replayed once
// when Build is invoked.
func (r *Resolver) InitialState(s resolver.State) {
	// Store the address of the copy so Build can distinguish "never set"
	// (nil) from "explicitly set to the zero State".
	r.bootstrapState = &s
}
// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
r.cc = cc
if r.bootstrapAddrs != nil {
r.NewAddress(r.bootstrapAddrs)
if r.bootstrapState != nil {
r.UpdateState(*r.bootstrapState)
}
return r, nil
}
@ -70,14 +70,9 @@ func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
// Close is a noop for Resolver.
func (*Resolver) Close() {}
// NewAddress calls cc.NewAddress.
func (r *Resolver) NewAddress(addrs []resolver.Address) {
r.cc.NewAddress(addrs)
}
// NewServiceConfig calls cc.NewServiceConfig.
func (r *Resolver) NewServiceConfig(sc string) {
r.cc.NewServiceConfig(sc)
// UpdateState calls cc.UpdateState, forwarding the test-provided resolver
// state to the ClientConn this manual resolver was built with.
func (r *Resolver) UpdateState(s resolver.State) {
	r.cc.UpdateState(s)
}
// GenerateAndRegisterManualResolver generates a random scheme and a Resolver

Просмотреть файл

@ -45,7 +45,7 @@ type passthroughResolver struct {
}
func (r *passthroughResolver) start() {
r.cc.NewAddress([]resolver.Address{{Addr: r.target.Endpoint}})
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOption) {}

Просмотреть файл

@ -98,6 +98,15 @@ type BuildOption struct {
DisableServiceConfig bool
}
// State contains the current Resolver state relevant to the ClientConn.
// Both fields are delivered together so the ClientConn can apply addresses
// and service config atomically.
type State struct {
	Addresses     []Address // Resolved addresses for the target
	ServiceConfig string    // JSON representation of the service config

	// TODO: add Err error
	// TODO: add ParsedServiceConfig interface{}
}
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
@ -106,12 +115,18 @@ type BuildOption struct {
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
//
// Deprecated: Use UpdateState instead.
NewAddress(addresses []Address)
// NewServiceConfig is called by resolver to notify ClientConn a new
// service config. The service config should be provided as a json string.
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
}

Просмотреть файл

@ -21,6 +21,7 @@ package grpc
import (
"fmt"
"strings"
"sync/atomic"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
@ -30,12 +31,12 @@ import (
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It implements resolver.ClientConnection interface.
type ccResolverWrapper struct {
cc *ClientConn
resolver resolver.Resolver
addrCh chan []resolver.Address
scCh chan string
done chan struct{}
lastAddressesCount int
cc *ClientConn
resolver resolver.Resolver
addrCh chan []resolver.Address
scCh chan string
done uint32 // accessed atomically; set to 1 when closed.
curState resolver.State
}
// split2 returns the values from strings.SplitN(s, sep, 2).
@ -82,7 +83,6 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
cc: cc,
addrCh: make(chan []resolver.Address, 1),
scCh: make(chan string, 1),
done: make(chan struct{}),
}
var err error
@ -99,57 +99,67 @@ func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
func (ccr *ccResolverWrapper) close() {
ccr.resolver.Close()
close(ccr.done)
atomic.StoreUint32(&ccr.done, 1)
}
// isDone reports whether close has been called on the wrapper. done is read
// atomically so this is safe to call concurrently with close.
func (ccr *ccResolverWrapper) isDone() bool {
	return atomic.LoadUint32(&ccr.done) == 1
}
// UpdateState is called by the resolver implementation to report a new
// resolver.State (addresses and service config together) to gRPC. Updates
// arriving after close are dropped.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
	if ccr.isDone() {
		return
	}
	grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
	if channelz.IsOn() {
		// Trace the delta while ccr.curState still holds the previous
		// state; curState is overwritten below.
		ccr.addChannelzTraceEvent(s)
	}
	ccr.cc.updateResolverState(s)
	ccr.curState = s
}
// NewAddress is called by the resolver implemenetion to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
select {
case <-ccr.done:
if ccr.isDone() {
return
default:
}
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(addrs)
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
}
ccr.cc.handleResolvedAddrs(addrs, nil)
ccr.curState.Addresses = addrs
ccr.cc.updateResolverState(ccr.curState)
}
// NewServiceConfig is called by the resolver implemenetion to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
select {
case <-ccr.done:
if ccr.isDone() {
return
default:
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
ccr.cc.handleServiceConfig(sc)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: sc})
}
ccr.curState.ServiceConfig = sc
ccr.cc.updateResolverState(ccr.curState)
}
func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {
if len(addrs) == 0 && ccr.lastAddressesCount != 0 {
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
Desc: "Resolver returns an empty address list",
Severity: channelz.CtWarning,
})
} else if len(addrs) != 0 && ccr.lastAddressesCount == 0 {
var s string
for i, a := range addrs {
if a.ServerName != "" {
s += a.Addr + "(" + a.ServerName + ")"
} else {
s += a.Addr
}
if i != len(addrs)-1 {
s += " "
}
}
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s),
Severity: channelz.CtINFO,
})
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
if s.ServiceConfig == ccr.curState.ServiceConfig && (len(ccr.curState.Addresses) == 0) == (len(s.Addresses) == 0) {
return
}
ccr.lastAddressesCount = len(addrs)
var updates []string
if s.ServiceConfig != ccr.curState.ServiceConfig {
updates = append(updates, "service config updated")
}
if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
updates = append(updates, "resolver returned an empty address list")
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
}
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
Severity: channelz.CtINFO,
})
}

Просмотреть файл

@ -24,6 +24,7 @@ import (
"fmt"
"net"
"reflect"
"strings"
"sync"
"testing"
"time"
@ -198,7 +199,7 @@ func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
r.InitialAddrs(resolvedAddrs)
r.InitialState(resolver.State{Addresses: resolvedAddrs})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -216,8 +217,7 @@ func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
t.Fatal(err)
}
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
@ -246,7 +246,7 @@ func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
for _, a := range te.srvAddrs {
svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
}
r.InitialAddrs(svrAddrs)
r.InitialState(resolver.State{Addresses: svrAddrs})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -277,7 +277,7 @@ func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
t.Fatal(err)
}
r.NewAddress(svrAddrs[:len(svrAddrs)-1])
r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]})
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
@ -494,7 +494,7 @@ func (s) TestCZChannelMetrics(t *testing.T) {
for _, a := range te.srvAddrs {
svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
}
r.InitialAddrs(svrAddrs)
r.InitialState(resolver.State{Addresses: svrAddrs})
te.resolverScheme = r.Scheme()
cc := te.clientConn()
defer te.tearDown()
@ -1352,7 +1352,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
r.InitialAddrs(resolvedAddrs)
r.InitialState(resolver.State{Addresses: resolvedAddrs})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -1388,8 +1388,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
t.Fatal(err)
}
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
@ -1426,7 +1425,7 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
te.startServer(&testServer{security: e.security})
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -1467,7 +1466,7 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
t.Fatal(err)
}
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
@ -1505,7 +1504,8 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
te.startServer(&testServer{security: e.security})
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.InitialState(resolver.State{Addresses: addrs})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -1519,19 +1519,18 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
}
cid = tcs[0].ID
for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
if tcs[0].Trace.Events[i].Desc == fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", te.srvAddr) {
if strings.Contains(tcs[0].Trace.Events[i].Desc, "resolver returned new addresses") {
break
}
if i == 0 {
return false, fmt.Errorf("events do not contain expected address resolution from empty address state")
return false, fmt.Errorf("events do not contain expected address resolution from empty address state. Got: %+v", tcs[0].Trace.Events)
}
}
return true, nil
}); err != nil {
t.Fatal(err)
}
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid)
@ -1548,14 +1547,14 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
t.Fatal(err)
}
newSc := `{
newSC := `{
"methodConfig": [
{
"name": [
{
"service": "grpc.testing.TestService",
"method": "EmptyCall"
},
}
],
"waitForReady": false,
"timeout": ".001s"
@ -1563,13 +1562,13 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
]
}`
r.NewServiceConfig(newSc)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid)
for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel has a new service config \"%s\"", newSc) {
if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
break
}
if i == 0 {
@ -1581,12 +1580,12 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
t.Fatal(err)
}
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})
if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid)
for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
if cm.Trace.Events[i].Desc == "Resolver returns an empty address list" {
if strings.Contains(cm.Trace.Events[i].Desc, "resolver returned an empty address list") {
break
}
if i == 0 {
@ -1611,7 +1610,7 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
for _, a := range te.srvAddrs {
svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
}
r.InitialAddrs(svrAddrs)
r.InitialState(resolver.State{Addresses: svrAddrs})
te.resolverScheme = r.Scheme()
cc := te.clientConn()
defer te.tearDown()
@ -1666,7 +1665,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
te.startServer(&testServer{security: e.security})
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
cc := te.clientConn()
defer te.tearDown()
@ -1682,7 +1681,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
if err := verifyResultWithDelay(func() (bool, error) {
// we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
// to effect of r.NewAddress([]resolver.Address{}))
// to effect of r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}))
if subConn == 0 {
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
@ -1713,12 +1712,12 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
}
}
// Make sure the SubChannel has already seen transient failure before shutting it down through
// r.NewAddress([]resolver.Address{}).
// r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}).
if transient == 0 {
return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
}
transient = 0
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
for _, e := range scm.Trace.Events {
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
ready++
@ -1760,7 +1759,7 @@ func (s) TestCZChannelConnectivityState(t *testing.T) {
te.startServer(&testServer{security: e.security})
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
cc := te.clientConn()
defer te.tearDown()
@ -1820,7 +1819,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
r.InitialAddrs(resolvedAddrs)
r.InitialState(resolver.State{Addresses: resolvedAddrs})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -1841,8 +1840,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
t.Fatal(err)
}
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
@ -1879,7 +1877,7 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
te.startServer(&testServer{security: e.security})
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
@ -1902,7 +1900,7 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
t.Fatal(err)
}
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
@ -1936,7 +1934,7 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
te.startServer(&testServer{security: e.security})
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}})
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
te.clientConn()
var subConn int64

Просмотреть файл

@ -1478,8 +1478,10 @@ func (s) TestGetMethodConfig(t *testing.T) {
te.resolverScheme = r.Scheme()
cc := te.clientConn()
r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
r.NewServiceConfig(`{
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -1500,7 +1502,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
"waitForReady": false
}
]
}`)
}`})
tc := testpb.NewTestServiceClient(cc)
@ -1518,7 +1520,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
r.NewServiceConfig(`{
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -1539,7 +1541,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
"waitForReady": false
}
]
}`)
}`})
// Make sure service config has been processed by grpc.
for {
@ -1563,8 +1565,10 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
te.resolverScheme = r.Scheme()
cc := te.clientConn()
r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
r.NewServiceConfig(`{
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -1581,7 +1585,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
"timeout": ".001s"
}
]
}`)
}`})
tc := testpb.NewTestServiceClient(cc)
@ -1604,7 +1608,9 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
// Generate a service config update.
// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -1621,7 +1627,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
"timeout": ".001s"
}
]
}`)
}`})
// Wait for the new service config to take effect.
for {
@ -1648,8 +1654,10 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
te.resolverScheme = r.Scheme()
cc := te.clientConn()
r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
r.NewServiceConfig(`{
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -1666,7 +1674,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
"timeout": "3600s"
}
]
}`)
}`})
tc := testpb.NewTestServiceClient(cc)
@ -1694,7 +1702,9 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
// Generate a service config update.
// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -1711,7 +1721,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
"timeout": ".000000001s"
}
]
}`)
}`})
// Wait for the new service config to take effect.
for {
@ -1785,8 +1795,8 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
te1.startServer(&testServer{security: e.security})
cc1 := te1.clientConn()
r.NewAddress([]resolver.Address{{Addr: te1.srvAddr}})
r.NewServiceConfig(scjs)
addrs := []resolver.Address{{Addr: te1.srvAddr}}
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: scjs})
tc := testpb.NewTestServiceClient(cc1)
req := &testpb.SimpleRequest{
@ -1857,8 +1867,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
te2.startServer(&testServer{security: e.security})
defer te2.tearDown()
cc2 := te2.clientConn()
r.NewAddress([]resolver.Address{{Addr: te2.srvAddr}})
r.NewServiceConfig(scjs)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: scjs})
tc = testpb.NewTestServiceClient(cc2)
for {
@ -1919,8 +1928,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
defer te3.tearDown()
cc3 := te3.clientConn()
r.NewAddress([]resolver.Address{{Addr: te3.srvAddr}})
r.NewServiceConfig(scjs)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: scjs})
tc = testpb.NewTestServiceClient(cc3)
for {
@ -2010,8 +2018,9 @@ func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: te.srvAddr}},
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -2024,7 +2033,7 @@ func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
"timeout": "10s"
}
]
}`)
}`})
// Make sure service config has been processed by grpc.
for {
if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
@ -4850,7 +4859,7 @@ func (s) TestCredsHandshakeAuthority(t *testing.T) {
t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
@ -5155,6 +5164,8 @@ type stubServer struct {
client testpb.TestServiceClient
cc *grpc.ClientConn
addr string // address of listener
cleanups []func() // Lambdas executed in Stop(); populated by Start().
r *manual.Resolver
@ -5182,6 +5193,7 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
if err != nil {
return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
}
ss.addr = lis.Addr().String()
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
s := grpc.NewServer(sopts...)
@ -5189,7 +5201,7 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
go s.Serve(lis)
ss.cleanups = append(ss.cleanups, s.Stop)
target := ss.r.Scheme() + ":///" + lis.Addr().String()
target := ss.r.Scheme() + ":///" + ss.addr
opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
cc, err := grpc.Dial(target, opts...)
@ -5197,7 +5209,7 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
return fmt.Errorf("grpc.Dial(%q) = %v", target, err)
}
ss.cc = cc
ss.r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}})
if err := ss.waitForReady(cc); err != nil {
return err
}
@ -5208,6 +5220,10 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
return nil
}
func (ss *stubServer) newServiceConfig(sc string) {
ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: sc})
}
func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@ -7017,10 +7033,10 @@ func (s) TestGoAwayThenClose(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
r.InitialAddrs([]resolver.Address{
r.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
})
}})
cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
if err != nil {
t.Fatalf("Error creating client: %v", err)
@ -7100,7 +7116,9 @@ func (s) TestRPCWaitsForResolver(t *testing.T) {
defer cancel()
go func() {
time.Sleep(time.Second)
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: te.srvAddr}},
ServiceConfig: `{
"methodConfig": [
{
"name": [
@ -7112,8 +7130,7 @@ func (s) TestRPCWaitsForResolver(t *testing.T) {
"maxRequestMessageBytes": 0
}
]
}`)
r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
}`})
}()
// We wait a second before providing a service config and resolving
// addresses. So this will wait for that and then honor the

Просмотреть файл

@ -192,12 +192,13 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) {
}
defer deferFunc()
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
@ -259,12 +260,13 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
}
defer deferFunc()
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@ -302,12 +304,13 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) {
defer deferFunc()
tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
// make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) {
@ -393,12 +396,13 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) {
tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
// make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) {
@ -453,12 +457,15 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
defer deferFunc()
tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{
sc := `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: sc,
})
// make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) {
@ -499,7 +506,7 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
default:
}
// trigger teardown of the ac
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
select {
case <-hcExitChan:
@ -543,12 +550,13 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
defer deferFunc()
tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
// make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) {
@ -622,12 +630,15 @@ func (s) TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T)
// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
// test ends).
r.NewServiceConfig(`{
sc := `{
"healthCheckConfig": {
"serviceName": "delay"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: sc,
})
select {
case <-hcExitChan:
@ -641,7 +652,7 @@ func (s) TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T)
t.Fatal("Health check function has not been invoked after 5s.")
}
// trigger teardown of the ac, ac in SHUTDOWN state
r.NewAddress([]resolver.Address{})
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
// The health check func should exit without calling the reportHealth func, as server hasn't sent
// any response.
@ -695,12 +706,13 @@ func (s) TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
// test ends).
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "delay"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
select {
case <-hcExitChan:
@ -742,12 +754,13 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: addr}})
}`})
// send some rpcs to make sure transport has been created and is ready for use.
if err := verifyResultWithDelay(func() (bool, error) {
@ -780,12 +793,13 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
tc := testpb.NewTestServiceClient(cc)
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "foo"
}
}`)
r.NewAddress([]resolver.Address{{Addr: addr}})
}`})
// send some rpcs to make sure transport has been created and is ready for use.
if err := verifyResultWithDelay(func() (bool, error) {
@ -818,7 +832,7 @@ func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
tc := testpb.NewTestServiceClient(cc)
r.NewAddress([]resolver.Address{{Addr: addr}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})
// send some rpcs to make sure transport has been created and is ready for use.
if err := verifyResultWithDelay(func() (bool, error) {
@ -872,12 +886,13 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
}
defer deferFunc()
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "channelzSuccess"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
if err := verifyResultWithDelay(func() (bool, error) {
cm, _ := channelz.GetTopChannels(0, 0)
@ -927,12 +942,13 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
}
defer deferFunc()
r.NewServiceConfig(`{
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{
"healthCheckConfig": {
"serviceName": "channelzFailure"
}
}`)
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
}`})
if err := verifyResultWithDelay(func() (bool, error) {
cm, _ := channelz.GetTopChannels(0, 0)

Просмотреть файл

@ -63,7 +63,7 @@ func (s) TestRetryUnary(t *testing.T) {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ss.r.NewServiceConfig(`{
ss.newServiceConfig(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
@ -131,7 +131,7 @@ func (s) TestRetryDisabledByDefault(t *testing.T) {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ss.r.NewServiceConfig(`{
ss.newServiceConfig(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
@ -191,7 +191,7 @@ func (s) TestRetryThrottling(t *testing.T) {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ss.r.NewServiceConfig(`{
ss.newServiceConfig(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
@ -502,7 +502,7 @@ func (s) TestRetryStreaming(t *testing.T) {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ss.r.NewServiceConfig(`{
ss.newServiceConfig(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,

11
vet.sh
Просмотреть файл

@ -108,17 +108,22 @@ fi
# TODO(menghanl): fix errors in transport_test.
staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore '
google.golang.org/grpc/balancer.go:SA1019
google.golang.org/grpc/balancer_test.go:SA1019
google.golang.org/grpc/clientconn_test.go:SA1019
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
google.golang.org/grpc/balancer/xds/edsbalancer/balancergroup.go:SA1019
google.golang.org/grpc/balancer/xds/xds.go:SA1019
google.golang.org/grpc/balancer_conn_wrappers.go:SA1019
google.golang.org/grpc/balancer_test.go:SA1019
google.golang.org/grpc/benchmark/benchmain/main.go:SA1019
google.golang.org/grpc/benchmark/worker/benchmark_client.go:SA1019
google.golang.org/grpc/clientconn.go:S1024
google.golang.org/grpc/clientconn_state_transition_test.go:SA1019
google.golang.org/grpc/clientconn_test.go:SA1019
google.golang.org/grpc/internal/transport/handler_server.go:SA1019
google.golang.org/grpc/internal/transport/handler_server_test.go:SA1019
google.golang.org/grpc/resolver/dns/dns_resolver.go:SA1019
google.golang.org/grpc/stats/stats_test.go:SA1019
google.golang.org/grpc/test/channelz_test.go:SA1019
google.golang.org/grpc/test/end2end_test.go:SA1019
google.golang.org/grpc/test/healthcheck_test.go:SA1019
google.golang.org/grpc/clientconn.go:S1024
' ./...
misspell -error .