2015-02-06 04:14:05 +03:00
|
|
|
/*
|
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* Copyright 2014 gRPC authors.
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
2015-02-09 03:35:38 +03:00
|
|
|
package grpc
|
2015-02-06 04:14:05 +03:00
|
|
|
|
|
|
|
import (
|
2018-11-13 00:30:41 +03:00
|
|
|
"context"
|
2015-02-20 03:48:59 +03:00
|
|
|
"errors"
|
2015-02-06 04:14:05 +03:00
|
|
|
"io"
|
2018-06-28 02:18:41 +03:00
|
|
|
"math"
|
|
|
|
"strconv"
|
2015-06-25 05:52:33 +03:00
|
|
|
"sync"
|
2015-06-18 05:30:57 +03:00
|
|
|
"time"
|
2015-02-06 04:14:05 +03:00
|
|
|
|
2015-06-18 05:30:57 +03:00
|
|
|
"golang.org/x/net/trace"
|
2017-08-31 20:59:09 +03:00
|
|
|
"google.golang.org/grpc/balancer"
|
2015-02-09 03:39:06 +03:00
|
|
|
"google.golang.org/grpc/codes"
|
2018-11-13 00:30:41 +03:00
|
|
|
"google.golang.org/grpc/connectivity"
|
2017-10-31 20:21:13 +03:00
|
|
|
"google.golang.org/grpc/encoding"
|
2018-06-28 02:18:41 +03:00
|
|
|
"google.golang.org/grpc/grpclog"
|
2018-10-31 20:21:20 +03:00
|
|
|
"google.golang.org/grpc/internal/binarylog"
|
2018-06-19 03:59:08 +03:00
|
|
|
"google.golang.org/grpc/internal/channelz"
|
2018-06-28 02:18:41 +03:00
|
|
|
"google.golang.org/grpc/internal/grpcrand"
|
2018-07-11 21:22:45 +03:00
|
|
|
"google.golang.org/grpc/internal/transport"
|
2015-02-09 03:39:06 +03:00
|
|
|
"google.golang.org/grpc/metadata"
|
2018-10-31 20:21:20 +03:00
|
|
|
"google.golang.org/grpc/peer"
|
2016-10-23 01:06:41 +03:00
|
|
|
"google.golang.org/grpc/stats"
|
2017-04-05 20:35:40 +03:00
|
|
|
"google.golang.org/grpc/status"
|
2015-02-06 04:14:05 +03:00
|
|
|
)
|
|
|
|
|
2016-04-19 02:18:34 +03:00
|
|
|
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
|
2015-02-19 09:15:13 +03:00
|
|
|
|
|
|
|
// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	// StreamName is the name of the streaming method.
	StreamName string
	// Handler is the server-side handler invoked to execute the method.
	Handler StreamHandler

	// At least one of these is true.
	ServerStreams bool // indicates the server can perform streaming sends
	ClientStreams bool // indicates the client can perform streaming sends
}
|
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}
|
|
|
|
|
2018-07-10 02:46:25 +03:00
|
|
|
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	// - There is sufficient flow control to schedule m with the transport, or
	// - The stream is done, or
	// - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
|
|
|
|
|
2017-10-21 00:21:31 +03:00
|
|
|
// NewStream creates a new Stream for the client side. This is typically
|
2018-05-11 18:43:47 +03:00
|
|
|
// called by generated code. ctx is used for the lifetime of the stream.
|
|
|
|
//
|
|
|
|
// To ensure resources are not leaked due to the stream returned, one of the following
|
|
|
|
// actions must be performed:
|
|
|
|
//
|
2018-05-29 21:35:11 +03:00
|
|
|
// 1. Call Close on the ClientConn.
|
|
|
|
// 2. Cancel the context provided.
|
|
|
|
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
|
|
|
|
// client-streaming RPC, for instance, might use the helper function
|
|
|
|
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
|
|
|
|
// guaranteed to release all resources).
|
|
|
|
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
|
2018-05-11 18:43:47 +03:00
|
|
|
//
|
|
|
|
// If none of the above happen, a goroutine and a context will be leaked, and grpc
|
|
|
|
// will not call the optionally-configured stats handler with a stats.End message.
|
2017-10-21 00:21:31 +03:00
|
|
|
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
|
2018-03-17 01:57:34 +03:00
|
|
|
// allow interceptor to see all applicable call options, which means those
|
|
|
|
// configured as defaults from dial option as well as per-call options
|
2018-03-29 20:34:29 +03:00
|
|
|
opts = combine(cc.dopts.callOptions, opts)
|
2018-03-17 01:57:34 +03:00
|
|
|
|
2016-08-26 23:50:38 +03:00
|
|
|
if cc.dopts.streamInt != nil {
|
|
|
|
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
|
|
|
|
}
|
|
|
|
return newClientStream(ctx, desc, cc, method, opts...)
|
|
|
|
}
|
|
|
|
|
2018-05-11 18:43:47 +03:00
|
|
|
// NewClientStream is a wrapper for ClientConn.NewStream.
//
// It exists for backward compatibility; see ClientConn.NewStream for the
// resource-management requirements of the returned stream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}
|
|
|
|
|
2016-08-26 23:50:38 +03:00
|
|
|
// newClientStream is the implementation behind ClientConn.NewStream and
// NewClientStream. It resolves the method config, applies call options,
// configures compression, tracing, stats and binary logging, creates the
// first csAttempt, and opens the stream on the transport (with retry via
// withRetry). The named error return lets the deferred cleanup functions
// observe failures.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// Release the child context if stream setup fails anywhere below.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	// Apply the per-call options (CallOption.before hooks) to the call info.
	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo traceInfo
	if EnableTracing {
		trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		trInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	// Open the stream on the transport; bufferForRetryLocked records the op so
	// a later retry can replay it on a fresh attempt.
	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if cs.binlog != nil {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		cs.binlog.Log(logEntry)
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}
|
|
|
|
|
2018-06-28 02:18:41 +03:00
|
|
|
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
|
|
|
|
cs.attempt = &csAttempt{
|
|
|
|
cs: cs,
|
|
|
|
dc: cs.cc.dopts.dc,
|
|
|
|
statsHandler: sh,
|
|
|
|
trInfo: trInfo,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := cs.ctx.Err(); err != nil {
|
|
|
|
return toRPCErr(err)
|
|
|
|
}
|
2018-07-11 20:18:09 +03:00
|
|
|
t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
|
2018-06-28 02:18:41 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
cs.attempt.t = t
|
|
|
|
cs.attempt.done = done
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *csAttempt) newStream() error {
|
|
|
|
cs := a.cs
|
|
|
|
cs.callHdr.PreviousAttempts = cs.numRetries
|
|
|
|
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
|
|
|
|
if err != nil {
|
|
|
|
return toRPCErr(err)
|
|
|
|
}
|
|
|
|
cs.attempt.s = s
|
|
|
|
cs.attempt.p = &parser{r: s}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr // transport-level call header reused across attempts
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlog *binarylog.MethodLogger // Binary logger, can be nil.
	// serverHeaderBinlogged is a boolean for whether server header has been
	// logged. Server header will be logged when the first time one of those
	// happens: stream.Header(), stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	// mu guards the retry state below; see withRetry/retryLocked for the
	// locking protocol.
	mu                      sync.Mutex
	firstAttempt            bool       // if true, transparent retry is valid
	numRetries              int        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int        // retries since pushback; to reset backoff
	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
	attempt                 *csAttempt // the active client stream attempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}
|
|
|
|
|
|
|
|
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs   *clientStream
	t    transport.ClientTransport
	s    *transport.Stream
	p    *parser                 // message parser wrapping the stream
	done func(balancer.DoneInfo) // balancer callback invoked when the attempt completes

	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo traceInfo

	statsHandler stats.Handler
}
|
|
|
|
|
|
|
|
// commitAttemptLocked marks the current attempt as final — no further retries
// will occur — and releases the replay buffer. Requires cs.mu to be held.
func (cs *clientStream) commitAttemptLocked() {
	cs.committed = true
	cs.buffer = nil
}
|
|
|
|
|
|
|
|
func (cs *clientStream) commitAttempt() {
|
|
|
|
cs.mu.Lock()
|
|
|
|
cs.commitAttemptLocked()
|
|
|
|
cs.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. When a retry is
// warranted it also sleeps for the backoff (or server-pushback) duration
// before returning, updating the retry counters.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	// Parse the server's retry-pushback trailer, if present. A malformed or
	// negative value means the server is telling us not to retry at all.
	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	// Determine the status code to check against the retry policy: from the
	// stream if one was established, otherwise from the error itself.
	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

	// Compute the delay: server pushback wins; otherwise exponential backoff
	// (capped at maxBackoff) with full jitter via grpcrand.
	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}
|
|
|
|
|
|
|
|
// retryLocked finishes the current attempt with lastErr and, while the retry
// policy permits, creates a new attempt and replays the buffered operations
// on it. Returns nil if a retry was performed and succeeded; error otherwise.
// Requires cs.mu to be held.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			// Not retryable: lock in this attempt's outcome.
			cs.commitAttemptLocked()
			return err
		}
		// Retries get no stats handler or tracing (zero values); only the
		// initial attempt carries them.
		if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
			return err
		}
		// If the replay itself fails, loop and evaluate retrying again.
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}
|
|
|
|
|
|
|
|
func (cs *clientStream) Context() context.Context {
|
2018-06-28 02:18:41 +03:00
|
|
|
cs.commitAttempt()
|
|
|
|
// No need to lock before using attempt, since we know it is committed and
|
|
|
|
// cannot change.
|
|
|
|
return cs.attempt.s.Context()
|
|
|
|
}
|
|
|
|
|
|
|
|
// withRetry runs op against the current attempt, retrying (via retryLocked)
// on retryable failures until op succeeds, the stream is committed, or a
// non-retryable error occurs. onSuccess is invoked with cs.mu held when op
// succeeds on an uncommitted stream (typically to buffer op for replay or to
// commit the attempt).
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			// The attempt can no longer change once committed, so it is safe
			// to run op outside the lock with no identity re-check.
			return op(cs.attempt)
		}
		// Capture the attempt and drop the lock while op runs, since op may
		// block (e.g. on network I/O).
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			// io.EOF from an op means the transport stream ended; wait for
			// the stream's final status before deciding success vs. retry.
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
|
|
|
|
|
2016-11-08 05:06:11 +03:00
|
|
|
// Header returns the header metadata of the stream, blocking until it is
// available. A successful Header commits the current attempt (no further
// transparent retries). On error the stream is finished.
func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
	if err != nil {
		cs.finish(err)
		return nil, err
	}
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Only log if binary log is on and header has not been logged.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.binlog.Log(logEntry)
		// Mark logged so RecvMsg's implicit Header() call does not log twice.
		cs.serverHeaderBinlogged = true
	}
	// err is nil here; returned for symmetry with the metadata.
	return m, err
}
|
|
|
|
|
|
|
|
func (cs *clientStream) Trailer() metadata.MD {
|
2018-06-28 02:18:41 +03:00
|
|
|
// On RPC failure, we never need to retry, because usage requires that
|
|
|
|
// RecvMsg() returned a non-nil error before calling this function is valid.
|
|
|
|
// We would have retried earlier if necessary.
|
|
|
|
//
|
|
|
|
// Commit the attempt anyway, just in case users are not following those
|
|
|
|
// directions -- it will prevent races and should not meaningfully impact
|
|
|
|
// performance.
|
|
|
|
cs.commitAttempt()
|
|
|
|
if cs.attempt.s == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return cs.attempt.s.Trailer()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) replayBufferLocked() error {
|
|
|
|
a := cs.attempt
|
|
|
|
for _, f := range cs.buffer {
|
|
|
|
if err := f(a); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
|
|
|
|
// Note: we still will buffer if retry is disabled (for transparent retries).
|
|
|
|
if cs.committed {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
cs.bufferSize += sz
|
|
|
|
if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
|
|
|
|
cs.commitAttemptLocked()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
cs.buffer = append(cs.buffer, op)
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2015-03-26 01:18:07 +03:00
|
|
|
// SendMsg encodes, compresses, and sends m on the stream, buffering the send
// operation for possible retry. For non-client-streaming RPCs this also marks
// the send side closed.
func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client.  (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		// Unary/server-streaming: this is the one and only message.
		cs.sentLast = true
	}
	data, err := encode(cs.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, cs.cp, cs.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      msgBytes,
		})
	}
	// Named return: err was set by withRetry above.
	return
}
|
|
|
|
|
2018-06-28 02:18:41 +03:00
|
|
|
// RecvMsg receives the next message from the stream into m, retrying the
// receive via withRetry when permitted. On end of stream (or any error, or
// after the single response of a non-server-streaming RPC) it finishes the
// stream and, when binary logging is enabled, logs the server trailer.
func (cs *clientStream) RecvMsg(m interface{}) error {
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		// NOTE(review): Header's error is intentionally ignored here; a
		// failure will surface from the recv below.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if cs.binlog != nil {
		// Capture the received payload bytes for binary logging.
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		})
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if cs.binlog != nil {
			// finish will not log Trailer. Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if logEntry.Err == io.EOF {
				// io.EOF is a successful end of stream, not an error.
				logEntry.Err = nil
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			cs.binlog.Log(logEntry)
		}
	}
	return err
}
|
|
|
|
|
|
|
|
// CloseSend closes the send direction of the stream. It always returns nil;
// errors from the underlying write are deliberately not surfaced (see the
// comment inside op).
func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		// Write error is intentionally dropped here.
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg.  This also matches historical behavior.
		return nil
	}
	// Buffer op with size 0 so it is replayed on retries at no buffer cost.
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if cs.binlog != nil {
		cs.binlog.Log(&binarylog.ClientHalfClose{
			OnClientSide: true,
		})
	}
	// We never returned an error here for reasons.
	return nil
}
|
|
|
|
|
|
|
|
func (cs *clientStream) finish(err error) {
|
|
|
|
if err == io.EOF {
|
|
|
|
// Ending a stream with EOF indicates a success.
|
|
|
|
err = nil
|
|
|
|
}
|
|
|
|
cs.mu.Lock()
|
|
|
|
if cs.finished {
|
2015-06-25 05:52:33 +03:00
|
|
|
cs.mu.Unlock()
|
2018-03-12 23:27:54 +03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
cs.finished = true
|
2018-06-28 02:18:41 +03:00
|
|
|
cs.commitAttemptLocked()
|
2018-03-12 23:27:54 +03:00
|
|
|
cs.mu.Unlock()
|
2018-10-31 20:21:20 +03:00
|
|
|
// For binary logging. only log cancel in finish (could be caused by RPC ctx
|
|
|
|
// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
|
|
|
|
//
|
|
|
|
// Only one of cancel or trailer needs to be logged. In the cases where
|
|
|
|
// users don't call RecvMsg, users must have already canceled the RPC.
|
|
|
|
if cs.binlog != nil && status.Code(err) == codes.Canceled {
|
|
|
|
cs.binlog.Log(&binarylog.Cancel{
|
|
|
|
OnClientSide: true,
|
|
|
|
})
|
|
|
|
}
|
2018-06-28 02:18:41 +03:00
|
|
|
if err == nil {
|
|
|
|
cs.retryThrottler.successfulRPC()
|
|
|
|
}
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() {
|
|
|
|
if err != nil {
|
|
|
|
cs.cc.incrCallsFailed()
|
|
|
|
} else {
|
|
|
|
cs.cc.incrCallsSucceeded()
|
|
|
|
}
|
|
|
|
}
|
2018-06-28 02:18:41 +03:00
|
|
|
if cs.attempt != nil {
|
|
|
|
cs.attempt.finish(err)
|
|
|
|
}
|
|
|
|
// after functions all rely upon having a stream.
|
|
|
|
if cs.attempt.s != nil {
|
|
|
|
for _, o := range cs.opts {
|
|
|
|
o.after(cs.callInfo)
|
|
|
|
}
|
2015-06-25 05:52:33 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
cs.cancel()
|
|
|
|
}
|
|
|
|
|
2018-06-28 02:18:41 +03:00
|
|
|
// sendMsg writes one encoded message (hdr + payld) on the attempt's transport
// stream. data is the uncompressed bytes, used only for stats. Transport
// write errors are translated: nil for non-client-streaming RPCs (the real
// status comes from RecvMsg) and io.EOF otherwise.
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it.  finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}
|
|
|
|
|
2018-10-31 20:21:20 +03:00
|
|
|
// recvMsg receives and decodes one message from the attempt's transport
// stream into m. payInfo, when non-nil, collects payload bytes for stats or
// binary logging. On the first call it selects a decompressor from the
// received headers. For non-server-streaming RPCs it performs a second recv
// that must yield io.EOF, enforcing the one-response protocol.
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		// Stats need payload info even when the caller did not ask for it.
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			// io.EOF with a non-OK status means the server ended the stream
			// with an error; surface that status instead of EOF.
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
|
|
|
|
|
2018-03-12 23:27:54 +03:00
|
|
|
// finish closes out this attempt with err (io.EOF meaning success), exactly
// once: it closes the transport stream, reports the result to the balancer's
// done callback, to the stats handler, and to the trace, under a.mu.
func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	var tr metadata.MD
	if a.s != nil {
		// a.s may be nil if the attempt failed before a transport stream
		// was created.
		a.t.CloseStream(a.s, err)
		tr = a.s.Trailer()
	}

	if a.done != nil {
		br := false
		if a.s != nil {
			br = a.s.BytesReceived()
		}
		a.done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
		})
	}
	if a.statsHandler != nil {
		end := &stats.End{
			Client:    true,
			BeginTime: a.cs.beginTime,
			EndTime:   time.Now(),
			Trailer:   tr,
			Error:     err,
		}
		a.statsHandler.HandleRPC(a.cs.ctx, end)
	}
	if a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		// Prevent any further use of the trace after Finish.
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}
|
|
|
|
|
2018-11-01 20:49:35 +03:00
|
|
|
func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
|
|
|
|
ac.mu.Lock()
|
|
|
|
if ac.transport != t {
|
|
|
|
ac.mu.Unlock()
|
|
|
|
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
|
|
|
|
}
|
|
|
|
// transition to CONNECTING state when an attempt starts
|
|
|
|
if ac.state != connectivity.Connecting {
|
|
|
|
ac.updateConnectivityState(connectivity.Connecting)
|
|
|
|
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
|
|
|
}
|
|
|
|
ac.mu.Unlock()
|
|
|
|
|
|
|
|
if t == nil {
|
|
|
|
// TODO: return RPC error here?
|
|
|
|
return nil, errors.New("transport provided is nil")
|
|
|
|
}
|
|
|
|
// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
|
|
|
|
c := &callInfo{}
|
|
|
|
|
|
|
|
for _, o := range opts {
|
|
|
|
if err := o.before(c); err != nil {
|
|
|
|
return nil, toRPCErr(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
|
|
|
|
c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
|
|
|
|
|
|
|
|
// Possible context leak:
|
|
|
|
// The cancel function for the child context we create will only be called
|
|
|
|
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
|
|
|
|
// an error is generated by SendMsg.
|
|
|
|
// https://github.com/grpc/grpc-go/issues/1818.
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
cancel()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if err := setCallInfoCodec(c); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
callHdr := &transport.CallHdr{
|
|
|
|
Host: ac.cc.authority,
|
|
|
|
Method: method,
|
|
|
|
ContentSubtype: c.contentSubtype,
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set our outgoing compression according to the UseCompressor CallOption, if
|
|
|
|
// set. In that case, also find the compressor from the encoding package.
|
|
|
|
// Otherwise, use the compressor configured by the WithCompressor DialOption,
|
|
|
|
// if set.
|
|
|
|
var cp Compressor
|
|
|
|
var comp encoding.Compressor
|
|
|
|
if ct := c.compressorType; ct != "" {
|
|
|
|
callHdr.SendCompress = ct
|
|
|
|
if ct != encoding.Identity {
|
|
|
|
comp = encoding.GetCompressor(ct)
|
|
|
|
if comp == nil {
|
|
|
|
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if ac.cc.dopts.cp != nil {
|
|
|
|
callHdr.SendCompress = ac.cc.dopts.cp.Type()
|
|
|
|
cp = ac.cc.dopts.cp
|
|
|
|
}
|
|
|
|
if c.creds != nil {
|
|
|
|
callHdr.Creds = c.creds
|
|
|
|
}
|
|
|
|
|
|
|
|
as := &addrConnStream{
|
|
|
|
callHdr: callHdr,
|
|
|
|
ac: ac,
|
|
|
|
ctx: ctx,
|
|
|
|
cancel: cancel,
|
|
|
|
opts: opts,
|
|
|
|
callInfo: c,
|
|
|
|
desc: desc,
|
|
|
|
codec: c.codec,
|
|
|
|
cp: cp,
|
|
|
|
comp: comp,
|
|
|
|
t: t,
|
|
|
|
}
|
|
|
|
|
|
|
|
as.callInfo.stream = as
|
|
|
|
s, err := as.t.NewStream(as.ctx, as.callHdr)
|
|
|
|
if err != nil {
|
|
|
|
err = toRPCErr(err)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
as.s = s
|
|
|
|
as.p = &parser{r: s}
|
|
|
|
ac.incrCallsStarted()
|
|
|
|
if desc != unaryStreamDesc {
|
|
|
|
// Listen on cc and stream contexts to cleanup when the user closes the
|
|
|
|
// ClientConn or cancels the stream context. In all other cases, an error
|
|
|
|
// should already be injected into the recv buffer by the transport, which
|
|
|
|
// the client will eventually receive, and then we will cancel the stream's
|
|
|
|
// context in clientStream.finish.
|
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-ac.ctx.Done():
|
|
|
|
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
|
|
|
|
case <-ctx.Done():
|
|
|
|
as.finish(toRPCErr(ctx.Err()))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
return as, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// addrConnStream implements ClientStream for streams created directly on an
// addrConn, bypassing the ClientConn's resolver/balancer/retry machinery.
type addrConnStream struct {
	s         *transport.Stream         // transport stream; set after NewStream succeeds
	ac        *addrConn                 // owning addrConn; used for channelz call counters
	callHdr   *transport.CallHdr        // header used to create the stream
	cancel    context.CancelFunc        // cancels ctx; invoked in finish
	opts      []CallOption              // options the stream was created with
	callInfo  *callInfo                 // per-call settings derived from opts
	t         transport.ClientTransport // transport the stream was created on
	ctx       context.Context           // per-stream child context
	sentLast  bool                      // set once the final message (or CloseSend) is sent
	desc      *StreamDesc               // streaming-ness of each direction
	codec     baseCodec                 // message codec
	cp        Compressor                // legacy (DialOption) compressor, if any
	comp      encoding.Compressor       // registered compressor, if any
	decompSet bool                      // decompressor chosen from received headers
	dc        Decompressor              // legacy decompressor, if any
	decomp    encoding.Compressor       // registered decompressor, if any
	p         *parser                   // message parser reading from s
	mu        sync.Mutex                // guards finished
	finished  bool                      // ensures finish runs at most once
}
|
|
|
|
|
|
|
|
func (as *addrConnStream) Header() (metadata.MD, error) {
|
|
|
|
m, err := as.s.Header()
|
|
|
|
if err != nil {
|
|
|
|
as.finish(toRPCErr(err))
|
|
|
|
}
|
|
|
|
return m, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (as *addrConnStream) Trailer() metadata.MD {
|
|
|
|
return as.s.Trailer()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (as *addrConnStream) CloseSend() error {
|
|
|
|
if as.sentLast {
|
|
|
|
// TODO: return an error and finish the stream instead, due to API misuse?
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
as.sentLast = true
|
|
|
|
|
|
|
|
as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
|
|
|
|
// Always return nil; io.EOF is the only error that might make sense
|
|
|
|
// instead, but there is no need to signal the client to call RecvMsg
|
|
|
|
// as the only use left for the stream after CloseSend is to call
|
|
|
|
// RecvMsg. This also matches historical behavior.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (as *addrConnStream) Context() context.Context {
|
|
|
|
return as.s.Context()
|
|
|
|
}
|
|
|
|
|
|
|
|
// SendMsg encodes, compresses, and writes m on the stream. There is no retry
// path here (unlike clientStream.SendMsg): the write goes directly to the
// transport. For non-client-streaming RPCs this also marks the send side
// closed.
func (as *addrConnStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client.  (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		// Unary/server-streaming: this is the one and only message.
		as.sentLast = true
	}
	data, err := encode(as.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, as.cp, as.comp)
	if err != nil {
		return err
	}
	hdr, payld := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payld) > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
	}

	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it.  finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		as.t.IncrMsgSent()
	}
	return nil
}
|
|
|
|
|
|
|
|
// RecvMsg receives and decodes one message from the stream into m. The first
// call selects a decompressor from the received headers. For
// non-server-streaming RPCs a second recv enforces that exactly one response
// was sent. The stream is finished on any error or at end of stream.
func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.dc == nil || as.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				as.dc = nil
				as.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.dc = nil
		}
		// Only initialize this state once per stream.
		as.decompSet = true
	}
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err != nil {
		if err == io.EOF {
			// io.EOF with a non-OK status means the server ended the stream
			// with an error; surface that status instead.
			if statusErr := as.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if channelz.IsOn() {
		as.t.IncrMsgRecv()
	}
	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return as.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
|
|
|
|
|
|
|
|
func (as *addrConnStream) finish(err error) {
|
|
|
|
as.mu.Lock()
|
|
|
|
if as.finished {
|
|
|
|
as.mu.Unlock()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
as.finished = true
|
|
|
|
if err == io.EOF {
|
|
|
|
// Ending a stream with EOF indicates a success.
|
|
|
|
err = nil
|
|
|
|
}
|
|
|
|
if as.s != nil {
|
|
|
|
as.t.CloseStream(as.s, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
as.ac.incrCallsFailed()
|
|
|
|
} else {
|
|
|
|
as.ac.incrCallsSucceeded()
|
|
|
|
}
|
|
|
|
as.cancel()
|
|
|
|
as.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
2018-07-10 02:46:25 +03:00
|
|
|
// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
|
|
|
|
|
|
|
|
// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	// Compression configuration: legacy (cp/dc) and encoding-registry
	// (comp/decomp) compressors for outgoing and incoming messages.
	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler stats.Handler

	binlog *binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether server header has been logged. It
	// will happen when one of the following two happens: stream.SendHeader(),
	// stream.Send().
	//
	// It's only checked in send and sendHeader, doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
|
|
|
|
|
|
|
|
// Context returns the context for this stream.
func (ss *serverStream) Context() context.Context {
	return ss.ctx
}
|
|
|
|
|
2016-08-30 01:16:08 +03:00
|
|
|
func (ss *serverStream) SetHeader(md metadata.MD) error {
|
|
|
|
if md.Len() == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return ss.s.SetHeader(md)
|
|
|
|
}
|
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
func (ss *serverStream) SendHeader(md metadata.MD) error {
|
2018-10-31 20:21:20 +03:00
|
|
|
err := ss.t.WriteHeader(ss.s, md)
|
|
|
|
if ss.binlog != nil && !ss.serverHeaderBinlogged {
|
|
|
|
h, _ := ss.s.Header()
|
|
|
|
ss.binlog.Log(&binarylog.ServerHeader{
|
|
|
|
Header: h,
|
|
|
|
})
|
|
|
|
ss.serverHeaderBinlogged = true
|
|
|
|
}
|
|
|
|
return err
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ss *serverStream) SetTrailer(md metadata.MD) {
|
|
|
|
if md.Len() == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
ss.s.SetTrailer(md)
|
|
|
|
}
|
|
|
|
|
2015-07-29 01:27:46 +03:00
|
|
|
// SendMsg encodes, optionally compresses, and writes m to the stream. On a
// non-EOF error a non-user-specified status is written to the transport. The
// deferred block records the outcome in the trace, writes that failure
// status, and bumps the channelz sent-message counter on success.
func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			// Record the sent payload (or the error) in the trace event log.
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (as a server side Cancel maybe).
			//
			// This is not handled specifically now. User will return a final
			// status from the service handler, we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()
	data, err := encode(ss.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, ss.cp, ss.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if ss.binlog != nil {
		// The header is flushed implicitly by the first message, so log it
		// here if SendHeader was never called.
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			ss.binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			ss.serverHeaderBinlogged = true
		}
		ss.binlog.Log(&binarylog.ServerMessage{
			Message: data,
		})
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
	}
	return nil
}
|
|
|
|
|
2015-07-29 01:27:46 +03:00
|
|
|
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
2015-07-28 00:33:17 +03:00
|
|
|
defer func() {
|
2015-10-06 03:49:53 +03:00
|
|
|
if ss.trInfo != nil {
|
2015-07-28 00:33:17 +03:00
|
|
|
ss.mu.Lock()
|
2015-10-06 03:49:53 +03:00
|
|
|
if ss.trInfo.tr != nil {
|
|
|
|
if err == nil {
|
|
|
|
ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
|
|
|
|
} else if err != io.EOF {
|
|
|
|
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
|
|
ss.trInfo.tr.SetError()
|
|
|
|
}
|
2015-07-28 00:33:17 +03:00
|
|
|
}
|
|
|
|
ss.mu.Unlock()
|
|
|
|
}
|
2017-08-14 22:24:23 +03:00
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
st, _ := status.FromError(toRPCErr(err))
|
|
|
|
ss.t.WriteStatus(ss.s, st)
|
2018-10-31 20:21:20 +03:00
|
|
|
// Non-user specified status was sent out. This should be an error
|
|
|
|
// case (as a server side Cancel maybe).
|
|
|
|
//
|
|
|
|
// This is not handled specifically now. User will return a final
|
|
|
|
// status from the service handler, we will log that error instead.
|
|
|
|
// This behavior is similar to an interceptor.
|
2017-08-14 22:24:23 +03:00
|
|
|
}
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() && err == nil {
|
|
|
|
ss.t.IncrMsgRecv()
|
|
|
|
}
|
2015-07-28 00:33:17 +03:00
|
|
|
}()
|
2018-10-31 20:21:20 +03:00
|
|
|
var payInfo *payloadInfo
|
|
|
|
if ss.statsHandler != nil || ss.binlog != nil {
|
|
|
|
payInfo = &payloadInfo{}
|
2016-10-23 01:06:41 +03:00
|
|
|
}
|
2018-10-31 20:21:20 +03:00
|
|
|
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
|
2016-09-01 02:56:46 +03:00
|
|
|
if err == io.EOF {
|
2018-10-31 20:21:20 +03:00
|
|
|
if ss.binlog != nil {
|
|
|
|
ss.binlog.Log(&binarylog.ClientHalfClose{})
|
|
|
|
}
|
2016-09-01 02:56:46 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err == io.ErrUnexpectedEOF {
|
2017-12-18 20:23:42 +03:00
|
|
|
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
2016-09-01 02:56:46 +03:00
|
|
|
}
|
|
|
|
return toRPCErr(err)
|
|
|
|
}
|
2018-10-31 20:21:20 +03:00
|
|
|
if ss.statsHandler != nil {
|
|
|
|
ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
|
|
|
|
RecvTime: time.Now(),
|
|
|
|
Payload: m,
|
|
|
|
// TODO truncate large payload.
|
2019-03-26 01:42:16 +03:00
|
|
|
Data: payInfo.uncompressedBytes,
|
|
|
|
WireLength: payInfo.wireLength,
|
|
|
|
Length: len(payInfo.uncompressedBytes),
|
2018-10-31 20:21:20 +03:00
|
|
|
})
|
|
|
|
}
|
|
|
|
if ss.binlog != nil {
|
|
|
|
ss.binlog.Log(&binarylog.ClientMessage{
|
|
|
|
Message: payInfo.uncompressedBytes,
|
|
|
|
})
|
2016-10-23 01:06:41 +03:00
|
|
|
}
|
2016-09-01 02:56:46 +03:00
|
|
|
return nil
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
2017-10-27 02:03:44 +03:00
|
|
|
|
|
|
|
// MethodFromServerStream returns the method string for the input stream.
|
|
|
|
// The returned string is in the format of "/service/method".
|
|
|
|
func MethodFromServerStream(stream ServerStream) (string, bool) {
|
2018-04-02 23:08:04 +03:00
|
|
|
return Method(stream.Context())
|
2017-10-27 02:03:44 +03:00
|
|
|
}
|