modified streaming call benchmark_test code and streaming call client test code

This commit is contained in:
yangzhouhan 2015-06-02 11:13:43 -07:00
Родитель 7b4cd4d7d3
Коммит 585c999c1f
4 изменённых файлов: 106 добавлений и 100 удалений

Просмотреть файл

@ -37,16 +37,13 @@ Package benchmark implements the building blocks to setup end-to-end gRPC benchm
package benchmark
import (
"io"
"math"
"net"
// "time"
// "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/grpclog"
"io"
"math"
"net"
)
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
@ -122,24 +119,19 @@ func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
}
}
//DoStreamingcall performs a streaming RPC with given stub and request and response size.client side
func DoStreamingCall(tc testpb.TestServiceClient, reqSize, respSize int) {
// DoStreamingcall performs a streaming RPC with given stub and request and response size.client side
func DoStreamingCall(stream testpb.TestService_StreamingCallClient, tc testpb.TestServiceClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("TestService/Streaming call rpc failred: ", err)
}
if err := stream.Send(req); err != nil {
grpclog.Fatalf("/TestService/Streaming call send failed: ", err)
grpclog.Fatalf("%v.StreamingCall()= %v ", tc, err)
}
_, err = stream.Recv()
if err != nil {
grpclog.Fatal("/TestService/streamingCall receive failed: ", err)
if _, err := stream.Recv(); err != nil {
grpclog.Fatal("%v.StreamingCall()= %v", tc, err)
}
}

Просмотреть файл

@ -6,11 +6,13 @@ import (
"testing"
"time"
"golang.org/x/net/context"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
)
func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceClient)) {
func runUnary(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer()
@ -20,9 +22,8 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
// Warm up connection.
for i := 0; i < 10; i++ {
caller(tc)
unaryCaller(tc)
}
ch := make(chan int, maxConcurrentCalls*4)
var (
mu sync.Mutex
@ -35,7 +36,7 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
go func() {
for _ = range ch {
start := time.Now()
caller(tc)
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
@ -54,17 +55,20 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
conn.Close()
}
func runStream(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceClient)) {
func runStream(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer()
defer stopper()
conn := NewClientConn(target)
tc := testpb.NewTestServiceClient(conn)
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.StreamingCall()=%v", tc, err)
}
// Warm up connection.
for i := 0; i < 10; i++ {
caller(tc)
streamCaller(stream, tc)
}
ch := make(chan int, maxConcurrentCalls*4)
@ -79,7 +83,7 @@ func runStream(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServ
go func() {
for _ = range ch {
start := time.Now()
caller(tc)
streamCaller(stream, tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
@ -97,47 +101,43 @@ func runStream(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServ
wg.Wait()
conn.Close()
}
func smallCaller(client testpb.TestServiceClient) {
func unaryCaller(client testpb.TestServiceClient) {
DoUnaryCall(client, 1, 1)
}
func streamCaller(client testpb.TestServiceClient) {
//func streamCaller(client testpb.TestServiceClient){
DoStreamingCall(client, 1, 1)
//DoStreamingCall(client,1,1)
func streamCaller(stream testpb.TestService_StreamingCallClient, client testpb.TestServiceClient) {
DoStreamingCall(stream, client, 1, 1)
}
func BenchmarkClientStreamc1(b *testing.B) {
runStream(b, 1, streamCaller)
runStream(b, 1)
}
func BenchmarkClientStreamc8(b *testing.B) {
runStream(b, 8, streamCaller)
runStream(b, 8)
}
func BenchmarkClientStreamc64(b *testing.B) {
runStream(b, 64, streamCaller)
runStream(b, 64)
}
func BenchmarkClientStreamc512(b *testing.B) {
runStream(b, 512, streamCaller)
runStream(b, 512)
}
func BenchmarkClientSmallc1(b *testing.B) {
run(b, 1, smallCaller)
func BenchmarkClientUnaryc1(b *testing.B) {
runUnary(b, 1)
}
func BenchmarkClientSmallc8(b *testing.B) {
run(b, 8, smallCaller)
func BenchmarkClientUnaryc8(b *testing.B) {
runUnary(b, 8)
}
func BenchmarkClientSmallc64(b *testing.B) {
run(b, 64, smallCaller)
func BenchmarkClientUnaryc64(b *testing.B) {
runUnary(b, 64)
}
func BenchmarkClientSmallc512(b *testing.B) {
run(b, 512, smallCaller)
func BenchmarkClientUnaryc512(b *testing.B) {
runUnary(b, 512)
}
func TestMain(m *testing.M) {

Просмотреть файл

@ -9,6 +9,8 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
@ -19,69 +21,28 @@ var (
server = flag.String("server", "", "The server address")
maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
rpcType = flag.Int("rpc_type", 0, "client rpc type")
)
func unaryCaller(client testpb.TestServiceClient) {
benchmark.DoUnaryCall(client, 1, 1)
}
func streamCaller(client testpb.TestServiceClient) {
benchmark.DoStreamingCall(client, 1, 1)
func streamCaller(stream testpb.TestService_StreamingCallClient, client testpb.TestServiceClient) {
benchmark.DoStreamingCall(stream, client, 1, 1)
}
func closeLoopStream() {
s := stats.NewStats(256)
conn := benchmark.NewClientConn(*server)
tc := testpb.NewTestServiceClient(conn)
// Warm up connection.
for i := 0; i < 100; i++ {
streamCaller(tc)
}
ch := make(chan int, *maxConcurrentRPCs*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
// Distribute RPCs over maxConcurrentCalls workers.
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
for _ = range ch {
start := time.Now()
streamCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
// Stop the client when time is up.
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
close(ch)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.TestServiceClient) {
s = stats.NewStats(256)
conn = benchmark.NewClientConn(*server)
tc = testpb.NewTestServiceClient(conn)
return s, conn, tc
}
func closeLoop() {
s := stats.NewStats(256)
conn := benchmark.NewClientConn(*server)
tc := testpb.NewTestServiceClient(conn)
// Warm up connection.
func closeLoopUnary() {
s, conn, tc := buildConnection()
for i := 0; i < 100; i++ {
unaryCaller(tc)
}
@ -91,7 +52,7 @@ func closeLoop() {
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
// Distribute RPCs over maxConcurrentCalls workers.
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
for _ = range ch {
@ -123,10 +84,60 @@ func closeLoop() {
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
func closeLoopStream() {
s, conn, tc := buildConnection()
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.StreamingCall()=%v", tc, err)
}
for i := 0; i < 100; i++ {
streamCaller(stream, tc)
}
ch := make(chan int, *maxConcurrentRPCs*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
// Distribute RPCs over maxConcurrentCalls workers.
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
for _ = range ch {
start := time.Now()
streamCaller(stream, tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
// Stop the client when time is up.
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
close(ch)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
func main() {
flag.Parse()
go func() {
lis, err := net.Listen("tcp", ":0")
if err != nil {
@ -137,5 +148,10 @@ func main() {
grpclog.Fatalf("Failed to serve: %v", err)
}
}()
closeLoop()
switch *rpcType {
case 0:
closeLoopUnary()
case 1:
closeLoopStream()
}
}

Просмотреть файл

@ -323,8 +323,6 @@ func (m *SimpleRequest) GetPayload() *Payload {
return nil
}
func caller(client testpb.TestServiceClient) {
type SimpleResponse struct {
Payload *Payload `protobuf:"bytes,1,opt,name=payload" json:"payload,omitempty"`
}