grpc-go/call.go

169 строки
4.9 KiB
Go
Исходник Обычный вид История

2015-02-06 04:14:05 +03:00
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
2015-02-06 04:14:05 +03:00
import (
2015-03-13 10:19:56 +03:00
"io"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/transport"
2015-02-06 04:14:05 +03:00
)
// recvResponse receives and parses an RPC response.
2015-02-06 04:14:05 +03:00
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
func recvResponse(codec Codec, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
// Try to acquire header metadata from the server if there is any.
var err error
c.headerMD, err = stream.Header()
if err != nil {
return err
}
2015-02-06 04:14:05 +03:00
p := &parser{s: stream}
for {
if err = recv(p, codec, reply); err != nil {
2015-02-06 04:14:05 +03:00
if err == io.EOF {
break
2015-02-06 04:14:05 +03:00
}
return err
}
}
c.trailerMD = stream.Trailer()
return nil
2015-02-06 04:14:05 +03:00
}
// sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, codec Codec, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
2015-02-06 04:14:05 +03:00
stream, err := t.NewStream(ctx, callHdr)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
}
}()
// TODO(zhaoq): Support compression.
outBuf, err := encode(codec, args, compressionNone)
2015-02-06 04:14:05 +03:00
if err != nil {
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
2015-02-06 04:14:05 +03:00
}
err = t.Write(stream, outBuf, opts)
if err != nil {
return nil, err
}
// Sent successfully.
return stream, nil
}
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
}
// Invoke is called by the generated code. It sends the RPC request on the
// wire and returns after response is received.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
2015-02-06 04:14:05 +03:00
var c callInfo
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
}
}
defer func() {
for _, o := range opts {
o.after(&c)
}
}()
callHdr := &transport.CallHdr{
2015-04-17 23:50:18 +03:00
Host: cc.authority,
2015-02-06 04:14:05 +03:00
Method: method,
}
topts := &transport.Options{
Last: true,
Delay: false,
}
2015-03-13 10:16:18 +03:00
var (
ts int // track the transport sequence number
lastErr error // record the error that happened
)
2015-02-06 04:14:05 +03:00
for {
var (
err error
t transport.ClientTransport
stream *transport.Stream
)
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
2015-02-06 04:14:05 +03:00
}
t, ts, err = cc.wait(ctx, ts)
if err != nil {
if lastErr != nil {
// This was a retry; return the error from the last attempt.
return toRPCErr(lastErr)
2015-02-06 04:14:05 +03:00
}
2015-03-23 21:47:27 +03:00
return toRPCErr(err)
2015-02-06 04:14:05 +03:00
}
stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
2015-02-06 04:14:05 +03:00
if err != nil {
if _, ok := err.(transport.ConnectionError); ok {
lastErr = err
continue
}
if lastErr != nil {
return toRPCErr(lastErr)
}
return toRPCErr(err)
}
// Receive the response
lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
2015-02-06 04:14:05 +03:00
if _, ok := lastErr.(transport.ConnectionError); ok {
continue
}
t.CloseStream(stream, lastErr)
if lastErr != nil {
return toRPCErr(lastErr)
}
return Errorf(stream.StatusCode(), stream.StatusDesc())
}
}