From 5da252b6a6c870fee942a3b18a1bc46c684f9adc Mon Sep 17 00:00:00 2001 From: lyuxuan Date: Thu, 13 Dec 2018 16:44:36 -0800 Subject: [PATCH] health check test: prevent double close of hcEnterChan (#2441) --- clientconn.go | 5 +- dialoptions.go | 18 +- internal/internal.go | 11 +- test/healthcheck_test.go | 548 +++++++++++++++++++-------------------- vet.sh | 1 + 5 files changed, 297 insertions(+), 286 deletions(-) diff --git a/clientconn.go b/clientconn.go index 84b6dbe3..6a35f31d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -36,7 +36,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/envconfig" @@ -1220,7 +1219,7 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts // 3. a service config with non-empty healthCheckConfig field is provided, // 4. the current load balancer allows it. if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled { - if internal.HealthCheckFunc != nil { + if ac.cc.dopts.healthCheckFunc != nil { go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName) close(allowedToReset) return nil @@ -1281,7 +1280,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.Client } } - err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName) + err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName) if err != nil { if status.Code(err) == codes.Unimplemented { if channelz.IsOn() { diff --git a/dialoptions.go b/dialoptions.go index ef43bce7..a397f8c8 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -60,6 +60,7 @@ type dialOptions struct { disableServiceConfig bool disableRetry bool disableHealthCheck bool + healthCheckFunc internal.HealthChecker } // DialOption configures how we set up the connection. @@ -338,6 +339,7 @@ func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOp func init() { internal.WithContextDialer = withContextDialer internal.WithResolverBuilder = withResolverBuilder + internal.WithHealthCheckFunc = withHealthCheckFunc } // WithDialer returns a DialOption that specifies a function to use for dialing @@ -468,10 +470,22 @@ func WithDisableHealthCheck() DialOption { o.disableHealthCheck = true }) } + +// withHealthCheckFunc replaces the default health check function with the provided one. It makes +// tests easier to change the health check function. +// +// For testing purpose only. +func withHealthCheckFunc(f internal.HealthChecker) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.healthCheckFunc = f + }) +} + func defaultDialOptions() dialOptions { return dialOptions{ - disableRetry: !envconfig.Retry, - reqHandshake: envconfig.RequireHandshake, + disableRetry: !envconfig.Retry, + reqHandshake: envconfig.RequireHandshake, + healthCheckFunc: internal.HealthCheckFunc, copts: transport.ConnectOptions{ WriteBufferSize: defaultWriteBufSize, ReadBufferSize: defaultReadBufSize, diff --git a/internal/internal.go b/internal/internal.go index 466f5d24..eaa54d4f 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -23,16 +23,21 @@ package internal import "context" var ( - // WithContextDialer is exported by clientconn.go + // WithContextDialer is exported by dialoptions.go WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption - // WithResolverBuilder is exported by clientconn.go + // WithResolverBuilder is exported by dialoptions.go WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption + // WithHealthCheckFunc is not exported by dialoptions.go + WithHealthCheckFunc interface{} // func (HealthChecker) DialOption // HealthCheckFunc is used to provide client-side LB channel health checking - HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error + HealthCheckFunc HealthChecker // BalancerUnregister is exported by package balancer to unregister a balancer. BalancerUnregister func(name string) ) +// HealthChecker defines the signature of the client-side LB channel health checking function. +type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error + const ( // CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode. CredsBundleModeFallback = "fallback" diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index baf535d7..e41b9d12 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -44,14 +44,6 @@ import ( var testHealthCheckFunc = internal.HealthCheckFunc -func replaceHealthCheckFunc(f func(context.Context, func() (interface{}, error), func(bool), string) error) func() { - oldHcFunc := internal.HealthCheckFunc - internal.HealthCheckFunc = f - return func() { - internal.HealthCheckFunc = oldHcFunc - } -} - func newTestHealthServer() *testHealthServer { return newTestHealthServerWithWatchFunc(defaultWatchFunc) } @@ -120,17 +112,69 @@ func (s *testHealthServer) SetServingStatus(service string, status healthpb.Heal s.mu.Unlock() } +func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error) { + hcEnterChan = make(chan struct{}) + hcExitChan = make(chan struct{}) + wrapper = func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { + close(hcEnterChan) + defer close(hcExitChan) + return testHealthCheckFunc(ctx, newStream, update, service) + } + return +} + +type svrConfig struct { + specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error +} + +func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) { + s = grpc.NewServer() + lis, err = net.Listen("tcp", "localhost:0") + if err != nil { + return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err) + } + if sc.specialWatchFunc != nil { + ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc) + } else { + ts = newTestHealthServer() + } + healthgrpc.RegisterHealthServer(s, ts) + testpb.RegisterTestServiceServer(s, &testServer{}) + go s.Serve(lis) + return s, lis, ts, s.Stop, nil +} + +type clientConfig struct { + balancerName string + testHealthCheckFuncWrapper func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error + extraDialOption []grpc.DialOption +} + +func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) { + r, rcleanup := manual.GenerateAndRegisterManualResolver() + var opts []grpc.DialOption + opts = append(opts, grpc.WithInsecure(), grpc.WithBalancerName(c.balancerName)) + if c.testHealthCheckFuncWrapper != nil { + opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper)) + } + for _, dopt := range c.extraDialOption { + opts = append(opts, dopt) + } + cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...) + if err != nil { + rcleanup() + return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err) + } + return cc, r, func() { cc.Close(); rcleanup() }, nil +} + func TestHealthCheckWatchStateChange(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") + _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) + defer deferFunc() if err != nil { - t.Fatalf("failed to listen due to err: %v", err) + t.Fatal(err) } - ts := newTestHealthServer() - healthgrpc.RegisterHealthServer(s, ts) - go s.Serve(lis) - defer s.Stop() // The table below shows the expected series of addrConn connectivity transitions when server // updates its health status. As there's only one addrConn corresponds with the ClientConn in this @@ -145,13 +189,12 @@ func TestHealthCheckWatchStateChange(t *testing.T) { //| UNKNOWN | ->TRANSIENT FAILURE | //+------------------------------+-------------------------------------------+ ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING) - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + + cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() r.NewServiceConfig(`{ "healthCheckConfig": { @@ -215,13 +258,11 @@ func TestHealthCheckHealthServerNotRegistered(t *testing.T) { go s.Serve(lis) defer s.Stop() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() r.NewServiceConfig(`{ "healthCheckConfig": { @@ -247,33 +288,25 @@ func TestHealthCheckHealthServerNotRegistered(t *testing.T) { // function should exit. func TestHealthCheckWithGoAway(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServer() - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() - ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - hcExitChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - err := testHealthCheckFunc(ctx, newStream, update, service) - close(hcExitChan) - return err - } - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + + s, lis, ts, deferFunc, err := setupServer(&svrConfig{}) + defer deferFunc() if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + + ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) + + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + }) + if err != nil { + t.Fatal(err) + } + defer deferFunc() tc := testpb.NewTestServiceClient(cc) r.NewServiceConfig(`{ @@ -328,6 +361,11 @@ func TestHealthCheckWithGoAway(t *testing.T) { select { case <-hcExitChan: case <-time.After(5 * time.Second): + select { + case <-hcEnterChan: + default: + t.Fatal("Health check function has not entered after 5s.") + } t.Fatal("Health check function has not exited after 5s.") } @@ -342,33 +380,27 @@ func TestHealthCheckWithGoAway(t *testing.T) { func TestHealthCheckWithConnClose(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") + + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + + s, lis, ts, deferFunc, err := setupServer(&svrConfig{}) + defer deferFunc() if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServer() - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() - ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - hcExitChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - err := testHealthCheckFunc(ctx, newStream, update, service) - close(hcExitChan) - return err + t.Fatal(err) } - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) + + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + extraDialOption: []grpc.DialOption{grpc.WithWaitForHandshake()}, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() + tc := testpb.NewTestServiceClient(cc) r.NewServiceConfig(`{ @@ -399,6 +431,11 @@ func TestHealthCheckWithConnClose(t *testing.T) { select { case <-hcExitChan: case <-time.After(5 * time.Second): + select { + case <-hcEnterChan: + default: + t.Fatal("Health check function has not entered after 5s.") + } t.Fatal("Health check function has not exited after 5s.") } } @@ -407,33 +444,25 @@ func TestHealthCheckWithConnClose(t *testing.T) { // address list returned by the resolver. func TestHealthCheckWithAddrConnDrain(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") + + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + + _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) + defer deferFunc() if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServer() - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() - ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - hcExitChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - err := testHealthCheckFunc(ctx, newStream, update, service) - close(hcExitChan) - return err + t.Fatal(err) } - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) + + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() tc := testpb.NewTestServiceClient(cc) r.NewServiceConfig(`{ @@ -487,6 +516,11 @@ func TestHealthCheckWithAddrConnDrain(t *testing.T) { select { case <-hcExitChan: case <-time.After(5 * time.Second): + select { + case <-hcEnterChan: + default: + t.Fatal("Health check function has not entered after 5s.") + } t.Fatal("Health check function has not exited after 5s.") } @@ -502,33 +536,25 @@ func TestHealthCheckWithAddrConnDrain(t *testing.T) { // ClientConn close will lead to its addrConns being torn down. func TestHealthCheckWithClientConnClose(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") + + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + + _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) + defer deferFunc() if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServer() - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() - ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - hcExitChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - err := testHealthCheckFunc(ctx, newStream, update, service) - close(hcExitChan) - return err + t.Fatal(err) } - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) + + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() tc := testpb.NewTestServiceClient(cc) r.NewServiceConfig(`{ @@ -560,6 +586,11 @@ func TestHealthCheckWithClientConnClose(t *testing.T) { select { case <-hcExitChan: case <-time.After(5 * time.Second): + select { + case <-hcEnterChan: + default: + t.Fatal("Health check function has not entered after 5s.") + } t.Fatal("Health check function has not exited after 5s.") } } @@ -569,49 +600,40 @@ func TestHealthCheckWithClientConnClose(t *testing.T) { // onGoAway/onClose goroutine. func TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen due to err %v", err) - } - ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "delay" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"delay\"") - } - // Do nothing to mock a delay of health check response from server side. - // This case is to help with the test that covers the condition that reportHealth is not - // called inside HealthCheckFunc before the func returns. - select { - case <-stream.Context().Done(): - case <-time.After(5 * time.Second): - } - return nil + + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + + _, lis, ts, deferFunc, err := setupServer(&svrConfig{ + specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "delay" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"delay\"") + } + // Do nothing to mock a delay of health check response from server side. + // This case is to help with the test that covers the condition that reportHealth is not + // called inside HealthCheckFunc before the func returns. + select { + case <-stream.Context().Done(): + case <-time.After(5 * time.Second): + } + return nil + }, }) - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() + defer deferFunc() + if err != nil { + t.Fatal(err) + } + ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING) - hcEnterChan := make(chan struct{}) - hcExitChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - close(hcEnterChan) - err := testHealthCheckFunc(ctx, newStream, update, service) - close(hcExitChan) - return err - } - - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + _, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() // The serviceName "delay" is specially handled at server side, where response will not be sent // back to client immediately upon receiving the request (client should receive no response until @@ -653,49 +675,41 @@ func TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T) { // onGoAway/onClose goroutine. func TestHealthCheckWithoutReportHealthCalled(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "delay" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"delay\"") - } - // Do nothing to mock a delay of health check response from server side. - // This case is to help with the test that covers the condition that reportHealth is not - // called inside HealthCheckFunc before the func returns. - select { - case <-stream.Context().Done(): - case <-time.After(5 * time.Second): - } - return nil + + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + + s, lis, ts, deferFunc, err := setupServer(&svrConfig{ + specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "delay" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"delay\"") + } + // Do nothing to mock a delay of health check response from server side. + // This case is to help with the test that covers the condition that reportHealth is not + // called inside HealthCheckFunc before the func returns. + select { + case <-stream.Context().Done(): + case <-time.After(5 * time.Second): + } + return nil + }, }) - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() + defer deferFunc() + if err != nil { + t.Fatal(err) + } + ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING) - hcEnterChan := make(chan struct{}) - hcExitChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - close(hcEnterChan) - err := testHealthCheckFunc(ctx, newStream, update, service) - close(hcExitChan) - return err - } - - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + _, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + extraDialOption: []grpc.DialOption{grpc.WithWaitForHandshake()}, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() // The serviceName "delay" is specially handled at server side, where response will not be sent // back to client immediately upon receiving the request (client should receive no response until @@ -733,22 +747,20 @@ func TestHealthCheckWithoutReportHealthCalled(t *testing.T) { } func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { - hcEnterChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - close(hcEnterChan) - return nil - } + hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin"), grpc.WithDisableHealthCheck()) + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()}, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } + defer deferFunc() + tc := testpb.NewTestServiceClient(cc) - defer cc.Close() + r.NewServiceConfig(`{ "healthCheckConfig": { "serviceName": "foo" @@ -774,22 +786,19 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { } func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { - hcEnterChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - close(hcEnterChan) - return nil - } + hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("pick_first")) + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "pick_first", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } + defer deferFunc() + tc := testpb.NewTestServiceClient(cc) - defer cc.Close() + r.NewServiceConfig(`{ "healthCheckConfig": { "serviceName": "foo" @@ -815,22 +824,18 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { } func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) { - hcEnterChan := make(chan struct{}) - testHealthCheckFuncWrapper := func(ctx context.Context, newStream func() (interface{}, error), update func(bool), service string) error { - close(hcEnterChan) - return nil - } + hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - replace := replaceHealthCheckFunc(testHealthCheckFuncWrapper) - defer replace() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + cc, r, deferFunc, err := setupClient(&clientConfig{ + balancerName: "round_robin", + testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } + defer deferFunc() + tc := testpb.NewTestServiceClient(cc) - defer cc.Close() r.NewAddress([]resolver.Address{{Addr: addr}}) @@ -853,17 +858,12 @@ func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) { func TestHealthCheckDisable(t *testing.T) { defer leakcheck.Check(t) - // set up server side - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") + + _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) + defer deferFunc() if err != nil { - t.Fatalf("failed to listen due to err: %v", err) + t.Fatal(err) } - ts := newTestHealthServer() - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) // test client side disabling configuration. @@ -874,30 +874,26 @@ func TestHealthCheckDisable(t *testing.T) { func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "channelzSuccess" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"channelzSuccess\"") - } - return status.Error(codes.OK, "fake success") - }) - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + _, lis, _, deferFunc, err := setupServer(&svrConfig{ + specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "channelzSuccess" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"channelzSuccess\"") + } + return status.Error(codes.OK, "fake success") + }, + }) + defer deferFunc() if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + + _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) + if err != nil { + t.Fatal(err) + } + defer deferFunc() r.NewServiceConfig(`{ "healthCheckConfig": { @@ -935,30 +931,26 @@ func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { func TestHealthCheckChannelzCountingCallFailure(t *testing.T) { defer leakcheck.Check(t) - s := grpc.NewServer() - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("failed to listen due to err: %v", err) - } - ts := newTestHealthServerWithWatchFunc(func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "channelzFailure" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"channelzFailure\"") - } - return status.Error(codes.Internal, "fake failure") - }) - healthgrpc.RegisterHealthServer(s, ts) - testpb.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - defer s.Stop() - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + _, lis, _, deferFunc, err := setupServer(&svrConfig{ + specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "channelzFailure" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"channelzFailure\"") + } + return status.Error(codes.Internal, "fake failure") + }, + }) if err != nil { - t.Fatalf("dial failed due to err: %v", err) + t.Fatal(err) } - defer cc.Close() + defer deferFunc() + + _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) + if err != nil { + t.Fatal(err) + } + defer deferFunc() r.NewServiceConfig(`{ "healthCheckConfig": { diff --git a/vet.sh b/vet.sh index 94d3d54e..ebbc5bd6 100755 --- a/vet.sh +++ b/vet.sh @@ -132,5 +132,6 @@ internal/transport/transport_test.go:SA2002 stats/stats_test.go:SA1019 test/channelz_test.go:SA1019 test/end2end_test.go:SA1019 +test/healthcheck_test.go:SA1019 ' ./... misspell -error .