/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/*
Package benchmark implements the building blocks to set up end-to-end gRPC
benchmarks.
*/ package benchmark import ( "io" "log" "math" "net" "time" "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/grpc" testpb "google.golang.org/grpc/interop/grpc_testing" ) func newPayload(t testpb.PayloadType, size int) *testpb.Payload { if size < 0 { log.Fatalf("Requested a response with invalid length %d", size) } body := make([]byte, size) switch t { case testpb.PayloadType_COMPRESSABLE: case testpb.PayloadType_UNCOMPRESSABLE: log.Fatalf("PayloadType UNCOMPRESSABLE is not supported") default: log.Fatalf("Unsupported payload type: %d", t) } return &testpb.Payload{ Type: t.Enum(), Body: body, } } type testServer struct { } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return new(testpb.Empty), nil } func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{ Payload: newPayload(in.GetResponseType(), int(in.GetResponseSize())), }, nil } func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { cs := args.GetResponseParameters() for _, c := range cs { if us := c.GetIntervalUs(); us > 0 { time.Sleep(time.Duration(us) * time.Microsecond) } if err := stream.Send(&testpb.StreamingOutputCallResponse{ Payload: newPayload(args.GetResponseType(), int(c.GetSize())), }); err != nil { return err } } return nil } func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { var sum int for { in, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&testpb.StreamingInputCallResponse{ AggregatedPayloadSize: proto.Int32(int32(sum)), }) } if err != nil { return err } p := in.GetPayload().GetBody() sum += len(p) } } func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { for { in, err := stream.Recv() if err == io.EOF { // read done. 
return nil } if err != nil { return err } cs := in.GetResponseParameters() for _, c := range cs { if us := c.GetIntervalUs(); us > 0 { time.Sleep(time.Duration(us) * time.Microsecond) } if err := stream.Send(&testpb.StreamingOutputCallResponse{ Payload: newPayload(in.GetResponseType(), int(c.GetSize())), }); err != nil { return err } } } } func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error { msgBuf := make([]*testpb.StreamingOutputCallRequest, 0) for { in, err := stream.Recv() if err == io.EOF { // read done. break } if err != nil { return err } msgBuf = append(msgBuf, in) } for _, m := range msgBuf { cs := m.GetResponseParameters() for _, c := range cs { if us := c.GetIntervalUs(); us > 0 { time.Sleep(time.Duration(us) * time.Microsecond) } if err := stream.Send(&testpb.StreamingOutputCallResponse{ Payload: newPayload(m.GetResponseType(), int(c.GetSize())), }); err != nil { return err } } } return nil } // StartServer starts a gRPC server serving a benchmark service. It returns its // listen address and a function to stop the server. func StartServer() (string, func()) { lis, err := net.Listen("tcp", ":0") if err != nil { log.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32)) testpb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) return lis.Addr().String(), func() { s.Stop() } } // DoUnaryCall performs an unary RPC with given stub and request and response sizes. func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) { pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseSize: proto.Int32(int32(respSize)), Payload: pl, } if _, err := tc.UnaryCall(context.Background(), req); err != nil { log.Fatal("/TestService/UnaryCall RPC failed: ", err) } } // NewClientConn creates a gRPC client connection to addr. 
func NewClientConn(addr string) *grpc.ClientConn { conn, err := grpc.Dial(addr) if err != nil { log.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) } return conn }