2015-02-06 04:14:05 +03:00
|
|
|
/*
|
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* Copyright 2014 gRPC authors.
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
2017-06-08 15:42:19 +03:00
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
2015-02-06 04:14:05 +03:00
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
2015-02-09 03:35:38 +03:00
|
|
|
package grpc
|
2015-02-06 04:14:05 +03:00
|
|
|
|
|
|
|
import (
|
2015-02-20 03:48:59 +03:00
|
|
|
"errors"
|
2015-02-06 04:14:05 +03:00
|
|
|
"io"
|
2015-06-25 05:52:33 +03:00
|
|
|
"sync"
|
2015-06-18 05:30:57 +03:00
|
|
|
"time"
|
2015-02-06 04:14:05 +03:00
|
|
|
|
2015-02-19 01:00:26 +03:00
|
|
|
"golang.org/x/net/context"
|
2015-06-18 05:30:57 +03:00
|
|
|
"golang.org/x/net/trace"
|
2017-08-31 20:59:09 +03:00
|
|
|
"google.golang.org/grpc/balancer"
|
2018-04-23 21:22:25 +03:00
|
|
|
"google.golang.org/grpc/channelz"
|
2015-02-09 03:39:06 +03:00
|
|
|
"google.golang.org/grpc/codes"
|
2017-10-31 20:21:13 +03:00
|
|
|
"google.golang.org/grpc/encoding"
|
2015-02-09 03:39:06 +03:00
|
|
|
"google.golang.org/grpc/metadata"
|
2016-10-23 01:06:41 +03:00
|
|
|
"google.golang.org/grpc/stats"
|
2017-04-05 20:35:40 +03:00
|
|
|
"google.golang.org/grpc/status"
|
2015-02-09 03:39:06 +03:00
|
|
|
"google.golang.org/grpc/transport"
|
2015-02-06 04:14:05 +03:00
|
|
|
)
|
|
|
|
|
2016-04-19 02:18:34 +03:00
|
|
|
// StreamHandler defines the handler called by gRPC server to complete the
|
2018-03-09 00:46:26 +03:00
|
|
|
// execution of a streaming RPC. If a StreamHandler returns an error, it
|
|
|
|
// should be produced by the status package, or else gRPC will use
|
|
|
|
// codes.Unknown as the status code and err.Error() as the status message
|
|
|
|
// of the RPC.
|
2016-04-19 02:18:34 +03:00
|
|
|
type StreamHandler func(srv interface{}, stream ServerStream) error
|
2015-02-19 09:15:13 +03:00
|
|
|
|
|
|
|
// StreamDesc represents a streaming RPC service's method specification.
|
|
|
|
type StreamDesc struct {
|
|
|
|
StreamName string
|
2016-04-19 02:18:34 +03:00
|
|
|
Handler StreamHandler
|
2015-02-19 09:15:13 +03:00
|
|
|
|
|
|
|
// At least one of these is true.
|
|
|
|
ServerStreams bool
|
|
|
|
ClientStreams bool
|
|
|
|
}
|
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
// Stream defines the common interface a client or server stream has to satisfy.
|
2018-01-06 02:37:05 +03:00
|
|
|
//
|
|
|
|
// All errors returned from Stream are compatible with the status package.
|
2015-02-06 04:14:05 +03:00
|
|
|
type Stream interface {
|
|
|
|
// Context returns the context for this stream.
|
|
|
|
Context() context.Context
|
2015-03-26 01:18:07 +03:00
|
|
|
// SendMsg blocks until it sends m, the stream is done or the stream
|
2015-02-14 03:43:51 +03:00
|
|
|
// breaks.
|
|
|
|
// On error, it aborts the stream and returns an RPC status on client
|
|
|
|
// side. On server side, it simply returns the error to the caller.
|
2016-03-31 00:41:40 +03:00
|
|
|
// SendMsg is called by generated code. Also Users can call SendMsg
|
|
|
|
// directly when it is really needed in their use cases.
|
2017-06-16 01:24:17 +03:00
|
|
|
// It's safe to have a goroutine calling SendMsg and another goroutine calling
|
|
|
|
// recvMsg on the same stream at the same time.
|
|
|
|
// But it is not safe to call SendMsg on the same stream in different goroutines.
|
2015-03-26 01:18:07 +03:00
|
|
|
SendMsg(m interface{}) error
|
2015-04-02 00:02:26 +03:00
|
|
|
// RecvMsg blocks until it receives a message or the stream is
|
2015-02-14 03:43:51 +03:00
|
|
|
// done. On client side, it returns io.EOF when the stream is done. On
|
2016-02-12 07:19:47 +03:00
|
|
|
// any other error, it aborts the stream and returns an RPC status. On
|
2015-02-14 03:43:51 +03:00
|
|
|
// server side, it simply returns the error to the caller.
|
2017-06-16 01:24:17 +03:00
|
|
|
// It's safe to have a goroutine calling SendMsg and another goroutine calling
|
|
|
|
// recvMsg on the same stream at the same time.
|
|
|
|
// But it is not safe to call RecvMsg on the same stream in different goroutines.
|
2015-03-26 01:18:07 +03:00
|
|
|
RecvMsg(m interface{}) error
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2016-06-07 14:01:07 +03:00
|
|
|
// ClientStream defines the interface a client stream has to satisfy.
|
2015-02-06 04:14:05 +03:00
|
|
|
type ClientStream interface {
|
2016-04-30 00:58:02 +03:00
|
|
|
// Header returns the header metadata received from the server if there
|
2015-02-06 04:14:05 +03:00
|
|
|
// is any. It blocks if the metadata is not ready to read.
|
|
|
|
Header() (metadata.MD, error)
|
2016-07-19 07:20:15 +03:00
|
|
|
// Trailer returns the trailer metadata from the server, if there is any.
|
|
|
|
// It must only be called after stream.CloseAndRecv has returned, or
|
|
|
|
// stream.Recv has returned a non-nil error (including io.EOF).
|
2015-02-06 04:14:05 +03:00
|
|
|
Trailer() metadata.MD
|
2015-02-14 03:43:51 +03:00
|
|
|
// CloseSend closes the send direction of the stream. It closes the stream
|
|
|
|
// when non-nil error is met.
|
2015-02-06 04:14:05 +03:00
|
|
|
CloseSend() error
|
2017-06-23 23:57:58 +03:00
|
|
|
// Stream.SendMsg() may return a non-nil error when something wrong happens sending
|
|
|
|
// the request. The returned error indicates the status of this sending, not the final
|
|
|
|
// status of the RPC.
|
2018-02-02 21:35:15 +03:00
|
|
|
//
|
|
|
|
// Always call Stream.RecvMsg() to drain the stream and get the final
|
|
|
|
// status, otherwise there could be leaked resources.
|
2015-02-06 04:14:05 +03:00
|
|
|
Stream
|
|
|
|
}
|
|
|
|
|
2017-10-21 00:21:31 +03:00
|
|
|
// NewStream creates a new Stream for the client side. This is typically
|
|
|
|
// called by generated code.
|
|
|
|
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
|
2018-03-17 01:57:34 +03:00
|
|
|
// allow interceptor to see all applicable call options, which means those
|
|
|
|
// configured as defaults from dial option as well as per-call options
|
2018-03-29 20:34:29 +03:00
|
|
|
opts = combine(cc.dopts.callOptions, opts)
|
2018-03-17 01:57:34 +03:00
|
|
|
|
2016-08-26 23:50:38 +03:00
|
|
|
if cc.dopts.streamInt != nil {
|
|
|
|
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
|
|
|
|
}
|
|
|
|
return newClientStream(ctx, desc, cc, method, opts...)
|
|
|
|
}
|
|
|
|
|
2017-10-21 00:21:31 +03:00
|
|
|
// NewClientStream creates a new Stream for the client side. This is typically
|
|
|
|
// called by generated code.
|
|
|
|
//
|
|
|
|
// DEPRECATED: Use ClientConn.NewStream instead.
|
|
|
|
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
|
|
|
|
return cc.NewStream(ctx, desc, method, opts...)
|
|
|
|
}
|
|
|
|
|
2016-08-26 23:50:38 +03:00
|
|
|
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() {
|
|
|
|
cc.incrCallsStarted()
|
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
cc.incrCallsFailed()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
2017-08-31 20:59:09 +03:00
|
|
|
c := defaultCallInfo()
|
2017-05-15 23:51:11 +03:00
|
|
|
mc := cc.GetMethodConfig(method)
|
2017-04-22 02:18:59 +03:00
|
|
|
if mc.WaitForReady != nil {
|
|
|
|
c.failFast = !*mc.WaitForReady
|
|
|
|
}
|
2017-04-04 01:03:24 +03:00
|
|
|
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
// Possible context leak:
|
|
|
|
// The cancel function for the child context we create will only be called
|
|
|
|
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
|
|
|
|
// an error is generated by SendMsg.
|
|
|
|
// https://github.com/grpc/grpc-go/issues/1818.
|
|
|
|
var cancel context.CancelFunc
|
2017-11-21 00:49:49 +03:00
|
|
|
if mc.Timeout != nil && *mc.Timeout >= 0 {
|
2017-04-22 02:18:59 +03:00
|
|
|
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
} else {
|
|
|
|
ctx, cancel = context.WithCancel(ctx)
|
2016-12-20 03:31:00 +03:00
|
|
|
}
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
cancel()
|
|
|
|
}
|
|
|
|
}()
|
2017-04-22 02:18:59 +03:00
|
|
|
|
2016-06-28 00:36:59 +03:00
|
|
|
for _, o := range opts {
|
2017-08-31 20:59:09 +03:00
|
|
|
if err := o.before(c); err != nil {
|
2016-06-28 00:36:59 +03:00
|
|
|
return nil, toRPCErr(err)
|
|
|
|
}
|
|
|
|
}
|
2017-04-27 01:50:58 +03:00
|
|
|
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
|
|
|
|
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
|
2018-01-23 22:39:40 +03:00
|
|
|
if err := setCallInfoCodec(c); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-04-27 01:50:58 +03:00
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
callHdr := &transport.CallHdr{
|
2016-11-08 00:17:59 +03:00
|
|
|
Host: cc.authority,
|
|
|
|
Method: method,
|
2017-07-06 02:51:14 +03:00
|
|
|
// If it's not client streaming, we should already have the request to be sent,
|
|
|
|
// so we don't flush the header.
|
|
|
|
// If it's client streaming, the user may never send a request or send it any
|
|
|
|
// time soon, so we ask the transport to flush the header.
|
2018-01-23 22:39:40 +03:00
|
|
|
Flush: desc.ClientStreams,
|
|
|
|
ContentSubtype: c.contentSubtype,
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
2017-11-17 20:24:54 +03:00
|
|
|
|
|
|
|
// Set our outgoing compression according to the UseCompressor CallOption, if
|
|
|
|
// set. In that case, also find the compressor from the encoding package.
|
|
|
|
// Otherwise, use the compressor configured by the WithCompressor DialOption,
|
|
|
|
// if set.
|
|
|
|
var cp Compressor
|
|
|
|
var comp encoding.Compressor
|
|
|
|
if ct := c.compressorType; ct != "" {
|
|
|
|
callHdr.SendCompress = ct
|
|
|
|
if ct != encoding.Identity {
|
|
|
|
comp = encoding.GetCompressor(ct)
|
|
|
|
if comp == nil {
|
2017-12-18 20:23:42 +03:00
|
|
|
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
|
2017-11-17 20:24:54 +03:00
|
|
|
}
|
|
|
|
}
|
2017-10-31 20:21:13 +03:00
|
|
|
} else if cc.dopts.cp != nil {
|
2016-01-30 01:38:20 +03:00
|
|
|
callHdr.SendCompress = cc.dopts.cp.Type()
|
2017-11-17 20:24:54 +03:00
|
|
|
cp = cc.dopts.cp
|
2016-01-23 05:21:41 +03:00
|
|
|
}
|
2017-05-11 21:07:38 +03:00
|
|
|
if c.creds != nil {
|
|
|
|
callHdr.Creds = c.creds
|
|
|
|
}
|
2016-08-09 23:22:02 +03:00
|
|
|
var trInfo traceInfo
|
|
|
|
if EnableTracing {
|
|
|
|
trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
|
|
|
|
trInfo.firstLine.client = true
|
2015-06-18 21:45:40 +03:00
|
|
|
if deadline, ok := ctx.Deadline(); ok {
|
2016-08-09 23:22:02 +03:00
|
|
|
trInfo.firstLine.deadline = deadline.Sub(time.Now())
|
2015-06-18 21:45:40 +03:00
|
|
|
}
|
2016-08-09 23:22:02 +03:00
|
|
|
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
|
|
|
ctx = trace.NewContext(ctx, trInfo.tr)
|
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
// Need to call tr.finish() if error is returned.
|
|
|
|
// Because tr will not be returned to caller.
|
|
|
|
trInfo.tr.LazyPrintf("RPC: [%v]", err)
|
|
|
|
trInfo.tr.SetError()
|
|
|
|
trInfo.tr.Finish()
|
|
|
|
}
|
|
|
|
}()
|
2015-06-18 21:45:40 +03:00
|
|
|
}
|
2017-08-31 20:59:09 +03:00
|
|
|
ctx = newContextWithRPCInfo(ctx, c.failFast)
|
2017-01-10 04:11:32 +03:00
|
|
|
sh := cc.dopts.copts.StatsHandler
|
2018-03-12 19:16:36 +03:00
|
|
|
var beginTime time.Time
|
2017-01-10 04:11:32 +03:00
|
|
|
if sh != nil {
|
2017-05-11 03:54:49 +03:00
|
|
|
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
|
2018-03-12 19:16:36 +03:00
|
|
|
beginTime = time.Now()
|
2016-11-08 00:17:59 +03:00
|
|
|
begin := &stats.Begin{
|
|
|
|
Client: true,
|
2018-03-12 19:16:36 +03:00
|
|
|
BeginTime: beginTime,
|
2016-11-08 00:17:59 +03:00
|
|
|
FailFast: c.failFast,
|
|
|
|
}
|
2017-01-10 04:11:32 +03:00
|
|
|
sh.HandleRPC(ctx, begin)
|
2017-05-02 20:16:45 +03:00
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
// Only handle end stats if err != nil.
|
|
|
|
end := &stats.End{
|
2018-03-12 19:16:36 +03:00
|
|
|
Client: true,
|
|
|
|
Error: err,
|
|
|
|
BeginTime: beginTime,
|
|
|
|
EndTime: time.Now(),
|
2017-05-02 20:16:45 +03:00
|
|
|
}
|
|
|
|
sh.HandleRPC(ctx, end)
|
2016-11-08 00:17:59 +03:00
|
|
|
}
|
2017-05-02 20:16:45 +03:00
|
|
|
}()
|
|
|
|
}
|
2017-11-07 00:45:11 +03:00
|
|
|
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
var (
|
|
|
|
t transport.ClientTransport
|
|
|
|
s *transport.Stream
|
|
|
|
done func(balancer.DoneInfo)
|
|
|
|
)
|
2016-06-29 02:08:19 +03:00
|
|
|
for {
|
2017-11-07 00:45:11 +03:00
|
|
|
// Check to make sure the context has expired. This will prevent us from
|
|
|
|
// looping forever if an error occurs for wait-for-ready RPCs where no data
|
|
|
|
// is sent on the wire.
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return nil, toRPCErr(ctx.Err())
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2017-10-02 19:22:57 +03:00
|
|
|
t, done, err = cc.getTransport(ctx, c.failFast)
|
2016-06-29 02:08:19 +03:00
|
|
|
if err != nil {
|
2017-11-07 00:45:11 +03:00
|
|
|
return nil, err
|
2016-06-29 02:08:19 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
s, err = t.NewStream(ctx, callHdr)
|
|
|
|
if err != nil {
|
2017-10-02 19:22:57 +03:00
|
|
|
if done != nil {
|
2018-02-05 23:54:13 +03:00
|
|
|
done(balancer.DoneInfo{Err: err})
|
2017-10-02 19:22:57 +03:00
|
|
|
done = nil
|
2016-06-29 02:08:19 +03:00
|
|
|
}
|
2017-11-07 00:45:11 +03:00
|
|
|
// In the event of any error from NewStream, we never attempted to write
|
|
|
|
// anything to the wire, so we can retry indefinitely for non-fail-fast
|
|
|
|
// RPCs.
|
|
|
|
if !c.failFast {
|
2016-06-29 02:08:19 +03:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
return nil, toRPCErr(err)
|
|
|
|
}
|
|
|
|
break
|
2016-01-15 01:53:07 +03:00
|
|
|
}
|
2017-11-07 00:45:11 +03:00
|
|
|
|
2016-08-09 23:22:02 +03:00
|
|
|
cs := &clientStream{
|
2017-05-15 23:51:11 +03:00
|
|
|
opts: opts,
|
|
|
|
c: c,
|
2018-04-23 21:22:25 +03:00
|
|
|
cc: cc,
|
2017-05-15 23:51:11 +03:00
|
|
|
desc: desc,
|
2018-01-23 22:39:40 +03:00
|
|
|
codec: c.codec,
|
2017-11-17 20:24:54 +03:00
|
|
|
cp: cp,
|
|
|
|
comp: comp,
|
2017-05-15 23:51:11 +03:00
|
|
|
cancel: cancel,
|
2018-03-12 23:27:54 +03:00
|
|
|
attempt: &csAttempt{
|
|
|
|
t: t,
|
|
|
|
s: s,
|
2018-05-03 21:37:59 +03:00
|
|
|
p: &parser{r: s},
|
2018-03-12 23:27:54 +03:00
|
|
|
done: done,
|
|
|
|
dc: cc.dopts.dc,
|
|
|
|
ctx: ctx,
|
|
|
|
trInfo: trInfo,
|
|
|
|
statsHandler: sh,
|
|
|
|
beginTime: beginTime,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
cs.c.stream = cs
|
|
|
|
cs.attempt.cs = cs
|
2018-02-05 23:54:13 +03:00
|
|
|
if desc != unaryStreamDesc {
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
// Listen on cc and stream contexts to cleanup when the user closes the
|
|
|
|
// ClientConn or cancels the stream context. In all other cases, an error
|
|
|
|
// should already be injected into the recv buffer by the transport, which
|
|
|
|
// the client will eventually receive, and then we will cancel the stream's
|
|
|
|
// context in clientStream.finish.
|
2018-02-05 23:54:13 +03:00
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-cc.ctx.Done():
|
|
|
|
cs.finish(ErrClientConnClosing)
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
case <-ctx.Done():
|
2018-03-12 23:27:54 +03:00
|
|
|
cs.finish(toRPCErr(ctx.Err()))
|
2018-02-05 23:54:13 +03:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
2015-06-18 21:45:40 +03:00
|
|
|
return cs, nil
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// clientStream implements a client side Stream.
|
|
|
|
type clientStream struct {
|
2017-11-17 20:24:54 +03:00
|
|
|
opts []CallOption
|
|
|
|
c *callInfo
|
2018-04-23 21:22:25 +03:00
|
|
|
cc *ClientConn
|
2018-03-12 23:27:54 +03:00
|
|
|
desc *StreamDesc
|
|
|
|
|
|
|
|
codec baseCodec
|
|
|
|
cp Compressor
|
|
|
|
comp encoding.Compressor
|
|
|
|
|
|
|
|
cancel context.CancelFunc // cancels all attempts
|
|
|
|
|
|
|
|
sentLast bool // sent an end stream
|
|
|
|
|
|
|
|
mu sync.Mutex // guards finished
|
|
|
|
finished bool // TODO: replace with atomic cmpxchg or sync.Once?
|
|
|
|
|
|
|
|
attempt *csAttempt // the active client stream attempt
|
|
|
|
// TODO(hedging): hedging will have multiple attempts simultaneously.
|
|
|
|
}
|
|
|
|
|
|
|
|
// csAttempt implements a single transport stream attempt within a
|
|
|
|
// clientStream.
|
|
|
|
type csAttempt struct {
|
|
|
|
cs *clientStream
|
2017-11-17 20:24:54 +03:00
|
|
|
t transport.ClientTransport
|
|
|
|
s *transport.Stream
|
2018-05-03 21:37:59 +03:00
|
|
|
p *parser
|
2018-03-12 23:27:54 +03:00
|
|
|
done func(balancer.DoneInfo)
|
2017-11-17 20:24:54 +03:00
|
|
|
|
|
|
|
dc Decompressor
|
|
|
|
decomp encoding.Compressor
|
|
|
|
decompSet bool
|
|
|
|
|
2018-03-12 23:27:54 +03:00
|
|
|
ctx context.Context // the application's context, wrapped by stats/tracing
|
2015-06-25 05:52:33 +03:00
|
|
|
|
2018-03-12 23:27:54 +03:00
|
|
|
mu sync.Mutex // guards trInfo.tr
|
|
|
|
// trInfo.tr is set when created (if EnableTracing is true),
|
|
|
|
// and cleared when the finish method is called.
|
2015-10-06 03:49:53 +03:00
|
|
|
trInfo traceInfo
|
2016-11-05 02:00:06 +03:00
|
|
|
|
2017-01-10 04:11:32 +03:00
|
|
|
statsHandler stats.Handler
|
2018-03-12 19:16:36 +03:00
|
|
|
beginTime time.Time
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) Context() context.Context {
|
2018-03-12 23:27:54 +03:00
|
|
|
// TODO(retry): commit the current attempt (the context has peer-aware data).
|
|
|
|
return cs.attempt.context()
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2016-11-08 05:06:11 +03:00
|
|
|
func (cs *clientStream) Header() (metadata.MD, error) {
|
2018-03-12 23:27:54 +03:00
|
|
|
m, err := cs.attempt.header()
|
2015-02-19 01:00:26 +03:00
|
|
|
if err != nil {
|
2018-03-12 23:27:54 +03:00
|
|
|
// TODO(retry): maybe retry on error or commit attempt on success.
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
err = toRPCErr(err)
|
|
|
|
cs.finish(err)
|
2015-02-19 01:00:26 +03:00
|
|
|
}
|
|
|
|
return m, err
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) Trailer() metadata.MD {
|
2018-03-12 23:27:54 +03:00
|
|
|
// TODO(retry): on error, maybe retry (trailers-only).
|
|
|
|
return cs.attempt.trailer()
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2015-03-26 01:18:07 +03:00
|
|
|
func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
2018-03-12 23:27:54 +03:00
|
|
|
// TODO(retry): buffer message for replaying if not committed.
|
|
|
|
return cs.attempt.sendMsg(m)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
|
|
|
// TODO(retry): maybe retry on error or commit attempt on success.
|
|
|
|
return cs.attempt.recvMsg(m)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) CloseSend() error {
|
|
|
|
cs.attempt.closeSend()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cs *clientStream) finish(err error) {
|
|
|
|
if err == io.EOF {
|
|
|
|
// Ending a stream with EOF indicates a success.
|
|
|
|
err = nil
|
|
|
|
}
|
|
|
|
cs.mu.Lock()
|
|
|
|
if cs.finished {
|
2015-06-25 05:52:33 +03:00
|
|
|
cs.mu.Unlock()
|
2018-03-12 23:27:54 +03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
cs.finished = true
|
|
|
|
cs.mu.Unlock()
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() {
|
|
|
|
if err != nil {
|
|
|
|
cs.cc.incrCallsFailed()
|
|
|
|
} else {
|
|
|
|
cs.cc.incrCallsSucceeded()
|
|
|
|
}
|
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
// TODO(retry): commit current attempt if necessary.
|
|
|
|
cs.attempt.finish(err)
|
|
|
|
for _, o := range cs.opts {
|
|
|
|
o.after(cs.c)
|
2015-06-25 05:52:33 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
cs.cancel()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *csAttempt) context() context.Context {
|
|
|
|
return a.s.Context()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *csAttempt) header() (metadata.MD, error) {
|
|
|
|
return a.s.Header()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *csAttempt) trailer() metadata.MD {
|
|
|
|
return a.s.Trailer()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *csAttempt) sendMsg(m interface{}) (err error) {
|
2016-11-08 21:31:02 +03:00
|
|
|
// TODO Investigate how to signal the stats handling party.
|
|
|
|
// generate error stats if err != nil && err != io.EOF?
|
2018-03-12 23:27:54 +03:00
|
|
|
cs := a.cs
|
2015-02-06 04:14:05 +03:00
|
|
|
defer func() {
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
// For non-client-streaming RPCs, we return nil instead of EOF on success
|
|
|
|
// because the generated code requires it. finish is not called; RecvMsg()
|
|
|
|
// will call it with the stream's status independently.
|
|
|
|
if err == io.EOF && !cs.desc.ClientStreams {
|
|
|
|
err = nil
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
if err != nil && err != io.EOF {
|
2018-03-12 23:27:54 +03:00
|
|
|
// Call finish on the client stream for errors generated by this SendMsg
|
|
|
|
// call, as these indicate problems created by this client. (Transport
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
// errors are converted to an io.EOF error below; the real error will be
|
2018-03-12 23:27:54 +03:00
|
|
|
// returned from RecvMsg eventually in that case, or be retried.)
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
cs.finish(err)
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
}()
|
2018-03-12 23:27:54 +03:00
|
|
|
// TODO: Check cs.sentLast and error if we already ended the stream.
|
|
|
|
if EnableTracing {
|
|
|
|
a.mu.Lock()
|
|
|
|
if a.trInfo.tr != nil {
|
|
|
|
a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
|
|
|
|
}
|
|
|
|
a.mu.Unlock()
|
|
|
|
}
|
2016-11-05 02:00:06 +03:00
|
|
|
var outPayload *stats.OutPayload
|
2018-03-12 23:27:54 +03:00
|
|
|
if a.statsHandler != nil {
|
2016-11-05 02:00:06 +03:00
|
|
|
outPayload = &stats.OutPayload{
|
2016-11-03 01:45:22 +03:00
|
|
|
Client: true,
|
2016-11-02 02:57:54 +03:00
|
|
|
}
|
|
|
|
}
|
2018-05-03 21:37:59 +03:00
|
|
|
hdr, data, err := encode(cs.codec, m, cs.cp, outPayload, cs.comp)
|
2015-02-06 04:14:05 +03:00
|
|
|
if err != nil {
|
2017-06-02 22:32:37 +03:00
|
|
|
return err
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
2017-08-25 22:26:38 +03:00
|
|
|
if len(data) > *cs.c.maxSendMessageSize {
|
2017-12-18 20:23:42 +03:00
|
|
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
|
2017-04-04 01:03:24 +03:00
|
|
|
}
|
2018-02-05 23:54:13 +03:00
|
|
|
if !cs.desc.ClientStreams {
|
|
|
|
cs.sentLast = true
|
|
|
|
}
|
2018-05-03 21:37:59 +03:00
|
|
|
err = a.t.Write(a.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams})
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
if err == nil {
|
|
|
|
if outPayload != nil {
|
|
|
|
outPayload.SentTime = time.Now()
|
2018-03-12 23:27:54 +03:00
|
|
|
a.statsHandler.HandleRPC(a.ctx, outPayload)
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
}
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() {
|
|
|
|
a.t.IncrMsgSent()
|
|
|
|
}
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
return nil
|
2016-11-02 02:57:54 +03:00
|
|
|
}
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
return io.EOF
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2018-03-12 23:27:54 +03:00
|
|
|
func (a *csAttempt) recvMsg(m interface{}) (err error) {
|
|
|
|
cs := a.cs
|
2018-02-05 23:54:13 +03:00
|
|
|
defer func() {
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
if err != nil || !cs.desc.ServerStreams {
|
|
|
|
// err != nil or non-server-streaming indicates end of stream.
|
2018-02-05 23:54:13 +03:00
|
|
|
cs.finish(err)
|
|
|
|
}
|
|
|
|
}()
|
2016-11-05 02:02:19 +03:00
|
|
|
var inPayload *stats.InPayload
|
2018-03-12 23:27:54 +03:00
|
|
|
if a.statsHandler != nil {
|
2016-11-05 02:02:19 +03:00
|
|
|
inPayload = &stats.InPayload{
|
2016-11-03 01:45:22 +03:00
|
|
|
Client: true,
|
2016-11-02 02:57:54 +03:00
|
|
|
}
|
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
if !a.decompSet {
|
2017-11-17 20:24:54 +03:00
|
|
|
// Block until we receive headers containing received message encoding.
|
2018-03-12 23:27:54 +03:00
|
|
|
if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
|
|
|
|
if a.dc == nil || a.dc.Type() != ct {
|
2017-11-17 20:24:54 +03:00
|
|
|
// No configured decompressor, or it does not match the incoming
|
|
|
|
// message encoding; attempt to find a registered compressor that does.
|
2018-03-12 23:27:54 +03:00
|
|
|
a.dc = nil
|
|
|
|
a.decomp = encoding.GetCompressor(ct)
|
2017-11-17 20:24:54 +03:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// No compression is used; disable our decompressor.
|
2018-03-12 23:27:54 +03:00
|
|
|
a.dc = nil
|
2017-11-17 20:24:54 +03:00
|
|
|
}
|
|
|
|
// Only initialize this state once per stream.
|
2018-03-12 23:27:54 +03:00
|
|
|
a.decompSet = true
|
2017-11-17 20:24:54 +03:00
|
|
|
}
|
2018-05-03 21:37:59 +03:00
|
|
|
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp)
|
2018-02-05 23:54:13 +03:00
|
|
|
if err != nil {
|
2015-02-20 03:48:59 +03:00
|
|
|
if err == io.EOF {
|
2018-03-12 23:27:54 +03:00
|
|
|
if statusErr := a.s.Status().Err(); statusErr != nil {
|
2018-02-05 23:54:13 +03:00
|
|
|
return statusErr
|
2018-02-02 21:35:15 +03:00
|
|
|
}
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
return io.EOF // indicates successful end of stream.
|
2015-02-19 09:15:13 +03:00
|
|
|
}
|
2015-02-20 03:48:59 +03:00
|
|
|
return toRPCErr(err)
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
if EnableTracing {
|
|
|
|
a.mu.Lock()
|
|
|
|
if a.trInfo.tr != nil {
|
|
|
|
a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
|
2018-02-05 23:54:13 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
a.mu.Unlock()
|
2018-02-05 23:54:13 +03:00
|
|
|
}
|
|
|
|
if inPayload != nil {
|
2018-03-12 23:27:54 +03:00
|
|
|
a.statsHandler.HandleRPC(a.ctx, inPayload)
|
2018-02-05 23:54:13 +03:00
|
|
|
}
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() {
|
|
|
|
a.t.IncrMsgRecv()
|
|
|
|
}
|
2018-02-05 23:54:13 +03:00
|
|
|
if cs.desc.ServerStreams {
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
// Subsequent messages should be received by subsequent RecvMsg calls.
|
2018-02-05 23:54:13 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Special handling for non-server-stream rpcs.
|
|
|
|
// This recv expects EOF or errors, so we don't collect inPayload.
|
2018-05-03 21:37:59 +03:00
|
|
|
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp)
|
2018-02-05 23:54:13 +03:00
|
|
|
if err == nil {
|
|
|
|
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
|
2015-02-19 01:00:26 +03:00
|
|
|
}
|
2015-02-06 04:14:05 +03:00
|
|
|
if err == io.EOF {
|
2018-03-12 23:27:54 +03:00
|
|
|
return a.s.Status().Err() // non-server streaming Recv returns nil on success
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
return toRPCErr(err)
|
|
|
|
}
|
|
|
|
|
2018-03-12 23:27:54 +03:00
|
|
|
func (a *csAttempt) closeSend() {
|
|
|
|
cs := a.cs
|
2018-02-05 23:54:13 +03:00
|
|
|
if cs.sentLast {
|
2018-03-12 23:27:54 +03:00
|
|
|
return
|
2018-02-05 23:54:13 +03:00
|
|
|
}
|
|
|
|
cs.sentLast = true
|
2018-05-03 21:37:59 +03:00
|
|
|
cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true})
|
2018-03-12 23:27:54 +03:00
|
|
|
// We ignore errors from Write. Any error it would return would also be
|
|
|
|
// returned by a subsequent RecvMsg call, and the user is supposed to always
|
|
|
|
// finish the stream by calling RecvMsg until it returns err != nil.
|
2015-10-22 23:07:13 +03:00
|
|
|
}
|
|
|
|
|
2018-03-12 23:27:54 +03:00
|
|
|
func (a *csAttempt) finish(err error) {
|
|
|
|
a.mu.Lock()
|
|
|
|
a.t.CloseStream(a.s, err)
|
|
|
|
|
|
|
|
if a.done != nil {
|
|
|
|
a.done(balancer.DoneInfo{
|
2017-11-27 22:16:26 +03:00
|
|
|
Err: err,
|
|
|
|
BytesSent: true,
|
2018-03-12 23:27:54 +03:00
|
|
|
BytesReceived: a.s.BytesReceived(),
|
2017-04-27 20:43:38 +03:00
|
|
|
})
|
2016-05-07 01:47:09 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
if a.statsHandler != nil {
|
2017-04-12 21:55:54 +03:00
|
|
|
end := &stats.End{
|
2018-03-12 19:16:36 +03:00
|
|
|
Client: true,
|
2018-03-12 23:27:54 +03:00
|
|
|
BeginTime: a.beginTime,
|
2018-03-12 19:16:36 +03:00
|
|
|
EndTime: time.Now(),
|
|
|
|
Error: err,
|
2017-04-12 21:55:54 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
a.statsHandler.HandleRPC(a.ctx, end)
|
2016-06-28 00:36:59 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
if a.trInfo.tr != nil {
|
streams: Stop cleaning up after orphaned streams (#1854)
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:
1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
expires. (Note that it if the context is no longer needed before the deadline
expires, it is still recommended to call cancel to prevent bloat.) It is always
recommended to cancel contexts when they are no longer needed, and to
never use the background context directly, so all users should always be
doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
returned.
4. The user receives any error from Header or SendMsg (or Send in generated
code) besides io.EOF. If none of the above happen, this will leak a goroutine
and a context, and grpc will not call the optionally-configured stats handler
with a stats.End message.
Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
2018-02-08 21:51:16 +03:00
|
|
|
if err == nil {
|
2018-03-12 23:27:54 +03:00
|
|
|
a.trInfo.tr.LazyPrintf("RPC: [OK]")
|
2015-06-25 05:52:33 +03:00
|
|
|
} else {
|
2018-03-12 23:27:54 +03:00
|
|
|
a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
|
|
|
|
a.trInfo.tr.SetError()
|
2015-06-25 05:52:33 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
a.trInfo.tr.Finish()
|
|
|
|
a.trInfo.tr = nil
|
2015-06-25 05:52:33 +03:00
|
|
|
}
|
2018-03-12 23:27:54 +03:00
|
|
|
a.mu.Unlock()
|
2015-06-25 05:52:33 +03:00
|
|
|
}
|
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
// ServerStream defines the interface a server stream has to satisfy.
|
|
|
|
type ServerStream interface {
|
2016-08-30 01:16:08 +03:00
|
|
|
// SetHeader sets the header metadata. It may be called multiple times.
|
|
|
|
// When call multiple times, all the provided metadata will be merged.
|
|
|
|
// All the metadata will be sent out when one of the following happens:
|
|
|
|
// - ServerStream.SendHeader() is called;
|
|
|
|
// - The first response is sent out;
|
|
|
|
// - An RPC status is sent out (error or success).
|
|
|
|
SetHeader(metadata.MD) error
|
|
|
|
// SendHeader sends the header metadata.
|
|
|
|
// The provided md and headers set by SetHeader() will be sent.
|
|
|
|
// It fails if called multiple times.
|
2015-02-06 04:14:05 +03:00
|
|
|
SendHeader(metadata.MD) error
|
2016-08-26 01:17:50 +03:00
|
|
|
// SetTrailer sets the trailer metadata which will be sent with the RPC status.
|
|
|
|
// When called more than once, all the provided metadata will be merged.
|
2015-02-06 04:14:05 +03:00
|
|
|
SetTrailer(metadata.MD)
|
|
|
|
Stream
|
|
|
|
}
|
|
|
|
|
|
|
|
// serverStream implements a server side Stream.
|
|
|
|
type serverStream struct {
|
2018-03-21 03:02:32 +03:00
|
|
|
ctx context.Context
|
2017-11-17 20:24:54 +03:00
|
|
|
t transport.ServerTransport
|
|
|
|
s *transport.Stream
|
2018-05-03 21:37:59 +03:00
|
|
|
p *parser
|
2018-01-23 22:39:40 +03:00
|
|
|
codec baseCodec
|
2017-11-17 20:24:54 +03:00
|
|
|
|
|
|
|
cp Compressor
|
|
|
|
dc Decompressor
|
|
|
|
comp encoding.Compressor
|
|
|
|
decomp encoding.Compressor
|
|
|
|
|
2017-04-04 01:03:24 +03:00
|
|
|
maxReceiveMessageSize int
|
|
|
|
maxSendMessageSize int
|
2017-04-06 01:08:25 +03:00
|
|
|
trInfo *traceInfo
|
2015-07-28 20:13:40 +03:00
|
|
|
|
2017-01-10 04:11:32 +03:00
|
|
|
statsHandler stats.Handler
|
|
|
|
|
2015-10-06 03:49:53 +03:00
|
|
|
mu sync.Mutex // protects trInfo.tr after the service handler runs.
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ss *serverStream) Context() context.Context {
|
2018-03-21 03:02:32 +03:00
|
|
|
return ss.ctx
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2016-08-30 01:16:08 +03:00
|
|
|
func (ss *serverStream) SetHeader(md metadata.MD) error {
|
|
|
|
if md.Len() == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return ss.s.SetHeader(md)
|
|
|
|
}
|
|
|
|
|
2015-02-06 04:14:05 +03:00
|
|
|
func (ss *serverStream) SendHeader(md metadata.MD) error {
|
|
|
|
return ss.t.WriteHeader(ss.s, md)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ss *serverStream) SetTrailer(md metadata.MD) {
|
|
|
|
if md.Len() == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
ss.s.SetTrailer(md)
|
|
|
|
}
|
|
|
|
|
2015-07-29 01:27:46 +03:00
|
|
|
func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
|
|
|
defer func() {
|
2015-10-06 03:49:53 +03:00
|
|
|
if ss.trInfo != nil {
|
2015-07-29 01:27:46 +03:00
|
|
|
ss.mu.Lock()
|
2015-10-06 03:49:53 +03:00
|
|
|
if ss.trInfo.tr != nil {
|
|
|
|
if err == nil {
|
|
|
|
ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
|
|
|
|
} else {
|
|
|
|
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
|
|
ss.trInfo.tr.SetError()
|
|
|
|
}
|
2015-07-29 01:27:46 +03:00
|
|
|
}
|
|
|
|
ss.mu.Unlock()
|
2015-07-28 00:33:17 +03:00
|
|
|
}
|
2017-08-14 22:24:23 +03:00
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
st, _ := status.FromError(toRPCErr(err))
|
|
|
|
ss.t.WriteStatus(ss.s, st)
|
|
|
|
}
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() && err == nil {
|
|
|
|
ss.t.IncrMsgSent()
|
|
|
|
}
|
2015-07-29 01:27:46 +03:00
|
|
|
}()
|
2016-11-05 02:00:06 +03:00
|
|
|
var outPayload *stats.OutPayload
|
2017-01-10 04:11:32 +03:00
|
|
|
if ss.statsHandler != nil {
|
2016-11-05 02:00:06 +03:00
|
|
|
outPayload = &stats.OutPayload{}
|
2016-10-23 01:06:41 +03:00
|
|
|
}
|
2018-05-03 21:37:59 +03:00
|
|
|
hdr, data, err := encode(ss.codec, m, ss.cp, outPayload, ss.comp)
|
2015-02-06 04:14:05 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-08-25 22:26:38 +03:00
|
|
|
if len(data) > ss.maxSendMessageSize {
|
2017-12-18 20:23:42 +03:00
|
|
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
|
2017-04-04 01:03:24 +03:00
|
|
|
}
|
2018-05-03 21:37:59 +03:00
|
|
|
if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil {
|
2016-09-01 02:56:46 +03:00
|
|
|
return toRPCErr(err)
|
|
|
|
}
|
2016-11-05 02:00:06 +03:00
|
|
|
if outPayload != nil {
|
2016-11-08 00:17:59 +03:00
|
|
|
outPayload.SentTime = time.Now()
|
2017-01-10 04:11:32 +03:00
|
|
|
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
|
2016-10-23 01:06:41 +03:00
|
|
|
}
|
2016-09-01 02:56:46 +03:00
|
|
|
return nil
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
|
|
|
|
2015-07-29 01:27:46 +03:00
|
|
|
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
2015-07-28 00:33:17 +03:00
|
|
|
defer func() {
|
2015-10-06 03:49:53 +03:00
|
|
|
if ss.trInfo != nil {
|
2015-07-28 00:33:17 +03:00
|
|
|
ss.mu.Lock()
|
2015-10-06 03:49:53 +03:00
|
|
|
if ss.trInfo.tr != nil {
|
|
|
|
if err == nil {
|
|
|
|
ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
|
|
|
|
} else if err != io.EOF {
|
|
|
|
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
|
|
ss.trInfo.tr.SetError()
|
|
|
|
}
|
2015-07-28 00:33:17 +03:00
|
|
|
}
|
|
|
|
ss.mu.Unlock()
|
|
|
|
}
|
2017-08-14 22:24:23 +03:00
|
|
|
if err != nil && err != io.EOF {
|
|
|
|
st, _ := status.FromError(toRPCErr(err))
|
|
|
|
ss.t.WriteStatus(ss.s, st)
|
|
|
|
}
|
2018-04-23 21:22:25 +03:00
|
|
|
if channelz.IsOn() && err == nil {
|
|
|
|
ss.t.IncrMsgRecv()
|
|
|
|
}
|
2015-07-28 00:33:17 +03:00
|
|
|
}()
|
2016-11-05 02:02:19 +03:00
|
|
|
var inPayload *stats.InPayload
|
2017-01-10 04:11:32 +03:00
|
|
|
if ss.statsHandler != nil {
|
2016-11-05 02:02:19 +03:00
|
|
|
inPayload = &stats.InPayload{}
|
2016-10-23 01:06:41 +03:00
|
|
|
}
|
2018-05-03 21:37:59 +03:00
|
|
|
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil {
|
2016-09-01 02:56:46 +03:00
|
|
|
if err == io.EOF {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err == io.ErrUnexpectedEOF {
|
2017-12-18 20:23:42 +03:00
|
|
|
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
2016-09-01 02:56:46 +03:00
|
|
|
}
|
|
|
|
return toRPCErr(err)
|
|
|
|
}
|
2016-11-05 02:02:19 +03:00
|
|
|
if inPayload != nil {
|
2017-01-10 04:11:32 +03:00
|
|
|
ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
|
2016-10-23 01:06:41 +03:00
|
|
|
}
|
2016-09-01 02:56:46 +03:00
|
|
|
return nil
|
2015-02-06 04:14:05 +03:00
|
|
|
}
|
2017-10-27 02:03:44 +03:00
|
|
|
|
|
|
|
// MethodFromServerStream returns the method string for the input stream.
|
|
|
|
// The returned string is in the format of "/service/method".
|
|
|
|
func MethodFromServerStream(stream ServerStream) (string, bool) {
|
2018-04-02 23:08:04 +03:00
|
|
|
return Method(stream.Context())
|
2017-10-27 02:03:44 +03:00
|
|
|
}
|