Change log print
Ignore cpu core list and return the actual number of cores used
Add copyright
Change certfile and keyfile path to relative
RunClient() returns "not implemented" error
Close existing server when new server setup received
Main goroutine will wait for server to stop
Move benchmarkServer out of workerServer
This commit is contained in:
Menghan Li 2016-04-22 11:34:05 -07:00
Родитель bdd0e9ff61
Коммит c2e8421003
4 изменённых файлов: 195 добавлений и 43 удалений

Просмотреть файл

@ -121,7 +121,6 @@ func (s *genericTestServer) StreamingCall(stream testpb.BenchmarkService_Streami
m := make([]byte, s.reqSize)
err := stream.(grpc.ServerStream).RecvMsg(m)
if err == io.EOF {
// read done.
return nil
}
if err != nil {
@ -133,7 +132,7 @@ func (s *genericTestServer) StreamingCall(stream testpb.BenchmarkService_Streami
}
}
// StartGenericServer starts a gRPC a benchmark service server, which supports custom codec.
// StartGenericServer starts a benchmark service server that supports custom codec.
// It returns its listen port number and a function to stop the server.
func StartGenericServer(addr string, reqSize, respSize int32, opts ...grpc.ServerOption) (int, func()) {
lis, err := net.Listen("tcp", addr)

Просмотреть файл

@ -1,3 +1,36 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package main
import (
@ -15,13 +48,14 @@ import (
)
var (
// TODO change filepath
certFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/server1.pem"
keyFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/server1.key"
// File path related to google.golang.org/grpc.
certFile = "benchmark/server/testdata/server1.pem"
keyFile = "benchmark/server/testdata/server1.key"
)
type benchmarkServer struct {
port int
cores int
close func()
mu sync.RWMutex
lastResetTime time.Time
@ -30,46 +64,48 @@ type benchmarkServer struct {
func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) {
var opts []grpc.ServerOption
grpclog.Printf(" - server type: %v", setup.ServerType)
// Some setup options are ignored:
// - server type:
// will always start sync server
// - async server threads
// - core list
grpclog.Printf(" * server type: %v (ignored, always starts sync server)", setup.ServerType)
switch setup.ServerType {
// Ignore server type.
case testpb.ServerType_SYNC_SERVER:
case testpb.ServerType_ASYNC_SERVER:
case testpb.ServerType_ASYNC_GENERIC_SERVER:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", setup.ServerType)
}
grpclog.Printf(" * async server threads: %v (ignored)", setup.AsyncServerThreads)
grpclog.Printf(" * core list: %v (ignored)", setup.CoreList)
grpclog.Printf(" - security params: %v", setup.SecurityParams)
if setup.SecurityParams != nil {
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
creds, err := credentials.NewServerTLSFromFile(Abs(certFile), Abs(keyFile))
if err != nil {
grpclog.Fatalf("failed to generate credentials %v", err)
}
opts = append(opts, grpc.Creds(creds))
}
// Ignore async server threads.
grpclog.Printf(" - core limit: %v", setup.CoreLimit)
// Use one cpu core by default.
numOfCores := 1
if setup.CoreLimit > 0 {
runtime.GOMAXPROCS(int(setup.CoreLimit))
} else {
runtime.GOMAXPROCS(1)
}
grpclog.Printf(" - core list: %v", setup.CoreList)
if len(setup.CoreList) > 0 {
return nil, grpc.Errorf(codes.InvalidArgument, "specifying core list is not supported")
numOfCores = int(setup.CoreLimit)
}
runtime.GOMAXPROCS(numOfCores)
grpclog.Printf(" - port: %v", setup.Port)
var port int
// Priority: setup.Port > serverPort > default (0).
if setup.Port != 0 {
port = int(setup.Port)
} else if serverPort != 0 {
port = serverPort
}
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
var p int
var close func()
@ -86,24 +122,24 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
}
} else {
// Start protobuf server is payload config is nil
// Start protobuf server is payload config is nil.
p, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
}
grpclog.Printf("benchmark server listening at port %v", p)
bs := &benchmarkServer{port: p, close: close, lastResetTime: time.Now()}
bs := &benchmarkServer{port: p, cores: numOfCores, close: close, lastResetTime: time.Now()}
return bs, nil
}
func (bs *benchmarkServer) getStats() *testpb.ServerStats {
// TODO wall time, sys time, user time.
bs.mu.RLock()
defer bs.mu.RUnlock()
return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0}
}
func (bs *benchmarkServer) reset() {
// TODO wall time, sys time, user time
bs.mu.Lock()
defer bs.mu.Unlock()
bs.lastResetTime = time.Now()

Просмотреть файл

@ -1,3 +1,36 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package main
import (
@ -16,7 +49,7 @@ import (
var (
driverPort = flag.Int("driver_port", 10000, "port for communication with driver")
serverPort = flag.Int("server_port", 0, "port for operation as a server")
serverPort = flag.Int("server_port", 0, "default port for benchmark server")
)
type byteBufCodec struct {
@ -36,19 +69,20 @@ func (byteBufCodec) String() string {
}
type workerServer struct {
bs *benchmarkServer
stop chan<- bool
serverPort int
}
func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
var bs *benchmarkServer
for {
in, err := stream.Recv()
if err == io.EOF {
// Close benchmark server when stream ends.
grpclog.Printf("closing benchmark server")
if s.bs != nil {
s.bs.close()
s.bs = nil
if bs != nil {
bs.close()
bs = nil
}
return nil
}
@ -56,30 +90,34 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
return err
}
switch t := in.Argtype.(type) {
switch argtype := in.Argtype.(type) {
case *testpb.ServerArgs_Setup:
grpclog.Printf("server setup received:")
bs, err := startBenchmarkServerWithSetup(t.Setup, s.serverPort)
newbs, err := startBenchmarkServerWithSetup(argtype.Setup, s.serverPort)
if err != nil {
return err
}
s.bs = bs
if bs != nil {
grpclog.Printf("server setup received when server already exists, closing the existing server")
bs.close()
}
bs = newbs
case *testpb.ServerArgs_Mark:
grpclog.Printf("server mark received:")
grpclog.Printf(" - %v", t)
if s.bs == nil {
grpclog.Printf(" - %v", argtype)
if bs == nil {
return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
}
if t.Mark.Reset_ {
s.bs.reset()
if argtype.Mark.Reset_ {
bs.reset()
}
}
out := &testpb.ServerStatus{
Stats: s.bs.getStats(),
Port: int32(s.bs.port),
Cores: 1,
Stats: bs.getStats(),
Port: int32(bs.port),
Cores: int32(bs.cores),
}
if err := stream.Send(out); err != nil {
return err
@ -90,7 +128,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
}
func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error {
return nil
return grpc.Errorf(codes.Unimplemented, "RunClient not implemented")
}
func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
@ -100,10 +138,7 @@ func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*
func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
grpclog.Printf("quiting worker")
if s.bs != nil {
s.bs.close()
}
s.stop <- true
defer func() { s.stop <- true }()
return &testpb.Void{}, nil
}
@ -121,7 +156,14 @@ func main() {
stop: stop,
serverPort: *serverPort,
})
go s.Serve(lis)
stopped := make(chan bool)
go func() {
s.Serve(lis)
stopped <- true
}()
<-stop
s.Stop()
<-stopped
}

75
benchmark/worker/util.go Normal file
Просмотреть файл

@ -0,0 +1,75 @@
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package main
import (
"log"
"os"
"path/filepath"
)
// Abs returns the absolute path the given relative file or directory path,
// relative to the google.golang.org/grpc directory in the user's GOPATH.
// If rel is already absolute, it is returned unmodified.
func Abs(rel string) string {
if filepath.IsAbs(rel) {
return rel
}
v, err := goPackagePath("google.golang.org/grpc")
if err != nil {
log.Fatalf("Error finding google.golang.org/grpc/testdata directory: %v", err)
}
return filepath.Join(v, rel)
}
func goPackagePath(pkg string) (path string, err error) {
gp := os.Getenv("GOPATH")
if gp == "" {
return path, os.ErrNotExist
}
for _, p := range filepath.SplitList(gp) {
dir := filepath.Join(p, "src", filepath.FromSlash(pkg))
fi, err := os.Stat(dir)
if os.IsNotExist(err) {
continue
}
if err != nil {
return "", err
}
if !fi.IsDir() {
continue
}
return dir, nil
}
return path, os.ErrNotExist
}