Merge pull request #24 from iamqizhao/master

Support multiple listeners
This commit is contained in:
Qi Zhao 2015-02-03 16:50:56 -08:00
Родитель b3286a3319 afe57a6648
Коммит 5a9d8e8448
3 изменённых файлов: 38 добавлений и 32 удалений

Просмотреть файл

@ -195,16 +195,15 @@ func main() {
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var server *rpc.Server
server := rpc.NewServer()
testpb.RegisterService(server, &testServer{})
if *useTLS {
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
log.Fatalf("Failed to generate credentials %v", err)
}
server = rpc.NewServer(lis, rpc.WithServerTLS(creds))
server.Serve(creds.NewListener(lis))
} else {
server = rpc.NewServer(lis)
server.Serve(lis)
}
testpb.RegisterService(server, &testServer{})
server.Run()
}

Просмотреть файл

@ -44,7 +44,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/grpc-go/rpc/codes"
"github.com/google/grpc-go/rpc/credentials"
"github.com/google/grpc-go/rpc/metadata"
"github.com/google/grpc-go/rpc/transport"
"golang.org/x/net/context"
@ -86,16 +85,15 @@ type service struct {
// Server is a gRPC server to serve RPC requests.
type Server struct {
lis net.Listener
opts options
mu sync.Mutex
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
m map[string]*service // service name -> service info
}
type options struct {
maxConcurrentStreams uint32
connCreds credentials.TransportAuthenticator
}
// ServerOption sets options.
@ -109,25 +107,15 @@ func MaxConcurrentStreams(n uint32) ServerOption {
}
}
// WithServerTLS returns an Option that consists of the input TLSCredentials.
func WithServerTLS(creds credentials.TransportAuthenticator) ServerOption {
return func(o *options) {
o.connCreds = creds
}
}
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(lis net.Listener, opt ...ServerOption) *Server {
func NewServer(opt ...ServerOption) *Server {
var opts options
for _, o := range opt {
o(&opts)
}
if opts.connCreds != nil {
lis = opts.connCreds.NewListener(lis)
}
return &Server{
lis: lis,
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
@ -135,7 +123,8 @@ func NewServer(lis net.Listener, opt ...ServerOption) *Server {
}
// RegisterService register a service and its implementation to the gRPC
// server. Called from the IDL generated code.
// server. Called from the IDL generated code. This must be called before
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
@ -164,12 +153,26 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
s.m[sd.ServiceName] = srv
}
// Run makes the server start to accept connections on s.lis. Upon each received
// connection request, it creates a ServerTransport and starts receiving gRPC
// requests from it. Non-nil error returns if something goes wrong.
func (s *Server) Run() error {
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC request and then call the registered handlers to reply to them.
// Service returns when lis.Accept fails.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
if s.lis == nil {
s.mu.Unlock()
return fmt.Errorf("the server has been stopped")
}
s.lis[lis] = true
s.mu.Unlock()
defer func() {
lis.Close()
s.mu.Lock()
delete(s.lis, lis)
s.mu.Unlock()
}()
for {
c, err := s.lis.Accept()
c, err := lis.Accept()
if err != nil {
return err
}
@ -312,11 +315,15 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
// Stop stops the gRPC server. Once it returns, the server stops accepting
// connection requests and closes all the connected connections.
func (s *Server) Stop() {
s.lis.Close()
s.mu.Lock()
listeners := s.lis
s.lis = nil
cs := s.conns
s.conns = nil
s.mu.Unlock()
for lis := range listeners {
lis.Close()
}
for c := range cs {
c.Close()
}

Просмотреть файл

@ -157,18 +157,18 @@ func setUp(useTLS bool, maxStream uint32) (s *rpc.Server, mc testpb.MathClient)
if err != nil {
log.Fatalf("Failed to parse listener address: %v", err)
}
s = rpc.NewServer(rpc.MaxConcurrentStreams(maxStream))
ms := &mathServer{}
testpb.RegisterService(s, ms)
if useTLS {
creds, err := credentials.NewServerTLSFromFile(tlsDir+"server1.pem", tlsDir+"server1.key")
if err != nil {
log.Fatalf("Failed to generate credentials %v", err)
}
s = rpc.NewServer(lis, rpc.MaxConcurrentStreams(maxStream), rpc.WithServerTLS(creds))
go s.Serve(creds.NewListener(lis))
} else {
s = rpc.NewServer(lis, rpc.MaxConcurrentStreams(maxStream))
go s.Serve(lis)
}
ms := &mathServer{}
testpb.RegisterService(s, ms)
go s.Run()
addr := "localhost:" + port
var conn *rpc.ClientConn
if useTLS {