Add generic close loop
This commit is contained in:
Родитель
257710d39c
Коммит
b5e39adfb2
|
@ -176,6 +176,17 @@ func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, re
|
|||
}
|
||||
}
|
||||
|
||||
// DoGenericStreamingRoundTrip performs a round trip for a single streaming rpc, using custom codec.
|
||||
func DoGenericStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
|
||||
if err := stream.(grpc.ClientStream).SendMsg(make([]byte, reqSize)); err != nil {
|
||||
grpclog.Fatalf("StreamingCall(_).Send: %v", err)
|
||||
}
|
||||
m := make([]byte, respSize)
|
||||
if err := stream.(grpc.ClientStream).RecvMsg(m); err != nil {
|
||||
grpclog.Fatalf("StreamingCall(_).Recv: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// NewClientConn creates a gRPC client connection to addr.
|
||||
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
|
||||
if len(opts) <= 0 {
|
||||
|
|
|
@ -66,6 +66,7 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
|
|||
// TODO payload config
|
||||
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
|
||||
var payloadReqSize, payloadRespSize int
|
||||
var payloadType string
|
||||
if setup.PayloadConfig != nil {
|
||||
// TODO payload config
|
||||
grpclog.Printf("payload config: %v", setup.PayloadConfig)
|
||||
|
@ -74,10 +75,13 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
|
|||
opts = append(opts, grpc.WithCodec(byteBufCodec{}))
|
||||
payloadReqSize = int(c.BytebufParams.ReqSize)
|
||||
payloadRespSize = int(c.BytebufParams.RespSize)
|
||||
payloadType = "bytebuf"
|
||||
case *testpb.PayloadConfig_SimpleParams:
|
||||
payloadReqSize = int(c.SimpleParams.ReqSize)
|
||||
payloadRespSize = int(c.SimpleParams.RespSize)
|
||||
payloadType = "protobuf"
|
||||
case *testpb.PayloadConfig_ComplexParams:
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, "unsupported payload config: %v", setup.PayloadConfig)
|
||||
default:
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
|
||||
}
|
||||
|
@ -131,7 +135,7 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
|
|||
case testpb.RpcType_UNARY:
|
||||
doCloseLoopUnaryBenchmark(bc.histogram, bc.conns, rpcCount, payloadReqSize, payloadRespSize, bc.stop)
|
||||
case testpb.RpcType_STREAMING:
|
||||
doCloseLoopStreamingBenchmark(bc.histogram, bc.conns, rpcCount, payloadReqSize, payloadRespSize, bc.stop)
|
||||
doCloseLoopStreamingBenchmark(bc.histogram, bc.conns, rpcCount, payloadReqSize, payloadRespSize, payloadType, bc.stop)
|
||||
default:
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", setup.RpcType)
|
||||
}
|
||||
|
@ -143,6 +147,7 @@ func startBenchmarkClientWithSetup(setup *testpb.ClientConfig) (*benchmarkClient
|
|||
}
|
||||
|
||||
func doCloseLoopUnaryBenchmark(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, reqSize int, respSize int, stop <-chan bool) {
|
||||
|
||||
clients := make([]testpb.BenchmarkServiceClient, len(conns))
|
||||
for ic, conn := range conns {
|
||||
clients[ic] = testpb.NewBenchmarkServiceClient(conn)
|
||||
|
@ -178,7 +183,13 @@ func doCloseLoopUnaryBenchmark(h *stats.Histogram, conns []*grpc.ClientConn, rpc
|
|||
grpclog.Printf("close loop done, count: %v", rpcCount)
|
||||
}
|
||||
|
||||
func doCloseLoopStreamingBenchmark(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, reqSize int, respSize int, stop <-chan bool) {
|
||||
func doCloseLoopStreamingBenchmark(h *stats.Histogram, conns []*grpc.ClientConn, rpcCount int, reqSize int, respSize int, payloadType string, stop <-chan bool) {
|
||||
var doRPC func(testpb.BenchmarkService_StreamingCallClient, int, int)
|
||||
if payloadType == "bytebuf" {
|
||||
doRPC = benchmark.DoGenericStreamingRoundTrip
|
||||
} else {
|
||||
doRPC = benchmark.DoStreamingRoundTrip
|
||||
}
|
||||
streams := make([]testpb.BenchmarkService_StreamingCallClient, len(conns))
|
||||
for ic, conn := range conns {
|
||||
c := testpb.NewBenchmarkServiceClient(conn)
|
||||
|
@ -188,7 +199,7 @@ func doCloseLoopStreamingBenchmark(h *stats.Histogram, conns []*grpc.ClientConn,
|
|||
}
|
||||
streams[ic] = s
|
||||
for j := 0; j < 100/len(conns); j++ {
|
||||
benchmark.DoStreamingRoundTrip(streams[ic], reqSize, respSize)
|
||||
doRPC(streams[ic], reqSize, respSize)
|
||||
}
|
||||
}
|
||||
var mu sync.Mutex
|
||||
|
@ -199,7 +210,7 @@ func doCloseLoopStreamingBenchmark(h *stats.Histogram, conns []*grpc.ClientConn,
|
|||
done := make(chan bool)
|
||||
go func() {
|
||||
start := time.Now()
|
||||
benchmark.DoStreamingRoundTrip(streams[ic], reqSize, respSize)
|
||||
doRPC(streams[ic], reqSize, respSize)
|
||||
elapse := time.Since(start)
|
||||
mu.Lock()
|
||||
h.Add(int64(elapse / time.Nanosecond))
|
||||
|
|
Загрузка…
Ссылка в новой задаче