eds: handle child balancer state asynchronously (#3418)
Before this change, the EDS balancer handled child balancer state updates synchronously, including priority handling. This could cause a deadlock if the child policy sent a state update inline while handling addresses (e.g. when roundrobin handles an empty address list). This change moves child balancer state handling into a goroutine to avoid the problem.
This commit is contained in:
Родитель
6af3d372ce
Коммит
cfc5fec35a
|
@ -29,6 +29,7 @@ import (
|
|||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/internal/buffer"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
|
@ -42,8 +43,8 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||
return newEDSBalancerImpl(cc, loadStore, logger)
|
||||
newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||
return newEDSBalancerImpl(cc, enqueueState, loadStore, logger)
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -57,16 +58,17 @@ type edsBalancerBuilder struct{}
|
|||
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
x := &edsBalancer{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
cc: cc,
|
||||
buildOpts: opts,
|
||||
grpcUpdate: make(chan interface{}),
|
||||
xdsClientUpdate: make(chan interface{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
cc: cc,
|
||||
buildOpts: opts,
|
||||
grpcUpdate: make(chan interface{}),
|
||||
xdsClientUpdate: make(chan interface{}),
|
||||
childPolicyUpdate: buffer.NewUnbounded(),
|
||||
}
|
||||
loadStore := lrs.NewStore()
|
||||
x.logger = grpclog.NewPrefixLogger(loggingPrefix(x))
|
||||
x.edsImpl = newEDSBalancer(x.cc, loadStore, x.logger)
|
||||
x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, loadStore, x.logger)
|
||||
x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore, x.logger)
|
||||
x.logger.Infof("Created")
|
||||
go x.run()
|
||||
|
@ -89,6 +91,8 @@ func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB
|
|||
// implement to communicate with edsBalancer.
|
||||
//
|
||||
// It's implemented by the real eds balancer and a fake testing eds balancer.
|
||||
//
|
||||
// TODO: none of the methods in this interface needs to be exported.
|
||||
type edsBalancerImplInterface interface {
|
||||
// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
|
||||
HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
|
||||
|
@ -96,6 +100,8 @@ type edsBalancerImplInterface interface {
|
|||
HandleChildPolicy(name string, config json.RawMessage)
|
||||
// HandleSubConnStateChange handles state change for SubConn.
|
||||
HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
|
||||
// updateState handles a balancer state update from the priority.
|
||||
updateState(priority priorityType, s balancer.State)
|
||||
// Close closes the eds balancer.
|
||||
Close()
|
||||
}
|
||||
|
@ -115,8 +121,9 @@ type edsBalancer struct {
|
|||
logger *grpclog.PrefixLogger
|
||||
|
||||
// edsBalancer continuously monitors the channels below, and will handle events from them in sync.
|
||||
grpcUpdate chan interface{}
|
||||
xdsClientUpdate chan interface{}
|
||||
grpcUpdate chan interface{}
|
||||
xdsClientUpdate chan interface{}
|
||||
childPolicyUpdate *buffer.Unbounded
|
||||
|
||||
client *xdsclientWrapper // may change when passed a different service config
|
||||
config *EDSConfig // may change when passed a different service config
|
||||
|
@ -133,6 +140,10 @@ func (x *edsBalancer) run() {
|
|||
x.handleGRPCUpdate(update)
|
||||
case update := <-x.xdsClientUpdate:
|
||||
x.handleXDSClientUpdate(update)
|
||||
case update := <-x.childPolicyUpdate.Get():
|
||||
x.childPolicyUpdate.Load()
|
||||
u := update.(*balancerStateWithPriority)
|
||||
x.edsImpl.updateState(u.priority, u.s)
|
||||
case <-x.ctx.Done():
|
||||
if x.client != nil {
|
||||
x.client.close()
|
||||
|
@ -257,6 +268,18 @@ func (x *edsBalancer) loseContact() {
|
|||
}
|
||||
}
|
||||
|
||||
type balancerStateWithPriority struct {
|
||||
priority priorityType
|
||||
s balancer.State
|
||||
}
|
||||
|
||||
func (x *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) {
|
||||
x.childPolicyUpdate.Put(&balancerStateWithPriority{
|
||||
priority: p,
|
||||
s: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (x *edsBalancer) Close() {
|
||||
x.cancel()
|
||||
x.logger.Infof("Shutdown")
|
||||
|
|
|
@ -60,6 +60,8 @@ type edsBalancerImpl struct {
|
|||
cc balancer.ClientConn
|
||||
logger *grpclog.PrefixLogger
|
||||
|
||||
enqueueChildBalancerStateUpdate func(priorityType, balancer.State)
|
||||
|
||||
subBalancerBuilder balancer.Builder
|
||||
loadStore lrs.Store
|
||||
priorityToLocalities map[priorityType]*balancerGroupWithConfig
|
||||
|
@ -90,12 +92,14 @@ type edsBalancerImpl struct {
|
|||
}
|
||||
|
||||
// newEDSBalancerImpl creates a new edsBalancerImpl.
|
||||
func newEDSBalancerImpl(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) *edsBalancerImpl {
|
||||
func newEDSBalancerImpl(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) *edsBalancerImpl {
|
||||
edsImpl := &edsBalancerImpl{
|
||||
cc: cc,
|
||||
logger: logger,
|
||||
subBalancerBuilder: balancer.Get(roundrobin.Name),
|
||||
|
||||
enqueueChildBalancerStateUpdate: enqueueState,
|
||||
|
||||
priorityToLocalities: make(map[priorityType]*balancerGroupWithConfig),
|
||||
priorityToState: make(map[priorityType]*balancer.State),
|
||||
subConnToPriority: make(map[balancer.SubConn]priorityType),
|
||||
|
@ -402,7 +406,7 @@ func (ebwcc *edsBalancerWrapperCC) UpdateBalancerState(state connectivity.State,
|
|||
}
|
||||
|
||||
func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) {
|
||||
ebwcc.parent.updateState(ebwcc.priority, state)
|
||||
ebwcc.parent.enqueueChildBalancerStateUpdate(ebwcc.priority, state)
|
||||
}
|
||||
|
||||
func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||
|
|
|
@ -98,8 +98,10 @@ func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) {
|
|||
p := edsImpl.priorityToLocalities[priority]
|
||||
// NOTE: this will eventually send addresses to sub-balancers. If the
|
||||
// sub-balancer tries to update picker, it will result in a deadlock on
|
||||
// priorityMu. But it's not an expected behavior for the balancer to
|
||||
// update picker when handling addresses.
|
||||
// priorityMu if the update is handled synchronously. The deadlock is
|
||||
// currently avoided by handling balancer update in a goroutine (the run
|
||||
// goroutine in the parent eds balancer). When priority balancer is split
|
||||
// into its own, this asynchronous state handling needs to be copied.
|
||||
p.bg.start()
|
||||
// startPriority can be called when
|
||||
// 1. first EDS resp, start p0
|
||||
|
|
|
@ -33,7 +33,8 @@ import (
|
|||
// Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
|
||||
func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with priorities [0, 1], each with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -99,7 +100,8 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
|
|||
// down, use 2; remove 2, use 1.
|
||||
func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with priorities [0, 1], each with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -206,7 +208,8 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
|
|||
// Init 0 and 1; 0 and 1 both down; add 2, use 2.
|
||||
func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with different priorities, each with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -270,7 +273,8 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
|
|||
defer time.Sleep(10 * time.Millisecond)
|
||||
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with priorities [0,1,2], each with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -351,7 +355,8 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
|
|||
}()()
|
||||
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with different priorities, each with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -400,7 +405,8 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
|
|||
// - add localities to existing p0 and p1
|
||||
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with different priorities, each with one backend.
|
||||
clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -512,7 +518,8 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
|
|||
}()()
|
||||
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, with different priorities, each with one backend.
|
||||
clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
|
|
@ -51,7 +51,8 @@ func init() {
|
|||
// - change drop rate
|
||||
func (s) TestEDS_OneLocality(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// One locality with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -157,7 +158,8 @@ func (s) TestEDS_OneLocality(t *testing.T) {
|
|||
// - update locality weight
|
||||
func (s) TestEDS_TwoLocalities(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, each with one backend.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -287,7 +289,8 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
|
|||
// healthy ones are used.
|
||||
func (s) TestEDS_EndpointsHealth(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
|
@ -358,7 +361,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
|
|||
}
|
||||
|
||||
func (s) TestClose(t *testing.T) {
|
||||
edsb := newEDSBalancerImpl(nil, nil, nil)
|
||||
edsb := newEDSBalancerImpl(nil, nil, nil, nil)
|
||||
// This is what could happen when switching between fallback and eds. This
|
||||
// makes sure it doesn't panic.
|
||||
edsb.Close()
|
||||
|
@ -415,7 +418,8 @@ func (tcp *testConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, e
|
|||
// eds response.
|
||||
func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
t.Logf("update sub-balancer to test-const-balancer")
|
||||
edsb.HandleChildPolicy("test-const-balancer", nil)
|
||||
|
@ -506,6 +510,68 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
balancer.Register(&testInlineUpdateBalancerBuilder{})
|
||||
}
|
||||
|
||||
// A test balancer that updates balancer.State inline when handling ClientConn
|
||||
// state.
|
||||
type testInlineUpdateBalancerBuilder struct{}
|
||||
|
||||
func (*testInlineUpdateBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||
return &testInlineUpdateBalancer{cc: cc}
|
||||
}
|
||||
|
||||
func (*testInlineUpdateBalancerBuilder) Name() string {
|
||||
return "test-inline-update-balancer"
|
||||
}
|
||||
|
||||
type testInlineUpdateBalancer struct {
|
||||
cc balancer.ClientConn
|
||||
}
|
||||
|
||||
func (tb *testInlineUpdateBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||
}
|
||||
|
||||
var errTestInlineStateUpdate = fmt.Errorf("don't like addresses, empty or not")
|
||||
|
||||
func (tb *testInlineUpdateBalancer) HandleResolvedAddrs(a []resolver.Address, err error) {
|
||||
tb.cc.UpdateState(balancer.State{
|
||||
ConnectivityState: connectivity.Ready,
|
||||
Picker: &testConstPicker{err: errTestInlineStateUpdate},
|
||||
})
|
||||
}
|
||||
|
||||
func (*testInlineUpdateBalancer) Close() {
|
||||
}
|
||||
|
||||
// When the child policy update picker inline in a handleClientUpdate call
|
||||
// (e.g., roundrobin handling empty addresses). There could be deadlock caused
|
||||
// by acquiring a locked mutex.
|
||||
func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = func(p priorityType, state balancer.State) {
|
||||
// For this test, euqueue needs to happen asynchronously (like in the
|
||||
// real implementation).
|
||||
go edsb.updateState(p, state)
|
||||
}
|
||||
|
||||
edsb.HandleChildPolicy("test-inline-update-balancer", nil)
|
||||
|
||||
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
|
||||
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
|
||||
|
||||
p0 := <-cc.newPickerCh
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := p0.Pick(balancer.PickInfo{})
|
||||
if err != errTestInlineStateUpdate {
|
||||
t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestInlineStateUpdate)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s) TestDropPicker(t *testing.T) {
|
||||
const pickCount = 12
|
||||
var constPicker = &testConstPicker{
|
||||
|
@ -575,7 +641,8 @@ func (s) TestEDS_LoadReport(t *testing.T) {
|
|||
testLoadStore := newTestLoadStore()
|
||||
|
||||
cc := newTestClientConn(t)
|
||||
edsb := newEDSBalancerImpl(cc, testLoadStore, nil)
|
||||
edsb := newEDSBalancerImpl(cc, nil, testLoadStore, nil)
|
||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||
|
||||
backendToBalancerID := make(map[balancer.SubConn]internal.Locality)
|
||||
|
||||
|
|
|
@ -101,8 +101,9 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
|
|||
f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
|
||||
}
|
||||
|
||||
func (f *fakeEDSBalancer) Close() {}
|
||||
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {}
|
||||
func (f *fakeEDSBalancer) Close() {}
|
||||
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {}
|
||||
func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
|
||||
|
||||
func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
|
||||
val, err := f.childPolicy.Receive()
|
||||
|
@ -185,7 +186,7 @@ func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
|
|||
// cleanup.
|
||||
func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
|
||||
origNewEDSBalancer := newEDSBalancer
|
||||
newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||
newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||
edsLB := newFakeEDSBalancer(cc, loadStore)
|
||||
defer func() { edsLBCh.Send(edsLB) }()
|
||||
return edsLB
|
||||
|
|
Загрузка…
Ссылка в новой задаче