This commit is contained in:
Daniel Wang 2015-03-01 17:12:39 -08:00
Родитель 46f39d3ebd 575a9b2af8
Коммит 84451c554b
22 изменённых файлов: 784 добавлений и 104 удалений

8
.travis.yml Normal file
Просмотреть файл

@ -0,0 +1,8 @@
language: go
install:
- go get -v -t -d google.golang.org/grpc/...
script:
- go test -v google.golang.org/grpc/...
- go test -v -race google.golang.org/grpc/...

27
CONTRIBUTING.md Normal file
Просмотреть файл

@ -0,0 +1,27 @@
# How to contribute
We definitely welcome patches and contribution to grpc! Here is some guideline
and information about how to do so.
## Getting started
### Legal requirements
In order to protect both you and ourselves, you will need to sign the
[Contributor License Agreement](https://cla.developers.google.com/clas).
### Filing Issues
When filing an issue, make sure to answer these five questions:
1. What version of Go are you using (`go version`)?
2. What operating system and processor architecture are you using?
3. What did you do?
4. What did you expect to see?
5. What did you see instead?
### Contributing code
Please read the Contribution Guidelines before sending patches.
We will not accept GitHub pull requests once Gerrit is setup (we will use Gerrit instead for code review).
Unless otherwise noted, the Go source files are distributed under the BSD-style license found in the LICENSE file.

22
PATENTS Normal file
Просмотреть файл

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the GRPC project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of GRPC, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of GRPC. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of GRPC or any code incorporated within this
implementation of GRPC constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of GRPC
shall terminate as of the date such litigation is filed.

Просмотреть файл

@ -1,5 +1,7 @@
#gRPC-Go
[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
The Go implementation of [gRPC](https://github.com/grpc/grpc)
Installation

Просмотреть файл

@ -35,6 +35,7 @@ package grpc
import (
"errors"
"log"
"sync"
"time"
@ -46,9 +47,9 @@ import (
var (
// ErrUnspecTarget indicates that the target address is unspecified.
ErrUnspecTarget = errors.New("grpc: target is unspecified")
// ErrClosingChan indicates that the operation is illegal because the session
// is closing.
ErrClosingChan = errors.New("grpc: the channel is closing")
// ErrClientConnClosing indicates that the operation is illegal because
// the session is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
)
type dialOptions struct {
@ -60,9 +61,9 @@ type dialOptions struct {
// credentials.
type DialOption func(*dialOptions)
// WithClientTLS returns a DialOption which configures a TLS credentials
// for connection.
func WithClientTLS(creds credentials.TransportAuthenticator) DialOption {
// WithTransportCredentials returns a DialOption which configures a
// connection level security credentials (e.g., TLS/SSL).
func WithTransportCredentials(creds credentials.TransportAuthenticator) DialOption {
return func(o *dialOptions) {
o.authOptions = append(o.authOptions, creds)
}
@ -89,8 +90,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
for _, opt := range opts {
opt(&cc.dopts)
}
err := cc.resetTransport(false)
if err != nil {
if err := cc.resetTransport(false); err != nil {
return nil, err
}
cc.shutdownChan = make(chan struct{})
@ -127,7 +127,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
cc.transportSeq = 0
if cc.closing {
cc.mu.Unlock()
return ErrClosingChan
return ErrClientConnClosing
}
cc.mu.Unlock()
if closeTransport {
@ -139,6 +139,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
closeTransport = false
time.Sleep(backoff(retries))
retries++
log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
continue
}
cc.mu.Lock()
@ -163,8 +164,7 @@ func (cc *ClientConn) transportMonitor() {
case <-cc.shutdownChan:
return
case <-cc.transport.Error():
err := cc.resetTransport(true)
if err != nil {
if err := cc.resetTransport(true); err != nil {
// The channel is closing.
return
}
@ -182,7 +182,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
switch {
case cc.closing:
cc.mu.Unlock()
return nil, 0, ErrClosingChan
return nil, 0, ErrClientConnClosing
case ts < cc.transportSeq:
// Worked on a dying transport. Try the new one immediately.
defer cc.mu.Unlock()

Просмотреть файл

@ -5,7 +5,7 @@
#
# It assumes the installation of i) Google proto buffer compiler at
# https://github.com/google/protobuf (after v2.6.1) and ii) the Go codegen
# plugin at https://github.com/golang/protobuf (after 2/19/2015). If you have
# plugin at https://github.com/golang/protobuf (after 2015-02-20). If you have
# not, please install them first.
#
# We recommend running this script at $GOPATH or $GOPATH/src.

Просмотреть файл

@ -73,7 +73,7 @@ type Credentials interface {
type TransportAuthenticator interface {
// Dial connects to the given network address and does the authentication
// handshake specified by the corresponding authentication protocol.
Dial(add string) (net.Conn, error)
Dial(addr string) (net.Conn, error)
// NewListener creates a listener which accepts connections with requested
// authentication handshake.
NewListener(lis net.Listener) net.Listener

Просмотреть файл

@ -2,6 +2,8 @@
The route guide server and client demonstrate how to use grpc go libraries to
perform unary, client streaming, server streaming and full duplex RPCs.
Please refer to [Getting Started Guide for Go] (https://github.com/grpc/grpc-common/blob/master/go/gotutorial.md) for more information.
See the definition of the route guide service in proto/route_guide.proto.
# Run the sample code
@ -20,7 +22,7 @@ $ go run client/client.go
# Optional command line flags
The server and client both take optional command line flags. For example, the
client and server run without TLS by default. To enable TSL:
client and server run without TLS by default. To enable TLS:
```sh
$ go run server/server.go -tls=true

Просмотреть файл

@ -175,7 +175,7 @@ func main() {
} else {
creds = credentials.NewClientTLSFromCert(nil, sn)
}
opts = append(opts, grpc.WithClientTLS(creds))
opts = append(opts, grpc.WithTransportCredentials(creds))
}
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {

Просмотреть файл

@ -0,0 +1,425 @@
// Code generated by protoc-gen-go.
// source: route_guide.proto
// DO NOT EDIT!
/*
Package proto is a generated protocol buffer package.
It is generated from these files:
route_guide.proto
It has these top-level messages:
Point
Rectangle
Feature
RouteNote
RouteSummary
*/
package proto
import proto1 "github.com/golang/protobuf/proto"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto1.Marshal
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
type Point struct {
Latitude int32 `protobuf:"varint,1,opt,name=latitude" json:"latitude,omitempty"`
Longitude int32 `protobuf:"varint,2,opt,name=longitude" json:"longitude,omitempty"`
}
func (m *Point) Reset() { *m = Point{} }
func (m *Point) String() string { return proto1.CompactTextString(m) }
func (*Point) ProtoMessage() {}
// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
type Rectangle struct {
// One corner of the rectangle.
Lo *Point `protobuf:"bytes,1,opt,name=lo" json:"lo,omitempty"`
// The other corner of the rectangle.
Hi *Point `protobuf:"bytes,2,opt,name=hi" json:"hi,omitempty"`
}
func (m *Rectangle) Reset() { *m = Rectangle{} }
func (m *Rectangle) String() string { return proto1.CompactTextString(m) }
func (*Rectangle) ProtoMessage() {}
func (m *Rectangle) GetLo() *Point {
if m != nil {
return m.Lo
}
return nil
}
func (m *Rectangle) GetHi() *Point {
if m != nil {
return m.Hi
}
return nil
}
// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
type Feature struct {
// The name of the feature.
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
// The point where the feature is detected.
Location *Point `protobuf:"bytes,2,opt,name=location" json:"location,omitempty"`
}
func (m *Feature) Reset() { *m = Feature{} }
func (m *Feature) String() string { return proto1.CompactTextString(m) }
func (*Feature) ProtoMessage() {}
func (m *Feature) GetLocation() *Point {
if m != nil {
return m.Location
}
return nil
}
// A RouteNote is a message sent while at a given point.
type RouteNote struct {
// The location from which the message is sent.
Location *Point `protobuf:"bytes,1,opt,name=location" json:"location,omitempty"`
// The message to be sent.
Message string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
}
func (m *RouteNote) Reset() { *m = RouteNote{} }
func (m *RouteNote) String() string { return proto1.CompactTextString(m) }
func (*RouteNote) ProtoMessage() {}
func (m *RouteNote) GetLocation() *Point {
if m != nil {
return m.Location
}
return nil
}
// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
type RouteSummary struct {
// The number of points received.
PointCount int32 `protobuf:"varint,1,opt,name=point_count" json:"point_count,omitempty"`
// The number of known features passed while traversing the route.
FeatureCount int32 `protobuf:"varint,2,opt,name=feature_count" json:"feature_count,omitempty"`
// The distance covered in metres.
Distance int32 `protobuf:"varint,3,opt,name=distance" json:"distance,omitempty"`
// The duration of the traversal in seconds.
ElapsedTime int32 `protobuf:"varint,4,opt,name=elapsed_time" json:"elapsed_time,omitempty"`
}
func (m *RouteSummary) Reset() { *m = RouteSummary{} }
func (m *RouteSummary) String() string { return proto1.CompactTextString(m) }
func (*RouteSummary) ProtoMessage() {}
func init() {
}
// Client API for RouteGuide service
type RouteGuideClient interface {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// If no feature is found for the given point, a feature with an empty name
// should be returned.
GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error)
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error)
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)
}
type routeGuideClient struct {
cc *grpc.ClientConn
}
func NewRouteGuideClient(cc *grpc.ClientConn) RouteGuideClient {
return &routeGuideClient{cc}
}
func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
out := new(Feature)
err := grpc.Invoke(ctx, "/proto.RouteGuide/GetFeature", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routeGuideClient) ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[0], c.cc, "/proto.RouteGuide/ListFeatures", opts...)
if err != nil {
return nil, err
}
x := &routeGuideListFeaturesClient{stream}
if err := x.ClientStream.SendProto(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type RouteGuide_ListFeaturesClient interface {
Recv() (*Feature, error)
grpc.ClientStream
}
type routeGuideListFeaturesClient struct {
grpc.ClientStream
}
func (x *routeGuideListFeaturesClient) Recv() (*Feature, error) {
m := new(Feature)
if err := x.ClientStream.RecvProto(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routeGuideClient) RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[1], c.cc, "/proto.RouteGuide/RecordRoute", opts...)
if err != nil {
return nil, err
}
x := &routeGuideRecordRouteClient{stream}
return x, nil
}
type RouteGuide_RecordRouteClient interface {
Send(*Point) error
CloseAndRecv() (*RouteSummary, error)
grpc.ClientStream
}
type routeGuideRecordRouteClient struct {
grpc.ClientStream
}
func (x *routeGuideRecordRouteClient) Send(m *Point) error {
return x.ClientStream.SendProto(m)
}
func (x *routeGuideRecordRouteClient) CloseAndRecv() (*RouteSummary, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(RouteSummary)
if err := x.ClientStream.RecvProto(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[2], c.cc, "/proto.RouteGuide/RouteChat", opts...)
if err != nil {
return nil, err
}
x := &routeGuideRouteChatClient{stream}
return x, nil
}
type RouteGuide_RouteChatClient interface {
Send(*RouteNote) error
Recv() (*RouteNote, error)
grpc.ClientStream
}
type routeGuideRouteChatClient struct {
grpc.ClientStream
}
func (x *routeGuideRouteChatClient) Send(m *RouteNote) error {
return x.ClientStream.SendProto(m)
}
func (x *routeGuideRouteChatClient) Recv() (*RouteNote, error) {
m := new(RouteNote)
if err := x.ClientStream.RecvProto(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for RouteGuide service
type RouteGuideServer interface {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// If no feature is found for the given point, a feature with an empty name
// should be returned.
GetFeature(context.Context, *Point) (*Feature, error)
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
RecordRoute(RouteGuide_RecordRouteServer) error
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
RouteChat(RouteGuide_RouteChatServer) error
}
func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) {
s.RegisterService(&_RouteGuide_serviceDesc, srv)
}
func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, buf []byte) (proto1.Message, error) {
in := new(Point)
if err := proto1.Unmarshal(buf, in); err != nil {
return nil, err
}
out, err := srv.(RouteGuideServer).GetFeature(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
func _RouteGuide_ListFeatures_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Rectangle)
if err := stream.RecvProto(m); err != nil {
return err
}
return srv.(RouteGuideServer).ListFeatures(m, &routeGuideListFeaturesServer{stream})
}
type RouteGuide_ListFeaturesServer interface {
Send(*Feature) error
grpc.ServerStream
}
type routeGuideListFeaturesServer struct {
grpc.ServerStream
}
func (x *routeGuideListFeaturesServer) Send(m *Feature) error {
return x.ServerStream.SendProto(m)
}
func _RouteGuide_RecordRoute_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RouteGuideServer).RecordRoute(&routeGuideRecordRouteServer{stream})
}
type RouteGuide_RecordRouteServer interface {
SendAndClose(*RouteSummary) error
Recv() (*Point, error)
grpc.ServerStream
}
type routeGuideRecordRouteServer struct {
grpc.ServerStream
}
func (x *routeGuideRecordRouteServer) SendAndClose(m *RouteSummary) error {
return x.ServerStream.SendProto(m)
}
func (x *routeGuideRecordRouteServer) Recv() (*Point, error) {
m := new(Point)
if err := x.ServerStream.RecvProto(m); err != nil {
return nil, err
}
return m, nil
}
func _RouteGuide_RouteChat_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RouteGuideServer).RouteChat(&routeGuideRouteChatServer{stream})
}
type RouteGuide_RouteChatServer interface {
Send(*RouteNote) error
Recv() (*RouteNote, error)
grpc.ServerStream
}
type routeGuideRouteChatServer struct {
grpc.ServerStream
}
func (x *routeGuideRouteChatServer) Send(m *RouteNote) error {
return x.ServerStream.SendProto(m)
}
func (x *routeGuideRouteChatServer) Recv() (*RouteNote, error) {
m := new(RouteNote)
if err := x.ServerStream.RecvProto(m); err != nil {
return nil, err
}
return m, nil
}
var _RouteGuide_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.RouteGuide",
HandlerType: (*RouteGuideServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetFeature",
Handler: _RouteGuide_GetFeature_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "ListFeatures",
Handler: _RouteGuide_ListFeatures_Handler,
ServerStreams: true,
},
{
StreamName: "RecordRoute",
Handler: _RouteGuide_RecordRoute_Handler,
ClientStreams: true,
},
{
StreamName: "RouteChat",
Handler: _RouteGuide_RouteChat_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
}

Просмотреть файл

@ -159,7 +159,7 @@ func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error
func (s *routeGuideServer) loadFeatures(filePath string) {
file, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatal("Failed to load default features: %v", err)
log.Fatalf("Failed to load default features: %v", err)
}
if err := json.Unmarshal(file, &s.savedFeatures); err != nil {
log.Fatal("Failed to load default features: %v", err)

38
grpc-auth-support.md Normal file
Просмотреть файл

@ -0,0 +1,38 @@
# Authentication
As outlined <a href="https://github.com/grpc/grpc-common/blob/master/grpc-auth-support.md">here</a> gRPC supports a number of different mechanisms for asserting identity between an client and server. We'll present some code-samples here demonstrating how to provide TLS support encryption and identity assertions as well as passing OAuth2 tokens to services that support it.
# Enabling TLS on a gRPC client
```Go
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, ""))
```
# Enabling TLS on a gRPC server
```Go
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
if err != nil {
log.Fatalf("Failed to generate credentials %v", err)
}
server.Serve(creds.NewListener(lis))
```
# Authenticating with Google
## Google Compute Engine (GCE)
```Go
conn, err := grpc.Dial(serverAddr, grpc.WithClientTLS(credentials.NewClientTLSFromCert(nil, ""), grpc.WithPerRPCCredentials(credentials.NewComputeEngine())))
```
## JWT
```Go
jwtCreds, err := credentials.NewServiceAccountFromFile(*serviceAccountKeyFile, *oauthScope)
if err != nil {
log.Fatalf("Failed to create JWT credentials: %v", err)
}
conn, err := grpc.Dial(serverAddr, grpc.WithClientTLS(credentials.NewClientTLSFromCert(nil, ""), grpc.WithPerRPCCredentials(jwtCreds)))
```

Просмотреть файл

@ -36,9 +36,11 @@ package main
import (
"flag"
"io"
"io/ioutil"
"log"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
@ -48,18 +50,23 @@ import (
)
var (
useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true, else plain TCP")
caFile = flag.String("tls_ca_file", "testdata/ca.pem", "The file containning the CA root cert file")
serverHost = flag.String("server_host", "127.0.0.1", "The server host name")
serverPort = flag.Int("server_port", 10000, "The server port number")
tlsServerName = flag.String("tls_server_name", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake")
testCase = flag.String("test_case", "large_unary",
useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true, else plain TCP")
caFile = flag.String("tls_ca_file", "testdata/ca.pem", "The file containning the CA root cert file")
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
serverHost = flag.String("server_host", "127.0.0.1", "The server host name")
serverPort = flag.Int("server_port", 10000, "The server port number")
tlsServerName = flag.String("server_host_override", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
testCase = flag.String("test_case", "large_unary",
`Configure different test cases. Valid options are:
empty_unary : empty (zero bytes) request and response;
large_unary : single request and (large) response;
client_streaming : request streaming with single response;
server_streaming : single request with response streaming;
ping_pong : full-duplex streaming`)
ping_pong : full-duplex streaming;
compute_engine_creds: large_unary with compute engine auth;
service_account_creds: large_unary with service account auth.`)
)
var (
@ -95,6 +102,7 @@ func doEmptyUnaryCall(tc testpb.TestServiceClient) {
if !proto.Equal(&testpb.Empty{}, reply) {
log.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
}
log.Println("EmptyUnaryCall done")
}
func doLargeUnaryCall(tc testpb.TestServiceClient) {
@ -113,6 +121,7 @@ func doLargeUnaryCall(tc testpb.TestServiceClient) {
if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
log.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
}
log.Println("LargeUnaryCall done")
}
func doClientStreaming(tc testpb.TestServiceClient) {
@ -140,6 +149,7 @@ func doClientStreaming(tc testpb.TestServiceClient) {
if reply.GetAggregatedPayloadSize() != int32(sum) {
log.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
}
log.Println("ClientStreaming done")
}
func doServerStreaming(tc testpb.TestServiceClient) {
@ -183,6 +193,7 @@ func doServerStreaming(tc testpb.TestServiceClient) {
if respCnt != len(respSizes) {
log.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
}
log.Println("ServerStreaming done")
}
func doPingPong(tc testpb.TestServiceClient) {
@ -226,6 +237,64 @@ func doPingPong(tc testpb.TestServiceClient) {
if _, err := stream.Recv(); err != io.EOF {
log.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
}
log.Println("Pingpong done")
}
func doComputeEngineCreds(tc testpb.TestServiceClient) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
Payload: pl,
FillUsername: proto.Bool(true),
FillOauthScope: proto.Bool(true),
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
log.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
user := reply.GetUsername()
scope := reply.GetOauthScope()
if user != *defaultServiceAccount {
log.Fatalf("Got user name %q, want %q.", user, *defaultServiceAccount)
}
if !strings.Contains(*oauthScope, scope) {
log.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, *oauthScope)
}
log.Println("ComputeEngineCreds done")
}
func getServiceAccountJsonKey() []byte {
jsonKey, err := ioutil.ReadFile(*serviceAccountKeyFile)
if err != nil {
log.Fatalf("Failed to read the service account key file: %v", err)
}
return jsonKey
}
func doServiceAccountCreds(tc testpb.TestServiceClient) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)),
Payload: pl,
FillUsername: proto.Bool(true),
FillOauthScope: proto.Bool(true),
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
log.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
jsonKey := getServiceAccountJsonKey()
user := reply.GetUsername()
scope := reply.GetOauthScope()
if !strings.Contains(string(jsonKey), user) {
log.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
}
if !strings.Contains(*oauthScope, scope) {
log.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, *oauthScope)
}
log.Println("ServiceAccountCreds done")
}
func main() {
@ -247,11 +316,20 @@ func main() {
} else {
creds = credentials.NewClientTLSFromCert(nil, sn)
}
opts = append(opts, grpc.WithClientTLS(creds))
opts = append(opts, grpc.WithTransportCredentials(creds))
if *testCase == "compute_engine_creds" {
opts = append(opts, grpc.WithPerRPCCredentials(credentials.NewComputeEngine()))
} else if *testCase == "service_account_creds" {
jwtCreds, err := credentials.NewServiceAccountFromFile(*serviceAccountKeyFile, *oauthScope)
if err != nil {
log.Fatalf("Failed to create JWT credentials: %v", err)
}
opts = append(opts, grpc.WithPerRPCCredentials(jwtCreds))
}
}
conn, err := grpc.Dial(serverAddr, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
log.Fatalf("Fail to dial: %v", err)
}
defer conn.Close()
tc := testpb.NewTestServiceClient(conn)
@ -266,6 +344,16 @@ func main() {
doServerStreaming(tc)
case "ping_pong":
doPingPong(tc)
case "compute_engine_creds":
if !*useTLS {
log.Fatalf("TLS is not enabled. TLS is required to execute compute_engine_creds test case.")
}
doComputeEngineCreds(tc)
case "service_account_creds":
if !*useTLS {
log.Fatalf("TLS is not enabled. TLS is required to execute service_account_creds test case.")
}
doServiceAccountCreds(tc)
default:
log.Fatal("Unsupported test case: ", *testCase)
}

Просмотреть файл

@ -101,7 +101,7 @@ func New(m map[string]string) MD {
// Pairs panics if len(kv) is odd.
func Pairs(kv ...string) MD {
if len(kv)%2 == 1 {
panic(fmt.Sprintf("Got the odd number of input pairs for metadata: %d", len(kv)))
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
}
md := MD{}
var k string

Просмотреть файл

@ -123,12 +123,12 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
defer s.mu.Unlock()
// Does some sanity checks.
if _, ok := s.m[sd.ServiceName]; ok {
log.Fatalf("rpc: Duplicate service registration for %q", sd.ServiceName)
log.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
log.Fatalf("rpc: The handler of type %v that does not satisfy %v", st, ht)
log.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
srv := &service{
server: ss,
@ -186,7 +186,7 @@ func (s *Server) Serve(lis net.Listener) error {
if err != nil {
s.mu.Unlock()
c.Close()
log.Println("failed to create ServerTransport: ", err)
log.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
continue
}
s.conns[st] = true
@ -213,7 +213,7 @@ func (s *Server) sendProto(t transport.ServerTransport, stream *transport.Stream
// TODO(zhaoq): There exist other options also such as only closing the
// faulty stream locally and remotely (Other streams can keep going). Find
// the optimal option.
log.Fatalf("Server: failed to encode proto message %v", err)
log.Fatalf("grpc: Server failed to encode proto message %v", err)
}
return t.Write(stream, p, opts)
}
@ -223,7 +223,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
for {
pf, req, err := p.recvMsg()
if err == io.EOF {
// The entire stream is done (for unary rpc only).
// The entire stream is done (for unary RPC only).
return
}
if err != nil {
@ -231,9 +231,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
t.WriteStatus(stream, err.Code, err.Desc)
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
}
default:
panic(fmt.Sprintf("BUG: Unexpected error (%T) from recvMsg: %v", err, err))
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
}
return
}
@ -241,7 +243,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
case compressionNone:
reply, appErr := md.Handler(srv.server, stream.Context(), req)
if appErr != nil {
t.WriteStatus(stream, convertCode(appErr), appErr.Error())
if err := t.WriteStatus(stream, convertCode(appErr), appErr.Error()); err != nil {
log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
}
return
}
opts := &transport.Options{
@ -262,7 +266,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
statusDesc = err.Error()
}
}
t.WriteStatus(stream, statusCode, statusDesc)
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
}
default:
panic(fmt.Sprintf("payload format to be supported: %d", pf))
}
@ -279,7 +285,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.statusCode = convertCode(err)
ss.statusDesc = err.Error()
}
t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
if err := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc); err != nil {
log.Printf("grpc: Server.processStreamingRPC failed to write status: %v", err)
}
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
@ -289,14 +297,18 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method()))
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
log.Printf("grpc: Server.handleStream failed to write status: %v", err)
}
return
}
service := sm[:pos]
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service))
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
log.Printf("grpc: Server.handleStream failed to write status: %v", err)
}
return
}
// Unary RPC or Streaming RPC?
@ -308,7 +320,9 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
s.processStreamingRPC(t, stream, srv, sd)
return
}
t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method))
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
log.Printf("grpc: Server.handleStream failed to write status: %v", err)
}
}
// Stop stops the gRPC server. Once Stop returns, the server stops accepting
@ -351,7 +365,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
}
t := stream.ServerTransport()
if t == nil {
log.Fatalf("rpc.SendHeader: %v has no ServerTransport to send header metadata.", stream)
log.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
}
return t.WriteHeader(stream, md)
}

Просмотреть файл

@ -220,7 +220,7 @@ func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestService
if err != nil {
log.Fatalf("Failed to create credentials %v", err)
}
conn, err = grpc.Dial(addr, grpc.WithClientTLS(creds))
conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds))
} else {
conn, err = grpc.Dial(addr)
}

Просмотреть файл

@ -35,6 +35,7 @@ package transport
import (
"bytes"
"errors"
"io"
"log"
"math"
@ -117,7 +118,7 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr
conn, connErr = net.Dial("tcp", addr)
}
if connErr != nil {
return nil, ConnectionErrorf("grpc/transport: %v", connErr)
return nil, ConnectionErrorf("transport: %v", connErr)
}
defer func() {
if err != nil {
@ -127,14 +128,14 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr
// Send connection preface to server.
n, err := conn.Write(clientPreface)
if err != nil {
return nil, ConnectionErrorf("grpc/transport: %v", err)
return nil, ConnectionErrorf("transport: %v", err)
}
if n != len(clientPreface) {
return nil, ConnectionErrorf("grpc/transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(); err != nil {
return nil, ConnectionErrorf("grpc/transport: %v", err)
return nil, ConnectionErrorf("transport: %v", err)
}
var buf bytes.Buffer
t := &http2Client{
@ -225,7 +226,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
default:
}
if err != nil {
return nil, StreamErrorf(codes.InvalidArgument, "grpc/transport: %v", err)
return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
}
for k, v := range m {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
@ -264,8 +265,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
err = t.framer.WriteContinuation(t.nextID, endHeaders, t.hBuf.Next(size))
}
if err != nil {
t.notifyError()
return nil, ConnectionErrorf("grpc/transport: %v", err)
t.notifyError(err)
return nil, ConnectionErrorf("transport: %v", err)
}
}
s := t.newStream(ctx, callHdr)
@ -276,7 +277,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
return nil, StreamErrorf(codes.Unavailable, "grpc/transport: failed to create new stream because the limit has been reached.")
return nil, StreamErrorf(codes.Unavailable, "transport: failed to create new stream because the limit has been reached.")
}
t.activeStreams[s.id] = s
t.mu.Unlock()
@ -315,6 +316,10 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
// accessed any more.
func (t *http2Client) Close() (err error) {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return errors.New("transport: Close() was already called")
}
t.state = closing
t.mu.Unlock()
close(t.shutdownChan)
@ -390,8 +395,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// by http2Client.Close(). No explicit CloseStream() needs to be
// invoked.
if err := t.framer.WriteData(s.id, endStream, p); err != nil {
t.notifyError()
return ConnectionErrorf("grpc/transport: %v", err)
t.notifyError(err)
return ConnectionErrorf("transport: %v", err)
}
t.writableChan <- 0
if r.Len() == 0 {
@ -472,7 +477,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s.state = streamDone
s.statusCode, ok = http2RSTErrConvTab[http2.ErrCode(f.ErrCode)]
if !ok {
log.Println("No gRPC status found for http2 error ", f.ErrCode)
log.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
@ -487,11 +492,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
log.Println("PingFrame handler to be implemented")
// TODO(zhaoq): PingFrame handler to be implemented"
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
log.Println("GoAwayFrame handler to be implemented")
// TODO(zhaoq): GoAwayFrame handler to be implemented"
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@ -560,12 +565,12 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.ReadFrame()
if err != nil {
t.notifyError()
t.notifyError(err)
return
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError()
t.notifyError(err)
return
}
t.handleSettings(sf)
@ -576,7 +581,7 @@ func (t *http2Client) reader() {
for {
frame, err := t.framer.ReadFrame()
if err != nil {
t.notifyError()
t.notifyError(err)
return
}
switch frame := frame.(type) {
@ -605,7 +610,7 @@ func (t *http2Client) reader() {
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
log.Printf("http2Client: unhandled frame type %v.", frame)
log.Printf("transport: http2Client.reader got unhandled frame type %v.", frame)
}
}
}
@ -627,7 +632,7 @@ func (t *http2Client) controller() {
case *resetStream:
t.framer.WriteRSTStream(i.streamID, i.code)
default:
log.Printf("http2Client.controller got unexpected item type %v\n", i)
log.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
}
t.writableChan <- 0
continue
@ -644,12 +649,13 @@ func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
func (t *http2Client) notifyError() {
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
defer t.mu.Unlock()
// make sure t.errorChan is closed only once.
if t.state == reachable {
t.state = unreachable
close(t.errorChan)
log.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
}
}

Просмотреть файл

@ -52,7 +52,7 @@ import (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
var ErrIllegalHeaderWrite = errors.New("grpc/transport: the stream is done or WriteHeader was already called")
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
@ -65,7 +65,6 @@ type http2Server struct {
// shutdownChan is closed when Close is called.
// Blocking operations should select on shutdownChan to avoid
// blocking forever after Close.
// TODO(zhaoq): Maybe have a channel context?
shutdownChan chan struct{}
framer *http2.Framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@ -79,7 +78,7 @@ type http2Server struct {
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
mu sync.Mutex
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// Inbound quota for flow control
@ -132,7 +131,7 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header
}()
endHeaders, err := hDec.decodeServerHTTP2Headers(s, frame)
if err != nil {
log.Print(err)
log.Printf("transport: http2Server.operateHeader found %v", err)
if se, ok := err.(StreamError); ok {
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
}
@ -194,12 +193,12 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
log.Printf("failed to receive the preface from client: %v", err)
log.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
t.Close()
return
}
if !bytes.Equal(preface, clientPreface) {
log.Printf("received bogus greeting from client: %q", preface)
log.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
t.Close()
return
}
@ -211,7 +210,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
log.Printf("invalid preface type %T from client", frame)
log.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
t.Close()
return
}
@ -232,7 +231,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
id := frame.Header().StreamID
if id%2 != 1 || id <= t.maxStreamID {
// illegal gRPC stream id.
log.Println("http2Server: received an illegal stream id: ", id)
log.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
t.Close()
break
}
@ -262,7 +261,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
log.Printf("http2Server: unhandled frame type %v.", frame)
log.Printf("transport: http2Server.HanldeStreams found unhandled frame type %v.", frame)
}
}
}
@ -346,7 +345,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Server) handlePing(f *http2.PingFrame) {
log.Println("PingFrame handler to be implemented")
// TODO(zhaoq): PingFrame handler to be implemented
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@ -387,7 +386,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
}
if err != nil {
t.Close()
return ConnectionErrorf("grpc/transport: %v", err)
return ConnectionErrorf("transport: %v", err)
}
}
return nil
@ -430,7 +429,6 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
}
s.mu.RUnlock()
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
// TODO(zhaoq): Print some errors using glog, e.g., glog.V(1).
return err
}
t.hBuf.Reset()
@ -478,7 +476,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.WriteHeaders(p); err != nil {
t.Close()
return ConnectionErrorf("grpc/transport: %v", err)
return ConnectionErrorf("transport: %v", err)
}
t.writableChan <- 0
}
@ -526,7 +524,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.WriteData(s.id, false, p); err != nil {
t.Close()
return ConnectionErrorf("grpc/transport: %v", err)
return ConnectionErrorf("transport: %v", err)
}
t.writableChan <- 0
}
@ -550,7 +548,7 @@ func (t *http2Server) controller() {
case *resetStream:
t.framer.WriteRSTStream(i.streamID, i.code)
default:
log.Printf("http2Server.controller got unexpected item type %v\n", i)
log.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
}
t.writableChan <- 0
continue
@ -570,7 +568,7 @@ func (t *http2Server) Close() (err error) {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
return errors.New("transport: Close() was already called")
}
t.state = closing
streams := t.activeStreams

Просмотреть файл

@ -137,7 +137,7 @@ func newHPACKDecoder() *hpackDecoder {
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed grpc-status: %v", err)
d.err = StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
return
}
d.state.statusCode = codes.Code(code)
@ -148,7 +148,7 @@ func newHPACKDecoder() *hpackDecoder {
var err error
d.state.timeout, err = timeoutDecode(f.Value)
if err != nil {
d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed time-out: %v", err)
d.err = StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
return
}
case ":path":
@ -174,12 +174,12 @@ func (d *hpackDecoder) decodeClientHTTP2Headers(s *Stream, frame headerFrame) (e
d.err = nil
_, err = d.h.Write(frame.HeaderBlockFragment())
if err != nil {
err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err)
err = StreamErrorf(codes.Internal, "transport: HPACK header decode error: %v", err)
}
if frame.HeadersEnded() {
if closeErr := d.h.Close(); closeErr != nil && err == nil {
err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr)
err = StreamErrorf(codes.Internal, "transport: HPACK decoder close error: %v", closeErr)
}
endHeaders = true
}
@ -194,12 +194,12 @@ func (d *hpackDecoder) decodeServerHTTP2Headers(s *Stream, frame headerFrame) (e
d.err = nil
_, err = d.h.Write(frame.HeaderBlockFragment())
if err != nil {
err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err)
err = StreamErrorf(codes.Internal, "transport: HPACK header decode error: %v", err)
}
if frame.HeadersEnded() {
if closeErr := d.h.Close(); closeErr != nil && err == nil {
err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr)
err = StreamErrorf(codes.Internal, "transport: HPACK decoder close error: %v", closeErr)
}
endHeaders = true
}
@ -275,12 +275,12 @@ func timeoutEncode(t time.Duration) string {
func timeoutDecode(s string) (time.Duration, error) {
size := len(s)
if size < 2 {
return 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", s)
return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
}
unit := timeoutUnit(s[size-1])
d, ok := timeoutUnitToDuration(unit)
if !ok {
return 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", s)
return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
}
t, err := strconv.ParseInt(s[:size-1], 10, 64)
if err != nil {

Просмотреть файл

@ -75,9 +75,9 @@ func TestTimeoutDecode(t *testing.T) {
err error
}{
{"1234S", time.Second * 1234, nil},
{"1234x", 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", "1234x")},
{"1", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "1")},
{"", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "")},
{"1234x", 0, fmt.Errorf("transport: timeout unit is not recognized: %q", "1234x")},
{"1", 0, fmt.Errorf("transport: timeout string is too short: %q", "1")},
{"", 0, fmt.Errorf("transport: timeout string is too short: %q", "")},
} {
d, err := timeoutDecode(test.s)
if d != test.d || fmt.Sprint(err) != fmt.Sprint(test.err) {

Просмотреть файл

@ -186,7 +186,7 @@ type Stream struct {
// The key-value map of trailer metadata.
trailer metadata.MD
mu sync.RWMutex
mu sync.RWMutex // guard the following
// headerOK becomes true from the first header is about to send.
headerOk bool
state streamState
@ -247,7 +247,7 @@ func (s *Stream) StatusDesc() string {
// ErrIllegalTrailerSet indicates that the trailer has already been set or it
// is too late to do so.
var ErrIllegalTrailerSet = errors.New("grpc/transport: trailer has been set")
var ErrIllegalTrailerSet = errors.New("transport: trailer has been set")
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can only be called at most once. Server side only.

Просмотреть файл

@ -158,12 +158,12 @@ func (s *server) Wait(t *testing.T, timeout time.Duration) {
}
func (s *server) Close() {
// Keep consistent with closeServer().
s.lis.Close()
s.mu.Lock()
for c := range s.conns {
c.Close()
}
s.conns = nil
s.mu.Unlock()
}
@ -227,8 +227,8 @@ func TestClientSendAndReceive(t *testing.T) {
if recvErr != io.EOF {
t.Fatalf("Error: %v; want <EOF>", recvErr)
}
ct.Close()
server.Close()
closeClient(ct, t)
closeServer(server, t)
}
func TestClientErrorNotify(t *testing.T) {
@ -245,10 +245,10 @@ func TestClientErrorNotify(t *testing.T) {
t.Fatalf("wrong stream id: %d", s.id)
}
// Tear down the server.
go server.Close()
go closeServer(server, t)
// ct.reader should detect the error and activate ct.Error().
<-ct.Error()
ct.Close()
closeClient(ct, t)
}
func performOneRPC(ct ClientTransport) {
@ -281,11 +281,11 @@ func TestClientMix(t *testing.T) {
s, ct := setUp(t, true, 0, math.MaxUint32, false)
go func(s *server) {
time.Sleep(5 * time.Second)
s.Close()
closeServer(s, t)
}(s)
go func(t ClientTransport) {
go func(ct ClientTransport) {
<-ct.Error()
ct.Close()
closeClient(ct, t)
}(ct)
for i := 0; i < 1000; i++ {
time.Sleep(10 * time.Millisecond)
@ -296,8 +296,8 @@ func TestClientMix(t *testing.T) {
func TestExceedMaxStreamsLimit(t *testing.T) {
server, ct := setUp(t, true, 0, 1, false)
defer func() {
ct.Close()
server.Close()
closeClient(ct, t)
closeServer(server, t)
}()
callHdr := &CallHdr{
Host: "localhost",
@ -371,8 +371,8 @@ func TestLargeMessage(t *testing.T) {
}()
}
wg.Wait()
ct.Close()
server.Close()
closeClient(ct, t)
closeServer(server, t)
}
func TestLargeMessageSuspension(t *testing.T) {
@ -393,8 +393,8 @@ func TestLargeMessageSuspension(t *testing.T) {
if err == nil || err != expectedErr {
t.Fatalf("Write got %v, want %v", err, expectedErr)
}
ct.Close()
server.Close()
closeClient(ct, t)
closeServer(server, t)
}
func TestStreamContext(t *testing.T) {
@ -405,3 +405,53 @@ func TestStreamContext(t *testing.T) {
t.Fatalf("GetStreamFromContext(%v) = %v, %t, want: %v, true", ctx, *s, ok, expectedStream)
}
}
// closeClient shuts down the ClientTransport and reports any errors to the
// test framework and terminates the current test case.
func closeClient(ct ClientTransport, t *testing.T) {
if err := ct.Close(); err != nil {
t.Fatalf("ct.Close() = %v, want <nil>", err)
}
}
// closeServerWithErr shuts down the testing server, closing the associated
// transports. It returns the first error it encounters, if any.
func closeServerWithErr(s *server) error {
// Keep consistent with s.Close().
s.lis.Close()
s.mu.Lock()
defer s.mu.Unlock()
for c := range s.conns {
if err := c.Close(); err != nil {
return err
}
}
return nil
}
// closeServer shuts down the and testing server, closing the associated
// transport. It reports any errors to the test framework and terminates the
// current test case.
func closeServer(s *server, t *testing.T) {
if err := closeServerWithErr(s); err != nil {
t.Fatalf("server.Close() = %v, want <nil>", err)
}
}
func TestClientServerDuplicatedClose(t *testing.T) {
server, ct := setUp(t, true, 0, math.MaxUint32, false)
if err := ct.Close(); err != nil {
t.Fatalf("ct.Close() = %v, want <nil>", err)
}
if err := ct.Close(); err == nil {
// Duplicated closes should gracefully issue an error.
t.Fatalf("ct.Close() = <nil>, want non-nil")
}
if err := closeServerWithErr(server); err != nil {
t.Fatalf("closeServerWithErr(server) = %v, want <nil>", err)
}
if err := closeServerWithErr(server); err == nil {
// Duplicated closes should gracefully issue an error.
t.Fatalf("closeServerWithErr(server) = <nil>, want non-nil")
}
}