From 458d514e70b3e5ba71287de5c338c8a6b8b92be3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 May 2015 19:03:21 -0700 Subject: [PATCH 1/3] new streams block when the max concurrent stram limit is reached. --- test/end2end_test.go | 4 ++-- transport/http2_client.go | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index cf1e2915..69c131eb 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -806,12 +806,12 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { var wg sync.WaitGroup wg.Add(1) go func() { + defer wg.Done() // The 2nd stream should block until its deadline exceeds. ctx, _ := context.WithTimeout(context.Background(), time.Second) if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.DeadlineExceeded { - t.Fatalf("%v.StreamingInputCall(%v) = _, %v, want error code %d", tc, ctx, err, codes.DeadlineExceeded) + t.Errorf("%v.StreamingInputCall(%v) = _, %v, want error code %d", tc, ctx, err, codes.DeadlineExceeded) } - wg.Done() }() wg.Wait() } diff --git a/transport/http2_client.go b/transport/http2_client.go index 35a69fc3..07c93299 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -243,19 +243,20 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } - if t.streamsQuota != nil { - q, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) + checkStreamsQuota := t.streamsQuota != nil + t.mu.Unlock() + if checkStreamsQuota { + sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire()) if err != nil { - t.mu.Unlock() return nil, err } // Returns the quota balance back. - if q > 1 { - t.streamsQuota.add(q - 1) + if sq > 1 { + t.streamsQuota.add(sq - 1) } } - t.mu.Unlock() if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil { + // t.streamsQuota will be updated when t.CloseStream is invoked. return nil, err } t.mu.Lock() From 8df7c3fd8449a0616db90397400e8ca0405da430 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 May 2015 13:43:27 -0700 Subject: [PATCH 2/3] i) update streamsQuota only if streamsQuota is there when the stream is created; ii) move ops of streamsQuota out of mutex of the transport (except acquire()) --- test/end2end_test.go | 2 +- transport/http2_client.go | 23 ++++++++++++++--------- transport/transport.go | 9 +++++++-- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 69c131eb..73e6cb65 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -795,7 +795,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { s, cc := setUp(1, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) - // Perform an unary RPC to make sure the new settings were propagated to the client. + // Perform a unary RPC to make sure the new settings were propagated to the client. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", tc, err) } diff --git a/transport/http2_client.go b/transport/http2_client.go index 07c93299..a32d416c 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -190,7 +190,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e return t, nil } -func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { +func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, sq bool) *Stream { fc := &inFlow{ limit: initialWindowSize, conn: t.fc, @@ -200,6 +200,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { id: t.nextID, method: callHdr.Method, buf: newRecvBuffer(), + updateStreams: sq, fc: fc, sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), @@ -260,7 +261,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, err } t.mu.Lock() - s := t.newStream(ctx, callHdr) + s := t.newStream(ctx, callHdr, checkStreamsQuota) t.activeStreams[s.id] = s t.mu.Unlock() // HPACK encodes various headers. Note that once WriteField(...) is @@ -329,10 +330,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea func (t *http2Client) CloseStream(s *Stream, err error) { t.mu.Lock() delete(t.activeStreams, s.id) - if t.streamsQuota != nil { + t.mu.Unlock() + if s.updateStreams { t.streamsQuota.add(1) } - t.mu.Unlock() s.mu.Lock() if q := s.fc.restoreConn(); q > 0 { t.controlBuf.put(&windowUpdate{0, q}) @@ -568,8 +569,6 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) { } f.ForeachSetting(func(s http2.Setting) error { if v, ok := f.Value(s.ID); ok { - t.mu.Lock() - defer t.mu.Unlock() switch s.ID { case http2.SettingMaxConcurrentStreams: // TODO(zhaoq): This is a hack to avoid significant refactoring of the @@ -578,18 +577,24 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) { if v > math.MaxInt32 { v = math.MaxInt32 } - if t.streamsQuota == nil { + t.mu.Lock() + reset := t.streamsQuota != nil + ms := t.maxStreams + t.maxStreams = int(v) + t.mu.Unlock() + if !reset { t.streamsQuota = newQuotaPool(int(v)) } else { - t.streamsQuota.reset(int(v) - t.maxStreams) + t.streamsQuota.reset(int(v) - ms) } - t.maxStreams = int(v) case http2.SettingInitialWindowSize: + t.mu.Lock() for _, s := range t.activeStreams { // Adjust the sending quota for each s. s.sendQuotaPool.reset(int(v - t.streamSendQuota)) } t.streamSendQuota = v + t.mu.Unlock() } } return nil diff --git a/transport/transport.go b/transport/transport.go index 5dfd89f0..498cee54 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -173,8 +173,13 @@ type Stream struct { buf *recvBuffer dec io.Reader - fc *inFlow - recvQuota uint32 + // updateStreams indicates whether the transport's streamsQuota needed + // to be updated when this stream is closed. It is false when the transport + // sticks to the initial infinite value of the number of concurrent streams. + // Ture otherwise. + updateStreams bool + fc *inFlow + recvQuota uint32 // The accumulated inbound quota pending for window update. updateQuota uint32 // The handler to control the window update procedure for both this From bf5381cf0e77bddbfece0f4f23cf8cd5f068c2ff Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 29 May 2015 11:16:24 -0700 Subject: [PATCH 3/3] fix transport_test --- transport/transport_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport_test.go b/transport/transport_test.go index 6c8c81fd..8529e2af 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -500,7 +500,7 @@ func TestClientWithMisbehavedServer(t *testing.T) { for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ { s, err := ct.NewStream(context.Background(), callHdr) if err != nil { - t.Fatalf("Failed to open stream: %v", err) + break } if err := ct.Write(s, expectedRequest, &Options{Last: true, Delay: false}); err != nil { break