Merge pull request #912 from iamqizhao/master
Fix the flakiness of TestStreamingRPCTimeoutServerError
This commit is contained in:
Коммит
ac7efbd8be
|
@ -1747,7 +1747,7 @@ func testStreamingRPCTimeoutServerError(t *testing.T, e env) {
|
||||||
req := &testpb.StreamingOutputCallRequest{}
|
req := &testpb.StreamingOutputCallRequest{}
|
||||||
for duration := 50 * time.Millisecond; ; duration *= 2 {
|
for duration := 50 * time.Millisecond; ; duration *= 2 {
|
||||||
ctx, _ := context.WithTimeout(context.Background(), duration)
|
ctx, _ := context.WithTimeout(context.Background(), duration)
|
||||||
stream, err := tc.FullDuplexCall(ctx)
|
stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
|
||||||
if grpc.Code(err) == codes.DeadlineExceeded {
|
if grpc.Code(err) == codes.DeadlineExceeded {
|
||||||
// Redo test with double timeout.
|
// Redo test with double timeout.
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -267,16 +267,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||||
// NewStream creates a stream and register it into the transport as "active"
|
// NewStream creates a stream and register it into the transport as "active"
|
||||||
// streams.
|
// streams.
|
||||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
|
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
|
||||||
// Record the timeout value on the context.
|
|
||||||
var timeout time.Duration
|
|
||||||
if dl, ok := ctx.Deadline(); ok {
|
|
||||||
timeout = dl.Sub(time.Now())
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ContextErr(ctx.Err())
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
pr := &peer.Peer{
|
pr := &peer.Peer{
|
||||||
Addr: t.conn.RemoteAddr(),
|
Addr: t.conn.RemoteAddr(),
|
||||||
}
|
}
|
||||||
|
@ -383,9 +373,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||||
if callHdr.SendCompress != "" {
|
if callHdr.SendCompress != "" {
|
||||||
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
|
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
|
||||||
}
|
}
|
||||||
if timeout > 0 {
|
if dl, ok := ctx.Deadline(); ok {
|
||||||
|
// Send out timeout regardless its value. The server can detect timeout context by itself.
|
||||||
|
timeout := dl.Sub(time.Now())
|
||||||
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
|
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range authData {
|
for k, v := range authData {
|
||||||
// Capital header names are illegal in HTTP/2.
|
// Capital header names are illegal in HTTP/2.
|
||||||
k = strings.ToLower(k)
|
k = strings.ToLower(k)
|
||||||
|
|
|
@ -253,6 +253,9 @@ func div(d, r time.Duration) int64 {
|
||||||
|
|
||||||
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
|
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
|
||||||
func encodeTimeout(t time.Duration) string {
|
func encodeTimeout(t time.Duration) string {
|
||||||
|
if t <= 0 {
|
||||||
|
return "0n"
|
||||||
|
}
|
||||||
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
|
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
|
||||||
return strconv.FormatInt(d, 10) + "n"
|
return strconv.FormatInt(d, 10) + "n"
|
||||||
}
|
}
|
||||||
|
|
Загрузка…
Ссылка в новой задаче