From b531d0156b39ac28d3bc0181f91859b549ca895b Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 27 Sep 2016 17:07:14 -0700 Subject: [PATCH 1/2] fix the flakiness of TestStreamingRPCTimeoutServerError --- test/end2end_test.go | 2 +- transport/http2_client.go | 16 +++++----------- transport/http_util.go | 3 +++ 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 69c4e428..bf4b49db 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1747,7 +1747,7 @@ func testStreamingRPCTimeoutServerError(t *testing.T, e env) { req := &testpb.StreamingOutputCallRequest{} for duration := 50 * time.Millisecond; ; duration *= 2 { ctx, _ := context.WithTimeout(context.Background(), duration) - stream, err := tc.FullDuplexCall(ctx) + stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) if grpc.Code(err) == codes.DeadlineExceeded { // Redo test with double timeout. continue diff --git a/transport/http2_client.go b/transport/http2_client.go index b7c736e0..d2833852 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -267,16 +267,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // NewStream creates a stream and register it into the transport as "active" // streams. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { - // Record the timeout value on the context. - var timeout time.Duration - if dl, ok := ctx.Deadline(); ok { - timeout = dl.Sub(time.Now()) - } - select { - case <-ctx.Done(): - return nil, ContextErr(ctx.Err()) - default: - } pr := &peer.Peer{ Addr: t.conn.RemoteAddr(), } @@ -383,9 +373,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if callHdr.SendCompress != "" { t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) } - if timeout > 0 { + dl, ok := ctx.Deadline() + if ok { + // Send out timeout regardless its value. The server can detect timeout context by itself. + timeout := dl.Sub(time.Now()) t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) } + for k, v := range authData { // Capital header names are illegal in HTTP/2. k = strings.ToLower(k) diff --git a/transport/http_util.go b/transport/http_util.go index b024594e..baec02da 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -253,6 +253,9 @@ func div(d, r time.Duration) int64 { // TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it. func encodeTimeout(t time.Duration) string { + if t <= 0 { + return "0n" + } if d := div(t, time.Nanosecond); d <= maxTimeoutValue { return strconv.FormatInt(d, 10) + "n" } From b9111902e257107315a49512f438d8228683830d Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 27 Sep 2016 17:09:57 -0700 Subject: [PATCH 2/2] minor polish --- transport/http2_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index d2833852..3c185541 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -373,8 +373,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if callHdr.SendCompress != "" { t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) } - dl, ok := ctx.Deadline() - if ok { + if dl, ok := ctx.Deadline(); ok { // Send out timeout regardless its value. The server can detect timeout context by itself. timeout := dl.Sub(time.Now()) t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})