Fix panics on balancer and resolver updates (#1684)

- NewAddress with empty list (addrConn with an empty address list)
 - NewServiceConfig to switch balancer before the first balancer is built
This commit is contained in:
Menghan Li 2017-11-22 13:59:20 -08:00 коммит произвёл GitHub
Родитель 646f701c82
Коммит 10873b30bf
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 52 добавлений и 1 удалений

Просмотреть файл

@ -19,6 +19,7 @@
package grpc package grpc
import ( import (
"fmt"
"sync" "sync"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
@ -183,6 +184,9 @@ func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err
} }
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs) ac, err := ccb.cc.newAddrConn(addrs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -223,6 +227,10 @@ type acBalancerWrapper struct {
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.mu.Lock() acbw.mu.Lock()
defer acbw.mu.Unlock() defer acbw.mu.Unlock()
if len(addrs) <= 0 {
acbw.ac.tearDown(errConnDrain)
return
}
if !acbw.ac.tryUpdateAddrs(addrs) { if !acbw.ac.tryUpdateAddrs(addrs) {
cc := acbw.ac.cc cc := acbw.ac.cc
acbw.ac.mu.Lock() acbw.ac.mu.Lock()

Просмотреть файл

@ -642,6 +642,8 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
} }
// switchBalancer starts the switching from current balancer to the balancer with name. // switchBalancer starts the switching from current balancer to the balancer with name.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) { func (cc *ClientConn) switchBalancer(name string) {
if cc.conns == nil { if cc.conns == nil {
return return
@ -659,7 +661,9 @@ func (cc *ClientConn) switchBalancer(name string) {
// TODO(bar switching) change this to two steps: drain and close. // TODO(bar switching) change this to two steps: drain and close.
// Keep track of sc in wrapper. // Keep track of sc in wrapper.
if cc.balancerWrapper != nil {
cc.balancerWrapper.close() cc.balancerWrapper.close()
}
builder := balancer.Get(name) builder := balancer.Get(name)
if builder == nil { if builder == nil {
@ -684,6 +688,8 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
} }
// newAddrConn creates an addrConn for addrs and adds it to cc.conns. // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{ ac := &addrConn{
cc: cc, cc: cc,

Просмотреть файл

@ -30,6 +30,8 @@ import (
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/naming" "google.golang.org/grpc/naming"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
_ "google.golang.org/grpc/resolver/passthrough" _ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/test/leakcheck" "google.golang.org/grpc/test/leakcheck"
"google.golang.org/grpc/testdata" "google.golang.org/grpc/testdata"
@ -329,6 +331,41 @@ func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
} }
} }
func TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
// SwitchBalancer before NewAddress. There was no balancer created, this
// makes sure we don't call close on nil balancerWrapper.
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic.
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
}
func TestResolverEmptyUpdateNotPanic(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
// This make sure we don't create addrConn with empty address list.
r.NewAddress([]resolver.Address{}) // This should not panic.
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
}
func TestClientUpdatesParamsAfterGoAway(t *testing.T) { func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
lis, err := net.Listen("tcp", "localhost:0") lis, err := net.Listen("tcp", "localhost:0")