Move exponential backoff to DNS resolver from resolver.ClientConn (#4270)
This commit is contained in:
Parent
41676e61b1
Commit
1c598a11a4
@@ -1,90 +0,0 @@
/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
    "fmt"
    "net"
    "testing"

    "google.golang.org/grpc/balancer"
    "google.golang.org/grpc/balancer/roundrobin"
    "google.golang.org/grpc/internal/balancer/stub"
    "google.golang.org/grpc/resolver"
    "google.golang.org/grpc/resolver/manual"
)

// TestBalancerErrorResolverPolling injects balancer errors and verifies
// ResolveNow is called on the resolver with the appropriate backoff strategy
// being consulted between ResolveNow calls.
func (s) TestBalancerErrorResolverPolling(t *testing.T) {
    // The test balancer will return ErrBadResolverState iff the
    // ClientConnState contains no addresses.
    bf := stub.BalancerFuncs{
        UpdateClientConnState: func(_ *stub.BalancerData, s balancer.ClientConnState) error {
            if len(s.ResolverState.Addresses) == 0 {
                return balancer.ErrBadResolverState
            }
            return nil
        },
    }
    const balName = "BalancerErrorResolverPolling"
    stub.Register(balName, bf)

    testResolverErrorPolling(t,
        func(r *manual.Resolver) {
            // No addresses so the balancer will fail.
            r.CC.UpdateState(resolver.State{})
        }, func(r *manual.Resolver) {
            // UpdateState will block if ResolveNow is being called (which blocks on
            // rn), so call it in a goroutine. Include some address so the balancer
            // will be happy.
            go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "x"}}})
        },
        WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)))
}

// TestRoundRobinZeroAddressesResolverPolling reports no addresses to the round
// robin balancer and verifies ResolveNow is called on the resolver with the
// appropriate backoff strategy being consulted between ResolveNow calls.
func (s) TestRoundRobinZeroAddressesResolverPolling(t *testing.T) {
    // We need to start a real server or else the connecting loop will call
    // ResolveNow after every iteration, even after a valid resolver result is
    // returned.
    lis, err := net.Listen("tcp", "localhost:0")
    if err != nil {
        t.Fatalf("Error while listening. Err: %v", err)
    }
    defer lis.Close()
    s := NewServer()
    defer s.Stop()
    go s.Serve(lis)

    testResolverErrorPolling(t,
        func(r *manual.Resolver) {
            // No addresses so the balancer will fail.
            r.CC.UpdateState(resolver.State{})
        }, func(r *manual.Resolver) {
            // UpdateState will block if ResolveNow is being called (which
            // blocks on rn), so call it in a goroutine. Include a valid
            // address so the balancer will be happy.
            go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
        },
        WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
}
@@ -66,11 +66,7 @@ type dialOptions struct {
    minConnectTimeout func() time.Duration
    defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
    defaultServiceConfigRawJSON *string
    // This is used by ccResolverWrapper to backoff between successive calls to
    // resolver.ResolveNow(). The user will have no need to configure this, but
    // we need to be able to configure this in tests.
    resolveNowBackoff func(int) time.Duration
    resolvers []resolver.Builder
    resolvers []resolver.Builder
}

// DialOption configures how we set up the connection.
@@ -596,7 +592,6 @@ func defaultDialOptions() dialOptions {
            ReadBufferSize: defaultReadBufSize,
            UseProxy: true,
        },
        resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
    }
}
@@ -611,16 +606,6 @@ func withMinConnectDeadline(f func() time.Duration) DialOption {
    })
}

// withResolveNowBackoff specifies the function that clientconn uses to backoff
// between successive calls to resolver.ResolveNow().
//
// For testing purpose only.
func withResolveNowBackoff(f func(int) time.Duration) DialOption {
    return newFuncDialOption(func(o *dialOptions) {
        o.resolveNowBackoff = f
    })
}

// WithResolvers allows a list of resolver implementations to be registered
// locally with the ClientConn without needing to be globally registered via
// resolver.Register. They will be matched against the scheme used for the
@@ -34,6 +34,7 @@ import (

    grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/internal/backoff"
    "google.golang.org/grpc/internal/envconfig"
    "google.golang.org/grpc/internal/grpcrand"
    "google.golang.org/grpc/resolver"
@@ -46,6 +47,9 @@ var EnableSRVLookups = false

var logger = grpclog.Component("dns")

// A global to stub out in tests.
var newTimer = time.NewTimer

func init() {
    resolver.Register(NewBuilder())
}
@@ -143,7 +147,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts

    d.wg.Add(1)
    go d.watcher()
    d.ResolveNow(resolver.ResolveNowOptions{})
    return d, nil
}
@@ -201,28 +204,38 @@ func (d *dnsResolver) Close() {

func (d *dnsResolver) watcher() {
    defer d.wg.Done()
    backoffIndex := 1
    for {
        select {
        case <-d.ctx.Done():
            return
        case <-d.rn:
        }

        state, err := d.lookup()
        if err != nil {
            // Report error to the underlying grpc.ClientConn.
            d.cc.ReportError(err)
        } else {
            d.cc.UpdateState(*state)
            err = d.cc.UpdateState(*state)
        }

        // Sleep to prevent excessive re-resolutions. Incoming resolution requests
        // will be queued in d.rn.
        t := time.NewTimer(minDNSResRate)
        var timer *time.Timer
        if err == nil {
            // Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
            // to prevent constantly re-resolving.
            backoffIndex = 1
            timer = time.NewTimer(minDNSResRate)
            select {
            case <-d.ctx.Done():
                timer.Stop()
                return
            case <-d.rn:
            }
        } else {
            // Poll on an error found in DNS Resolver or an error received from ClientConn.
            timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
            backoffIndex++
        }
        select {
        case <-t.C:
        case <-d.ctx.Done():
            t.Stop()
            timer.Stop()
            return
        case <-timer.C:
        }
    }
}
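With this change the watcher owns the retry policy: a lookup that succeeds and is accepted by the ClientConn resets backoffIndex and waits for the next ResolveNow (but at least minDNSResRate), while a lookup failure or a rejected update waits backoff.DefaultExponential.Backoff(backoffIndex). For orientation, grpc-go's default exponential strategy is roughly a 1s base delay, 1.6x multiplier, 20% jitter, and a 120s cap; those numbers come from the library's defaults rather than from this diff. A self-contained sketch of that growth curve (jitter omitted so the output is deterministic):

package main

import (
    "fmt"
    "math"
    "time"
)

// expBackoff approximates an exponential backoff schedule: the delay grows by
// a fixed multiplier per consecutive failure and is capped at a maximum.
func expBackoff(retries int) time.Duration {
    const (
        baseDelay  = time.Second
        multiplier = 1.6
        maxDelay   = 120 * time.Second
    )
    d := time.Duration(float64(baseDelay) * math.Pow(multiplier, float64(retries)))
    if d > maxDelay {
        return maxDelay
    }
    return d
}

func main() {
    for i := 1; i <= 5; i++ {
        fmt.Printf("failure %d -> wait ~%v\n", i, expBackoff(i))
    }
}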
@@ -30,9 +30,11 @@ import (
    "testing"
    "time"

    "google.golang.org/grpc/balancer"
    grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
    "google.golang.org/grpc/internal/envconfig"
    "google.golang.org/grpc/internal/leakcheck"
    "google.golang.org/grpc/internal/testutils"
    "google.golang.org/grpc/resolver"
    "google.golang.org/grpc/serviceconfig"
)
@@ -47,7 +49,8 @@ func TestMain(m *testing.M) {
}

const (
    txtBytesLimit = 255
    txtBytesLimit = 255
    defaultTestTimeout = 10 * time.Second
)

type testClientConn struct {
@@ -57,13 +60,17 @@ type testClientConn struct {
    state resolver.State
    updateStateCalls int
    errChan chan error
    updateStateErr error
}

func (t *testClientConn) UpdateState(s resolver.State) {
func (t *testClientConn) UpdateState(s resolver.State) error {
    t.m1.Lock()
    defer t.m1.Unlock()
    t.state = s
    t.updateStateCalls++
    // This error determines whether DNS Resolver actually decides to exponentially backoff or not.
    // This can be any error.
    return t.updateStateErr
}

func (t *testClientConn) getState() (resolver.State, int) {
@@ -669,6 +676,13 @@ func TestResolve(t *testing.T) {

func testDNSResolver(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(_ time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    tests := []struct {
        target string
        addrWant []resolver.Address
@@ -736,12 +750,151 @@ func testDNSResolver(t *testing.T) {
    }
}

// DNS Resolver immediately starts polling on an error from grpc. This should continue until the ClientConn doesn't
// send back an error from updating the DNS Resolver's state.
func TestDNSResolverExponentialBackoff(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    timerChan := testutils.NewChannel()
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, allows this test to call timer immediately.
        t := time.NewTimer(time.Hour)
        timerChan.Send(t)
        return t
    }
    tests := []struct {
        name string
        target string
        addrWant []resolver.Address
        scWant string
    }{
        {
            "happy case default port",
            "foo.bar.com",
            []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}, {Addr: "5.6.7.8" + colonDefaultPort}},
            generateSC("foo.bar.com"),
        },
        {
            "happy case specified port",
            "foo.bar.com:1234",
            []resolver.Address{{Addr: "1.2.3.4:1234"}, {Addr: "5.6.7.8:1234"}},
            generateSC("foo.bar.com"),
        },
        {
            "happy case another default port",
            "srv.ipv4.single.fake",
            []resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}},
            generateSC("srv.ipv4.single.fake"),
        },
    }
    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            b := NewBuilder()
            cc := &testClientConn{target: test.target}
            // Cause ClientConn to return an error.
            cc.updateStateErr = balancer.ErrBadResolverState
            r, err := b.Build(resolver.Target{Endpoint: test.target}, cc, resolver.BuildOptions{})
            if err != nil {
                t.Fatalf("Error building resolver for target %v: %v", test.target, err)
            }
            var state resolver.State
            var cnt int
            for i := 0; i < 2000; i++ {
                state, cnt = cc.getState()
                if cnt > 0 {
                    break
                }
                time.Sleep(time.Millisecond)
            }
            if cnt == 0 {
                t.Fatalf("UpdateState not called after 2s; aborting")
            }
            if !reflect.DeepEqual(test.addrWant, state.Addresses) {
                t.Errorf("Resolved addresses of target: %q = %+v, want %+v", test.target, state.Addresses, test.addrWant)
            }
            sc := scFromState(state)
            if test.scWant != sc {
                t.Errorf("Resolved service config of target: %q = %+v, want %+v", test.target, sc, test.scWant)
            }
            ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
            defer ctxCancel()
            // Cause timer to go off 10 times, and see if it calls updateState() correctly.
            for i := 0; i < 10; i++ {
                timer, err := timerChan.Receive(ctx)
                if err != nil {
                    t.Fatalf("Error receiving timer from mock NewTimer call: %v", err)
                }
                timerPointer := timer.(*time.Timer)
                timerPointer.Reset(0)
            }
            // Poll to see if DNS Resolver updated state the correct number of times, which allows time for the DNS Resolver to call
            // ClientConn update state.
            deadline := time.Now().Add(defaultTestTimeout)
            for {
                cc.m1.Lock()
                got := cc.updateStateCalls
                cc.m1.Unlock()
                if got == 11 {
                    break
                }

                if time.Now().After(deadline) {
                    t.Fatalf("Exponential backoff is not working as expected - should update state 11 times instead of %d", got)
                }

                time.Sleep(time.Millisecond)
            }
            // Update resolver.ClientConn to not return an error anymore - this should stop it from backing off.
            cc.updateStateErr = nil
            timer, err := timerChan.Receive(ctx)
            if err != nil {
                t.Fatalf("Error receiving timer from mock NewTimer call: %v", err)
            }
            timerPointer := timer.(*time.Timer)
            timerPointer.Reset(0)
            // Poll to see if DNS Resolver updated state the correct number of times, which allows time for the DNS Resolver to call
            // ClientConn update state the final time. The DNS Resolver should then stop polling.
            deadline = time.Now().Add(defaultTestTimeout)
            for {
                cc.m1.Lock()
                got := cc.updateStateCalls
                cc.m1.Unlock()
                if got == 12 {
                    break
                }

                if time.Now().After(deadline) {
                    t.Fatalf("Exponential backoff is not working as expected - should stop backing off at 12 total UpdateState calls instead of %d", got)
                }

                _, err := timerChan.ReceiveOrFail()
                if err {
                    t.Fatalf("Should not poll again after Client Conn stops returning error.")
                }

                time.Sleep(time.Millisecond)
            }
            r.Close()
        })
    }
}

func testDNSResolverWithSRV(t *testing.T) {
    EnableSRVLookups = true
    defer func() {
        EnableSRVLookups = false
    }()
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(_ time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    tests := []struct {
        target string
        addrWant []resolver.Address
@@ -855,6 +1008,13 @@ func mutateTbl(target string) func() {

func testDNSResolveNow(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(_ time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    tests := []struct {
        target string
        addrWant []resolver.Address
@@ -926,6 +1086,13 @@ const colonDefaultPort = ":" + defaultPort

func testIPResolver(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(_ time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    tests := []struct {
        target string
        want []resolver.Address
@@ -975,6 +1142,13 @@ func testIPResolver(t *testing.T) {

func TestResolveFunc(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    tests := []struct {
        addr string
        want error
@@ -1013,6 +1187,13 @@ func TestResolveFunc(t *testing.T) {

func TestDisableServiceConfig(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    tests := []struct {
        target string
        scWant string
@@ -1059,6 +1240,13 @@ func TestDisableServiceConfig(t *testing.T) {

func TestTXTError(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    defer func(v bool) { envconfig.TXTErrIgnore = v }(envconfig.TXTErrIgnore)
    for _, ignore := range []bool{false, true} {
        envconfig.TXTErrIgnore = ignore
@@ -1090,6 +1278,13 @@ func TestTXTError(t *testing.T) {
}

func TestDNSResolverRetry(t *testing.T) {
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }
    b := NewBuilder()
    target := "ipv4.single.fake"
    cc := &testClientConn{target: target}
@@ -1144,6 +1339,13 @@ func TestDNSResolverRetry(t *testing.T) {

func TestCustomAuthority(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }

    tests := []struct {
        authority string
@@ -1251,6 +1453,13 @@ func TestCustomAuthority(t *testing.T) {
// requests are made.
func TestRateLimitedResolve(t *testing.T) {
    defer leakcheck.Check(t)
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, will protect from triggering exponential backoff.
        return time.NewTimer(time.Hour)
    }

    const dnsResRate = 10 * time.Millisecond
    dc := replaceDNSResRate(dnsResRate)
@@ -1347,21 +1556,66 @@ func TestRateLimitedResolve(t *testing.T) {
    }
}

// DNS Resolver immediately starts polling on an error. This will cause the re-resolution to return another error.
// Thus, test that it constantly sends errors to the grpc.ClientConn.
func TestReportError(t *testing.T) {
    const target = "notfoundaddress"
    defer func(nt func(d time.Duration) *time.Timer) {
        newTimer = nt
    }(newTimer)
    timerChan := testutils.NewChannel()
    newTimer = func(d time.Duration) *time.Timer {
        // Will never fire on its own, allows this test to call timer immediately.
        t := time.NewTimer(time.Hour)
        timerChan.Send(t)
        return t
    }
    cc := &testClientConn{target: target, errChan: make(chan error)}
    totalTimesCalledError := 0
    b := NewBuilder()
    r, err := b.Build(resolver.Target{Endpoint: target}, cc, resolver.BuildOptions{})
    if err != nil {
        t.Fatalf("%v\n", err)
        t.Fatalf("Error building resolver for target %v: %v", target, err)
    }
    // Should receive first error.
    err = <-cc.errChan
    if !strings.Contains(err.Error(), "hostLookup error") {
        t.Fatalf(`ReportError(err=%v) called; want err contains "hostLookupError"`, err)
    }
    totalTimesCalledError++
    ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    defer ctxCancel()
    timer, err := timerChan.Receive(ctx)
    if err != nil {
        t.Fatalf("Error receiving timer from mock NewTimer call: %v", err)
    }
    timerPointer := timer.(*time.Timer)
    timerPointer.Reset(0)
    defer r.Close()
    select {
    case err := <-cc.errChan:

    // Cause timer to go off 10 times, and see if it matches DNS Resolver updating Error.
    for i := 0; i < 10; i++ {
        // Should call ReportError().
        err = <-cc.errChan
        if !strings.Contains(err.Error(), "hostLookup error") {
            t.Fatalf(`ReportError(err=%v) called; want err contains "hostLookupError"`, err)
        }
    case <-time.After(time.Second):
        t.Fatalf("did not receive error after 1s")
        totalTimesCalledError++
        timer, err := timerChan.Receive(ctx)
        if err != nil {
            t.Fatalf("Error receiving timer from mock NewTimer call: %v", err)
        }
        timerPointer := timer.(*time.Timer)
        timerPointer.Reset(0)
    }

    if totalTimesCalledError != 11 {
        t.Errorf("ReportError() not called 11 times, instead called %d times.", totalTimesCalledError)
    }
    // Clean up final watcher iteration.
    <-cc.errChan
    _, err = timerChan.Receive(ctx)
    if err != nil {
        t.Fatalf("Error receiving timer from mock NewTimer call: %v", err)
    }
}
@@ -181,7 +181,7 @@ type State struct {
// gRPC to add new methods to this interface.
type ClientConn interface {
    // UpdateState updates the state of the ClientConn appropriately.
    UpdateState(State)
    UpdateState(State) error
    // ReportError notifies the ClientConn that the Resolver encountered an
    // error. The ClientConn will notify the load balancer and begin calling
    // ResolveNow on the Resolver with exponential backoff.
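Since UpdateState now returns an error, every resolver.ClientConn implementation (including the test fakes touched elsewhere in this patch) has to be updated, and resolver implementations may use the returned value to decide whether to re-resolve. A minimal illustration, where exampleResolver and pushUpdate are made-up names rather than anything in this change:

package example

import "google.golang.org/grpc/resolver"

// exampleResolver shows only the part of a resolver that reacts to the new
// UpdateState signature.
type exampleResolver struct {
    cc resolver.ClientConn
}

// pushUpdate reports whether the resolver should schedule a backed-off retry.
func (r *exampleResolver) pushUpdate(s resolver.State) (retry bool) {
    // A non-nil error means the ClientConn rejected the update (for example,
    // the balancer returned ErrBadResolverState); the resolver is now the one
    // responsible for re-resolving after a backoff period.
    return r.cc.UpdateState(s) != nil
}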
@@ -22,7 +22,6 @@ import (
    "fmt"
    "strings"
    "sync"
    "time"

    "google.golang.org/grpc/balancer"
    "google.golang.org/grpc/credentials"
@@ -40,9 +39,6 @@ type ccResolverWrapper struct {
    resolver resolver.Resolver
    done *grpcsync.Event
    curState resolver.State

    pollingMu sync.Mutex
    polling chan struct{}
}

// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
@@ -93,59 +89,19 @@ func (ccr *ccResolverWrapper) close() {
    ccr.resolverMu.Unlock()
}

// poll begins or ends asynchronous polling of the resolver based on whether
// err is ErrBadResolverState.
func (ccr *ccResolverWrapper) poll(err error) {
    ccr.pollingMu.Lock()
    defer ccr.pollingMu.Unlock()
    if err != balancer.ErrBadResolverState {
        // stop polling
        if ccr.polling != nil {
            close(ccr.polling)
            ccr.polling = nil
        }
        return
    }
    if ccr.polling != nil {
        // already polling
        return
    }
    p := make(chan struct{})
    ccr.polling = p
    go func() {
        for i := 0; ; i++ {
            ccr.resolveNow(resolver.ResolveNowOptions{})
            t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
            select {
            case <-p:
                t.Stop()
                return
            case <-ccr.done.Done():
                // Resolver has been closed.
                t.Stop()
                return
            case <-t.C:
                select {
                case <-p:
                    return
                default:
                }
                // Timer expired; re-resolve.
            }
        }
    }()
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
    if ccr.done.HasFired() {
        return
        return nil
    }
    channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
    if channelz.IsOn() {
        ccr.addChannelzTraceEvent(s)
    }
    ccr.curState = s
    ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
    if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
        return balancer.ErrBadResolverState
    }
    return nil
}

func (ccr *ccResolverWrapper) ReportError(err error) {
@@ -153,7 +109,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
        return
    }
    channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
    ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
    ccr.cc.updateResolverState(resolver.State{}, err)
}

// NewAddress is called by the resolver implementation to send addresses to gRPC.
@@ -166,7 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
        ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
    }
    ccr.curState.Addresses = addrs
    ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
    ccr.cc.updateResolverState(ccr.curState, nil)
}

// NewServiceConfig is called by the resolver implementation to send service
@@ -183,14 +139,13 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
    scpr := parseServiceConfig(sc)
    if scpr.Err != nil {
        channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
        ccr.poll(balancer.ErrBadResolverState)
        return
    }
    if channelz.IsOn() {
        ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
    }
    ccr.curState.ServiceConfig = scpr
    ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
    ccr.cc.updateResolverState(ccr.curState, nil)
}

func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
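The removal of poll and of the resolveNowBackoff dial option means the ClientConn side no longer drives ResolveNow retries at all; only balancer.ErrBadResolverState is surfaced back through UpdateState, and each resolver is now responsible for backing off and retrying on its own. A rough sketch of what a non-DNS resolver might do, with hypothetical names and structure that are not part of this patch:

package example

import (
    "context"
    "time"

    "google.golang.org/grpc/resolver"
)

// retryResolver is a hypothetical resolver that owns its own retry policy now
// that the ClientConn no longer polls ResolveNow on errors.
type retryResolver struct {
    cc      resolver.ClientConn
    lookup  func() (resolver.State, error)
    backoff func(retries int) time.Duration // e.g. backoff.DefaultExponential.Backoff
}

// resolveLoop re-resolves with backoff until an update is accepted, then
// returns and waits for the next ResolveNow call.
func (r *retryResolver) resolveLoop(ctx context.Context) {
    for retries := 0; ; retries++ {
        state, err := r.lookup()
        if err != nil {
            r.cc.ReportError(err)
        } else if err = r.cc.UpdateState(state); err == nil {
            return // accepted by the ClientConn; stop retrying
        }
        select {
        case <-ctx.Done():
            return
        case <-time.After(r.backoff(retries)):
        }
    }
}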
@@ -67,62 +67,6 @@ func (s) TestDialParseTargetUnknownScheme(t *testing.T) {
    }
}

func testResolverErrorPolling(t *testing.T, badUpdate func(*manual.Resolver), goodUpdate func(*manual.Resolver), dopts ...DialOption) {
    boIter := make(chan int)
    resolverBackoff := func(v int) time.Duration {
        boIter <- v
        return 0
    }

    r := manual.NewBuilderWithScheme("whatever")
    rn := make(chan struct{})
    defer func() { close(rn) }()
    r.ResolveNowCallback = func(resolver.ResolveNowOptions) { rn <- struct{}{} }

    defaultDialOptions := []DialOption{
        WithInsecure(),
        WithResolvers(r),
        withResolveNowBackoff(resolverBackoff),
    }
    cc, err := Dial(r.Scheme()+":///test.server", append(defaultDialOptions, dopts...)...)
    if err != nil {
        t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
    }
    defer cc.Close()
    badUpdate(r)
    panicAfter := time.AfterFunc(5*time.Second, func() { panic("timed out polling resolver") })
    defer panicAfter.Stop()

    // Ensure ResolveNow is called, then Backoff with the right parameter, several times
    for i := 0; i < 7; i++ {
        <-rn
        if v := <-boIter; v != i {
            t.Errorf("Backoff call %v uses value %v", i, v)
        }
    }

    // UpdateState will block if ResolveNow is being called (which blocks on
    // rn), so call it in a goroutine.
    goodUpdate(r)

    // Wait awhile to ensure ResolveNow and Backoff stop being called when the
    // state is OK (i.e. polling was cancelled).
    for {
        t := time.NewTimer(50 * time.Millisecond)
        select {
        case <-rn:
            // ClientConn is still calling ResolveNow
            <-boIter
            time.Sleep(5 * time.Millisecond)
            continue
        case <-t.C:
            // ClientConn stopped calling ResolveNow; success
        }
        break
    }
}

const happyBalancerName = "happy balancer"

func init() {
@@ -136,35 +80,6 @@ func init() {
    stub.Register(happyBalancerName, bf)
}

// TestResolverErrorPolling injects resolver errors and verifies ResolveNow is
// called with the appropriate backoff strategy being consulted between
// ResolveNow calls.
func (s) TestResolverErrorPolling(t *testing.T) {
    testResolverErrorPolling(t, func(r *manual.Resolver) {
        r.CC.ReportError(errors.New("res err"))
    }, func(r *manual.Resolver) {
        // UpdateState will block if ResolveNow is being called (which blocks on
        // rn), so call it in a goroutine.
        go r.CC.UpdateState(resolver.State{})
    },
        WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName)))
}

// TestServiceConfigErrorPolling injects a service config error and verifies
// ResolveNow is called with the appropriate backoff strategy being consulted
// between ResolveNow calls.
func (s) TestServiceConfigErrorPolling(t *testing.T) {
    testResolverErrorPolling(t, func(r *manual.Resolver) {
        badsc := r.CC.ParseServiceConfig("bad config")
        r.UpdateState(resolver.State{ServiceConfig: badsc})
    }, func(r *manual.Resolver) {
        // UpdateState will block if ResolveNow is being called (which blocks on
        // rn), so call it in a goroutine.
        go r.CC.UpdateState(resolver.State{})
    },
        WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName)))
}

// TestResolverErrorInBuild makes the resolver.Builder call into the ClientConn
// during the Build call. We use two separate mutexes in the code which make
// sure there is no data race in this code path, and also that there is no
@@ -37,7 +37,6 @@ import (
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/internal/balancer/stub"
    "google.golang.org/grpc/internal/balancerload"
    "google.golang.org/grpc/internal/grpcsync"
    "google.golang.org/grpc/internal/grpcutil"
    imetadata "google.golang.org/grpc/internal/metadata"
    "google.golang.org/grpc/internal/stubserver"
@@ -698,10 +697,7 @@ func (s) TestEmptyAddrs(t *testing.T) {

    // Initialize pickfirst client
    pfr := manual.NewBuilderWithScheme("whatever")
    pfrnCalled := grpcsync.NewEvent()
    pfr.ResolveNowCallback = func(resolver.ResolveNowOptions) {
        pfrnCalled.Fire()
    }

    pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

    pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithInsecure(), grpc.WithResolvers(pfr))
@@ -718,16 +714,10 @@ func (s) TestEmptyAddrs(t *testing.T) {

    // Remove all addresses.
    pfr.UpdateState(resolver.State{})
    // Wait for a ResolveNow call on the pick first client's resolver.
    <-pfrnCalled.Done()

    // Initialize roundrobin client
    rrr := manual.NewBuilderWithScheme("whatever")

    rrrnCalled := grpcsync.NewEvent()
    rrr.ResolveNowCallback = func(resolver.ResolveNowOptions) {
        rrrnCalled.Fire()
    }
    rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

    rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithInsecure(), grpc.WithResolvers(rrr),
@@ -745,8 +735,6 @@ func (s) TestEmptyAddrs(t *testing.T) {

    // Remove all addresses.
    rrr.UpdateState(resolver.State{})
    // Wait for a ResolveNow call on the round robin client's resolver.
    <-rrrnCalled.Done()

    // Confirm several new RPCs succeed on pick first.
    for i := 0; i < 10; i++ {
@@ -88,8 +88,9 @@ type testClientConn struct {
    errorCh *testutils.Channel
}

func (t *testClientConn) UpdateState(s resolver.State) {
func (t *testClientConn) UpdateState(s resolver.State) error {
    t.stateCh.Send(s)
    return nil
}

func (t *testClientConn) ReportError(err error) {