From 9d4ac0cac5b7593fdff902fc8c544b09dbcf9bfa Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 14:45:27 -0700 Subject: [PATCH] fix mutex problems and merge multiple servingstatus to one single test --- call.go | 4 ++-- health/health.go | 23 +++++++++--------- stream.go | 9 +------ test/end2end_test.go | 57 +++++++++++++------------------------------- 4 files changed, 32 insertions(+), 61 deletions(-) diff --git a/call.go b/call.go index 218fc0f6..3b974721 100644 --- a/call.go +++ b/call.go @@ -165,7 +165,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } if EnableTracing { - c.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{args}}}, true) + c.traceInfo.tr.LazyLog(payload{args}, true) } stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) if err != nil { @@ -184,7 +184,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli continue } if EnableTracing { - c.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{reply}}}, true) + c.traceInfo.tr.LazyLog(payload{reply}, true) } t.CloseStream(stream, lastErr) if lastErr != nil { diff --git a/health/health.go b/health/health.go index de782dc5..819ace4c 100644 --- a/health/health.go +++ b/health/health.go @@ -12,8 +12,9 @@ import ( ) type HealthServer struct { - // StatusMap stores the serving status of a service + // statusMap stores the serving status of the services this HealthServer monitors statusMap map[string]int32 + mu sync.Mutex } func NewHealthServer() *HealthServer { @@ -25,22 +26,22 @@ func NewHealthServer() *HealthServer { func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) { service := in.Host + ":" + in.Service out = new(healthpb.HealthCheckResponse) - status, ok := s.statusMap[service] - out.Status = healthpb.HealthCheckResponse_ServingStatus(status) - if !ok { - err = grpc.Errorf(codes.NotFound, "unknown service") - } else { - err = nil + s.mu.Lock() + if status, ok := s.statusMap[service]; ok { + s.mu.Unlock() + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_ServingStatus(status), + }, nil } - return out, err + s.mu.Unlock() + return nil, grpc.Errorf(codes.NotFound, "unknown service") } // SetServingStatus is called when need to reset the serving status of a service // or insert a new service entry into the statusMap func (s *HealthServer) SetServingStatus(host string, service string, status int32) { service = host + ":" + service - var mu sync.Mutex - mu.Lock() + s.mu.Lock() s.statusMap[service] = status - mu.Unlock() + s.mu.Unlock() } diff --git a/stream.go b/stream.go index 348fe9a5..c902034d 100644 --- a/stream.go +++ b/stream.go @@ -166,7 +166,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{m}}}, true) + cs.traceInfo.tr.LazyLog(payload{m}, true) } cs.mu.Unlock() } @@ -187,13 +187,6 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { - if cs.tracing { - cs.mu.Lock() - if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{m}}}, true) - } - cs.mu.Unlock() - } err = recv(cs.p, cs.codec, m) defer func() { // err != nil indicates the termination of the stream. diff --git a/test/end2end_test.go b/test/end2end_test.go index f220ffca..ba674bf5 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -93,17 +93,6 @@ func newPayload(t testpb.PayloadType, size int32) *testpb.Payload { } } -func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { - ctx, _ := context.WithTimeout(context.Background(), t) - hc := healthpb.NewHealthCheckClient(cc) - req := &healthpb.HealthCheckRequest{ - Host: "", - Service: serviceName, - } - out, err := hc.Check(ctx, req) - return out, err -} - func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { md, ok := metadata.FromContext(ctx) if ok { @@ -371,6 +360,17 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc.Close() } +func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { + ctx, _ := context.WithTimeout(context.Background(), t) + hc := healthpb.NewHealthCheckClient(cc) + req := &healthpb.HealthCheckRequest{ + Host: "", + Service: serviceName, + } + out, err := hc.Check(ctx, req) + return out, err +} + func TestHealthCheckOnSuccess(t *testing.T) { for _, e := range listTestEnv() { testHealthCheckOnSuccess(t, e) @@ -417,32 +417,20 @@ func testHealthCheckOff(t *testing.T, e env) { } } -func TestHealthCheckNotFound(t *testing.T) { +func TestHealthCheckServingStatus(t *testing.T) { for _, e := range listTestEnv() { - testHealthCheckNotFound(t, e) + testHealthCheckServingStatus(t, e) } } -func testHealthCheckNotFound(t *testing.T, e env) { +func testHealthCheckServingStatus(t *testing.T, e env) { hs := health.NewHealthServer() s, cc := setUp(hs, math.MaxUint32, e) defer tearDown(s, cc) - if _, err := healthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") { + if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.NotFound) } -} - -func TestHealthCheckServing(t *testing.T) { - for _, e := range listTestEnv() { - testHealthCheckServing(t, e) - } -} - -func testHealthCheckServing(t *testing.T, e env) { - hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) - s, cc := setUp(hs, math.MaxUint32, e) - defer tearDown(s, cc) out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) @@ -450,26 +438,15 @@ func testHealthCheckServing(t *testing.T, e env) { if out.Status != healthpb.HealthCheckResponse_SERVING { t.Fatalf("Got the serving status %v, want SERVING", out.Status) } -} - -func TestHealthCheckNotServing(t *testing.T) { - for _, e := range listTestEnv() { - testHealthCheckNotServing(t, e) - } -} - -func testHealthCheckNotServing(t *testing.T, e env) { - hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2) - s, cc := setUp(hs, math.MaxUint32, e) - defer tearDown(s, cc) - out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") + out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck") if err != nil { t.Fatalf("TestHealthCheck(_)=_, %v, want _,", err) } if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { t.Fatalf("Got the serving status %v, want NOT_SERVING ") } + } func TestEmptyUnary(t *testing.T) {