fix mutex problems and merge multiple ServingStatus tests into a single test
Parent: ee67f58505
Commit: 9d4ac0cac5
call.go (4 changed lines)

@@ -165,7 +165,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
 		return toRPCErr(err)
 	}
 	if EnableTracing {
-		c.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{args}}}, true)
+		c.traceInfo.tr.LazyLog(payload{args}, true)
 	}
 	stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
 	if err != nil {
@@ -184,7 +184,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
 		continue
 	}
 	if EnableTracing {
-		c.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{reply}}}, true)
+		c.traceInfo.tr.LazyLog(payload{reply}, true)
 	}
 	t.CloseStream(stream, lastErr)
 	if lastErr != nil {
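Note, not part of the commit: the tracing change leans on the fact that LazyLog accepts any fmt.Stringer and defers formatting until the trace page is actually rendered, so the payload value can be logged directly instead of being wrapped in a format helper on every call. A minimal sketch of that pattern, assuming the golang.org/x/net/trace package; msgPayload below is an illustrative stand-in, not grpc's internal payload type.

package main

import (
	"fmt"

	"golang.org/x/net/trace"
)

// msgPayload stands in for an internal payload-style wrapper: it satisfies
// fmt.Stringer, so formatting only happens if the trace is actually rendered.
type msgPayload struct {
	m interface{}
}

func (p msgPayload) String() string {
	return fmt.Sprintf("message: %+v", p.m)
}

func main() {
	tr := trace.New("example.Service", "Call")
	defer tr.Finish()
	// LazyLog takes any fmt.Stringer; String is invoked lazily, if at all.
	tr.LazyLog(msgPayload{m: "hello"}, true)
}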
@@ -12,8 +12,9 @@ import (
 )
 
 type HealthServer struct {
-	// StatusMap stores the serving status of a service
+	// statusMap stores the serving status of the services this HealthServer monitors
 	statusMap map[string]int32
+	mu        sync.Mutex
 }
 
 func NewHealthServer() *HealthServer {
@@ -25,22 +26,22 @@ func NewHealthServer() *HealthServer {
 func (s *HealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (out *healthpb.HealthCheckResponse, err error) {
 	service := in.Host + ":" + in.Service
-	out = new(healthpb.HealthCheckResponse)
-	status, ok := s.statusMap[service]
-	out.Status = healthpb.HealthCheckResponse_ServingStatus(status)
-	if !ok {
-		err = grpc.Errorf(codes.NotFound, "unknown service")
-	} else {
-		err = nil
+	s.mu.Lock()
+	if status, ok := s.statusMap[service]; ok {
+		s.mu.Unlock()
+		return &healthpb.HealthCheckResponse{
+			Status: healthpb.HealthCheckResponse_ServingStatus(status),
+		}, nil
 	}
-	return out, err
+	s.mu.Unlock()
+	return nil, grpc.Errorf(codes.NotFound, "unknown service")
 }
 
 // SetServingStatus is called when need to reset the serving status of a service
 // or insert a new service entry into the statusMap
 func (s *HealthServer) SetServingStatus(host string, service string, status int32) {
 	service = host + ":" + service
-	var mu sync.Mutex
-	mu.Lock()
+	s.mu.Lock()
 	s.statusMap[service] = status
-	mu.Unlock()
+	s.mu.Unlock()
 }
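Note, not part of the commit: the mutex bug being fixed is that the old SetServingStatus declared var mu sync.Mutex inside the method, so every call locked a fresh, unshared mutex and writes to statusMap could still race with Check. Hoisting the mutex into the struct makes all readers and writers serialize on one lock. A minimal sketch of the corrected pattern, with illustrative names (registry is not grpc's health package); it can be checked with go run -race.

package main

import (
	"fmt"
	"sync"
)

// registry is an illustrative stand-in for HealthServer: one mutex, shared by
// every caller, guards the map. A mutex declared locally inside a method would
// be a brand-new lock per call and would not prevent the data race.
type registry struct {
	mu     sync.Mutex
	status map[string]int32
}

func (r *registry) set(name string, s int32) {
	r.mu.Lock()
	r.status[name] = s
	r.mu.Unlock()
}

func (r *registry) get(name string) (int32, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.status[name]
	return s, ok
}

func main() {
	r := &registry{status: make(map[string]int32)}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.set(fmt.Sprintf("svc-%d", i), 1) // safe: all goroutines share r.mu
		}(i)
	}
	wg.Wait()
	s, ok := r.get("svc-3")
	fmt.Println(s, ok)
}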
@@ -166,7 +166,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
 	if cs.tracing {
 		cs.mu.Lock()
 		if cs.traceInfo.tr != nil {
-			cs.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{m}}}, true)
+			cs.traceInfo.tr.LazyLog(payload{m}, true)
 		}
 		cs.mu.Unlock()
 	}
@@ -187,13 +187,6 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
 }
 
 func (cs *clientStream) RecvMsg(m interface{}) (err error) {
-	if cs.tracing {
-		cs.mu.Lock()
-		if cs.traceInfo.tr != nil {
-			cs.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{m}}}, true)
-		}
-		cs.mu.Unlock()
-	}
 	err = recv(cs.p, cs.codec, m)
 	defer func() {
 		// err != nil indicates the termination of the stream.

@@ -93,17 +93,6 @@ func newPayload(t testpb.PayloadType, size int32) *testpb.Payload {
 	}
 }
 
-func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
-	ctx, _ := context.WithTimeout(context.Background(), t)
-	hc := healthpb.NewHealthCheckClient(cc)
-	req := &healthpb.HealthCheckRequest{
-		Host:    "",
-		Service: serviceName,
-	}
-	out, err := hc.Check(ctx, req)
-	return out, err
-}
-
 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 	md, ok := metadata.FromContext(ctx)
 	if ok {
@@ -371,6 +360,17 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
 	cc.Close()
 }
 
+func healthCheck(t time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
+	ctx, _ := context.WithTimeout(context.Background(), t)
+	hc := healthpb.NewHealthCheckClient(cc)
+	req := &healthpb.HealthCheckRequest{
+		Host:    "",
+		Service: serviceName,
+	}
+	out, err := hc.Check(ctx, req)
+	return out, err
+}
+
 func TestHealthCheckOnSuccess(t *testing.T) {
 	for _, e := range listTestEnv() {
 		testHealthCheckOnSuccess(t, e)
@@ -417,32 +417,20 @@ func testHealthCheckOff(t *testing.T, e env) {
 	}
 }
 
-func TestHealthCheckNotFound(t *testing.T) {
+func TestHealthCheckServingStatus(t *testing.T) {
 	for _, e := range listTestEnv() {
-		testHealthCheckNotFound(t, e)
+		testHealthCheckServingStatus(t, e)
 	}
 }
 
-func testHealthCheckNotFound(t *testing.T, e env) {
+func testHealthCheckServingStatus(t *testing.T, e env) {
 	hs := health.NewHealthServer()
 	s, cc := setUp(hs, math.MaxUint32, e)
 	defer tearDown(s, cc)
-	if _, err := healthCheck(1*time.Second, cc, "unregister_service"); err != grpc.Errorf(codes.NotFound, "unknown service") {
+	if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") {
 		t.Fatalf("TestHealthCheck(_)=_, %v, want error code %d", err, codes.NotFound)
 	}
-}
-
-func TestHealthCheckServing(t *testing.T) {
-	for _, e := range listTestEnv() {
-		testHealthCheckServing(t, e)
-	}
-}
-
-func testHealthCheckServing(t *testing.T, e env) {
-	hs := health.NewHealthServer()
 	hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1)
-	s, cc := setUp(hs, math.MaxUint32, e)
-	defer tearDown(s, cc)
 	out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck")
 	if err != nil {
 		t.Fatalf("TestHealthCheck(_)=_, %v, want _,<nil>", err)
@@ -450,26 +438,15 @@ func testHealthCheckServing(t *testing.T, e env) {
 	if out.Status != healthpb.HealthCheckResponse_SERVING {
 		t.Fatalf("Got the serving status %v, want SERVING", out.Status)
 	}
-}
-
-func TestHealthCheckNotServing(t *testing.T) {
-	for _, e := range listTestEnv() {
-		testHealthCheckNotServing(t, e)
-	}
-}
-
-func testHealthCheckNotServing(t *testing.T, e env) {
-	hs := health.NewHealthServer()
 	hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2)
-	s, cc := setUp(hs, math.MaxUint32, e)
-	defer tearDown(s, cc)
-	out, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck")
+	out, err = healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck")
 	if err != nil {
 		t.Fatalf("TestHealthCheck(_)=_, %v, want _,<nil>", err)
 	}
 	if out.Status != healthpb.HealthCheckResponse_NOT_SERVING {
 		t.Fatalf("Got the serving status %v, want NOT_SERVING ")
 	}
+
 }
 
 func TestEmptyUnary(t *testing.T) {
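Note, not part of the commit: the merged test drives one HealthServer through the three states that TestHealthCheckNotFound, TestHealthCheckServing, and TestHealthCheckNotServing previously covered with separate setUp/tearDown cycles. The same lifecycle can be sketched against the server type directly; the import paths below (google.golang.org/grpc/health, google.golang.org/grpc/health/grpc_health_v1alpha, golang.org/x/net/context) are assumptions about this revision's layout, not confirmed by the diff.

package main

import (
	"fmt"

	"golang.org/x/net/context" // context package of this era; an assumption
	"google.golang.org/grpc/health" // assumed path of the HealthServer package
	healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" // assumed path of the generated stubs
)

func main() {
	hs := health.NewHealthServer()
	req := &healthpb.HealthCheckRequest{Service: "grpc.health.v1alpha.HealthCheck"}

	// 1) Nothing registered yet: Check reports codes.NotFound.
	if _, err := hs.Check(context.Background(), req); err != nil {
		fmt.Println("unregistered:", err)
	}

	// 2) Mark the service SERVING (status 1) and check again.
	hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1)
	out, err := hs.Check(context.Background(), req)
	fmt.Println(out.Status, err) // expected: SERVING, <nil>

	// 3) Flip it to NOT_SERVING (status 2).
	hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 2)
	out, err = hs.Check(context.Background(), req)
	fmt.Println(out.Status, err) // expected: NOT_SERVING, <nil>
}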