Add dial option to set balancer (#1697)

WithBalancerName dial option specifies the name of the balancer to be used by the ClientConn. Service config updates can NOT override the balancer option.
This commit is contained in:
Menghan Li 2017-12-18 15:35:42 -08:00 коммит произвёл GitHub
Родитель 6610f9a340
Коммит e6549e636d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 110 добавлений и 45 удалений

Просмотреть файл

@ -31,9 +31,12 @@ import (
"google.golang.org/grpc/resolver"
)
// Name is the name of round_robin balancer.
const Name = "round_robin"
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{})
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}
func init() {

Просмотреть файл

@ -27,7 +27,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/peer"
@ -38,8 +38,6 @@ import (
"google.golang.org/grpc/test/leakcheck"
)
var rr = balancer.Get("round_robin")
type testServer struct {
testpb.TestServiceServer
}
@ -103,7 +101,7 @@ func TestOneBackend(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -135,7 +133,7 @@ func TestBackendsRoundRobin(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -194,7 +192,7 @@ func TestAddressesRemoved(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -236,7 +234,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -270,7 +268,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -319,7 +317,7 @@ func TestOneServerDown(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -417,7 +415,7 @@ func TestAllServersDown(t *testing.T) {
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}

Просмотреть файл

@ -25,6 +25,7 @@ import (
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
@ -132,6 +133,34 @@ func TestSwitchBalancer(t *testing.T) {
}
}
// Test that balancer specified by dial option will not be overridden.
func TestBalancerDialOption(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
numServers := 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}
// First addr update contains grpclb.
func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
defer leakcheck.Check(t)
@ -182,7 +211,7 @@ func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: "backend"}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
@ -210,7 +239,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
@ -258,7 +287,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: "backend"}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
@ -352,7 +381,7 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break

Просмотреть файл

@ -95,7 +95,8 @@ type dialOptions struct {
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
// This is to support v1 balancer.
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
@ -200,7 +201,8 @@ func WithDecompressor(dc Decompressor) DialOption {
// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
// Deprecated: use the new balancer APIs in balancer package instead.
//
// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
@ -209,12 +211,21 @@ func WithBalancer(b Balancer) DialOption {
}
}
// WithBalancerBuilder is for testing only. Users using custom balancers should
// register their balancer and use service config to choose the balancer to use.
func WithBalancerBuilder(b balancer.Builder) DialOption {
// TODO(bar) remove this when switching balancer is done.
// WithBalancerName sets the balancer that the ClientConn will be initialized
// with. Balancer registered with balancerName will be used. This function
// panics if no balancer was registered by balancerName.
//
// The balancer cannot be overridden by balancer option specified by service
// config.
//
// This is an EXPERIMENTAL API.
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return func(o *dialOptions) {
o.balancerBuilder = b
o.balancerBuilder = builder
}
}
@ -670,9 +681,9 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
cc.curAddresses = addrs
if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
} else {
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
@ -697,11 +708,16 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
// - the first time handling non-grpclb addresses
// (curBalancerName="grpclb", preBalancerName="")
if newBalancerName == "" {
newBalancerName = pickfirstName
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
}
@ -724,7 +740,7 @@ func (cc *ClientConn) switchBalancer(name string) {
grpclog.Infof("ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead")
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
return
}
// TODO(bar switching) change this to two steps: drain and close.

Просмотреть файл

@ -83,6 +83,10 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
return m, nil
}
func init() {
balancer.Register(newLBBuilder())
}
// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)

Просмотреть файл

@ -36,6 +36,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
@ -573,6 +574,24 @@ func TestBalancerDisconnects(t *testing.T) {
t.Fatalf("No RPC sent to second backend after 1 second")
}
type customGRPCLBBuilder struct {
balancer.Builder
name string
}
func (b *customGRPCLBBuilder) Name() string {
return b.name
}
const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"
func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}
func TestFallback(t *testing.T) {
defer leakcheck.Check(t)
@ -611,7 +630,7 @@ func TestFallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)),
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)

Просмотреть файл

@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder()))
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.

Просмотреть файл

@ -26,7 +26,8 @@ import (
"google.golang.org/grpc/resolver"
)
const pickfirstName = "pick_first"
// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"
func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
@ -39,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
}
func (*pickfirstBuilder) Name() string {
return pickfirstName
return PickFirstBalancerName
}
type pickfirstBalancer struct {

Просмотреть файл

@ -48,7 +48,7 @@ func TestOneBackendPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -82,7 +82,7 @@ func TestBackendsPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -116,7 +116,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -153,7 +153,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) {
_, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -190,7 +190,7 @@ func TestOneServerDownPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}), WithWaitForHandshake())
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -232,7 +232,7 @@ func TestAllServersDownPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}), WithWaitForHandshake())
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
@ -276,7 +276,7 @@ func TestAddressesRemovedPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}

Просмотреть файл

@ -46,8 +46,7 @@ import (
"golang.org/x/net/http2"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
@ -694,11 +693,7 @@ func (te *test) clientConn() *grpc.ClientConn {
case "v1":
opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
case "round_robin":
rr := balancer.Get("round_robin")
if rr == nil {
te.t.Fatalf("got nil when trying to get roundrobin balancer builder")
}
opts = append(opts, grpc.WithBalancerBuilder(rr))
opts = append(opts, grpc.WithBalancerName(roundrobin.Name))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))