From 2d286d72e4b4f8c4bc32c833f4707034312a6922 Mon Sep 17 00:00:00 2001 From: Qi Zhao Date: Sun, 12 Apr 2015 11:39:22 -0700 Subject: [PATCH] Revert "remove restoreConn because it is not needed" --- transport/control.go | 20 ++++++++++++++++++++ transport/http2_client.go | 3 +++ transport/http2_server.go | 3 +++ transport/transport_test.go | 14 ++++++++++++-- 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/transport/control.go b/transport/control.go index 557c49bc..fc7806a8 100644 --- a/transport/control.go +++ b/transport/control.go @@ -209,3 +209,23 @@ func (f *inFlow) onRead(n uint32) uint32 { } return 0 } + +// restoreConn is invoked when a stream is terminated. It removes its stake in +// the connection-level flow and resets its own state. +func (f *inFlow) restoreConn() uint32 { + if f.conn == nil { + return 0 + } + f.mu.Lock() + defer f.mu.Unlock() + ret := f.pendingData + f.conn.mu.Lock() + f.conn.pendingData -= ret + if f.conn.pendingUpdate > f.conn.pendingData { + f.conn.pendingUpdate = f.conn.pendingData + } + f.conn.mu.Unlock() + f.pendingData = 0 + f.pendingUpdate = 0 + return ret +} diff --git a/transport/http2_client.go b/transport/http2_client.go index cf34f792..b9cef877 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -312,6 +312,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { delete(t.activeStreams, s.id) t.mu.Unlock() s.mu.Lock() + if q := s.fc.restoreConn(); q > 0 { + t.controlBuf.put(&windowUpdate{0, q}) + } if s.state == streamDone { s.mu.Unlock() return diff --git a/transport/http2_server.go b/transport/http2_server.go index c5d7f48e..a6576584 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -648,6 +648,9 @@ func (t *http2Server) closeStream(s *Stream) { t.mu.Lock() delete(t.activeStreams, s.id) t.mu.Unlock() + if q := s.fc.restoreConn(); q > 0 { + t.controlBuf.put(&windowUpdate{0, q}) + } s.mu.Lock() if s.state == streamDone { s.mu.Unlock() diff --git a/transport/transport_test.go b/transport/transport_test.go index a3005343..70343396 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -500,6 +500,10 @@ func TestServerWithMisbehavedClient(t *testing.T) { if _, err := io.ReadFull(s, make([]byte, 1)); err != io.EOF || s.statusCode != code { t.Fatalf("%v got err %v with statusCode %d, want err with statusCode %d", s, err, s.statusCode, code) } + + if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 { + t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate) + } ct.CloseStream(s, nil) // Test server behavior for violation of connection flow control window size restriction. // @@ -508,11 +512,14 @@ func TestServerWithMisbehavedClient(t *testing.T) { for { s, err := ct.NewStream(context.Background(), callHdr) if err != nil { + t.Fatalf("Failed to open stream: %v", err) + } + <-cc.writableChan + // Write will fail when connection flow control window runs out. + if err := cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)); err != nil { // The server tears down the connection. break } - <-cc.writableChan - cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)) cc.writableChan <- 0 } ct.Close() @@ -551,6 +558,9 @@ func TestClientWithMisbehavedServer(t *testing.T) { t.Fatalf("Got err %v and the status code %d, want and the code %d", err, s.statusCode, codes.Internal) } conn.CloseStream(s, err) + if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 { + t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate) + } // Test the logic for the violation of the connection flow control window size restriction. // // Generate enough streams to drain the connection window.