This commit is contained in:
yangzhouhan 2015-07-27 14:33:17 -07:00
Родитель d4a3879538
Коммит 845510e440
2 изменённых файлов: 57 добавлений и 9 удалений

Просмотреть файл

@ -43,6 +43,7 @@ import (
"sync"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
@ -241,13 +242,21 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
//
// TODO(zhaoq): There exist other options also such as only closing the
// faulty stream locally and remotely (Other streams can keep going). Find
// the optimal option.
// the optimal option..M
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
}
return t.Write(stream, p, opts)
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) {
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, srvn string) {
var traceInfo traceInfo
if EnableTracing {
traceInfo.tr = trace.New("Recv."+methodFamily(srvn), srvn)
// traceInfo.tr = trace.New("Recv."+methodFamily(md.MethodName), md.MethodName)
defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
}
p := &parser{s: stream}
for {
pf, req, err := p.recvMsg()
@ -268,6 +277,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return
}
if EnableTracing {
traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true)
}
switch pf {
case compressionNone:
statusCode := codes.OK
@ -303,18 +315,27 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}
t.WriteStatus(stream, statusCode, statusDesc)
if EnableTracing {
traceInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
default:
panic(fmt.Sprintf("payload format to be supported: %d", pf))
}
}
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) {
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, srvn string) {
ss := &serverStream{
t: t,
s: stream,
p: &parser{s: stream},
codec: s.opts.codec,
t: t,
s: stream,
p: &parser{s: stream},
codec: s.opts.codec,
tracing: EnableTracing,
}
if ss.tracing {
ss.traceInfo.tr = trace.New("Recv."+methodFamily(srvn), srvn)
ss.traceInfo.firstLine.client = false
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
}
if appErr := sd.Handler(srv.server, ss); appErr != nil {
if err, ok := appErr.(rpcError); ok {
@ -326,10 +347,17 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}
}
t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
if ss.tracing {
ss.mu.Lock()
ss.traceInfo.tr.Finish()
ss.traceInfo.tr = nil
ss.mu.Unlock()
}
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
sm := stream.Method()
srvn := sm
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
@ -351,11 +379,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
// Unary RPC or Streaming RPC?
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md)
s.processUnaryRPC(t, stream, srv, md, srvn)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd)
s.processStreamingRPC(t, stream, srv, sd, srvn)
return
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {

Просмотреть файл

@ -282,6 +282,9 @@ type serverStream struct {
codec Codec
statusCode codes.Code
statusDesc string
tracing bool
mu sync.Mutex
traceInfo traceInfo
}
func (ss *serverStream) Context() context.Context {
@ -301,6 +304,13 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
}
func (ss *serverStream) SendMsg(m interface{}) error {
if ss.tracing {
ss.mu.Lock()
if ss.traceInfo.tr != nil {
ss.traceInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
}
ss.mu.Unlock()
}
out, err := encode(ss.codec, m, compressionNone)
if err != nil {
err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
@ -310,5 +320,14 @@ func (ss *serverStream) SendMsg(m interface{}) error {
}
func (ss *serverStream) RecvMsg(m interface{}) error {
defer func() {
if ss.tracing {
ss.mu.Lock()
if ss.traceInfo.tr != nil {
ss.traceInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
}
ss.mu.Unlock()
}
}()
return recv(ss.p, ss.codec, m)
}