Merge pull request #161 from grpc/revert-160-restoreConn
Revert "remove restoreConn because it is not needed"
This commit is contained in:
Коммит
74f20d94f9
|
@ -209,3 +209,23 @@ func (f *inFlow) onRead(n uint32) uint32 {
|
|||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// restoreConn is invoked when a stream is terminated. It removes its stake in
|
||||
// the connection-level flow and resets its own state.
|
||||
func (f *inFlow) restoreConn() uint32 {
|
||||
if f.conn == nil {
|
||||
return 0
|
||||
}
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
ret := f.pendingData
|
||||
f.conn.mu.Lock()
|
||||
f.conn.pendingData -= ret
|
||||
if f.conn.pendingUpdate > f.conn.pendingData {
|
||||
f.conn.pendingUpdate = f.conn.pendingData
|
||||
}
|
||||
f.conn.mu.Unlock()
|
||||
f.pendingData = 0
|
||||
f.pendingUpdate = 0
|
||||
return ret
|
||||
}
|
||||
|
|
|
@ -312,6 +312,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
|||
delete(t.activeStreams, s.id)
|
||||
t.mu.Unlock()
|
||||
s.mu.Lock()
|
||||
if q := s.fc.restoreConn(); q > 0 {
|
||||
t.controlBuf.put(&windowUpdate{0, q})
|
||||
}
|
||||
if s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
|
|
|
@ -648,6 +648,9 @@ func (t *http2Server) closeStream(s *Stream) {
|
|||
t.mu.Lock()
|
||||
delete(t.activeStreams, s.id)
|
||||
t.mu.Unlock()
|
||||
if q := s.fc.restoreConn(); q > 0 {
|
||||
t.controlBuf.put(&windowUpdate{0, q})
|
||||
}
|
||||
s.mu.Lock()
|
||||
if s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
|
|
|
@ -500,6 +500,10 @@ func TestServerWithMisbehavedClient(t *testing.T) {
|
|||
if _, err := io.ReadFull(s, make([]byte, 1)); err != io.EOF || s.statusCode != code {
|
||||
t.Fatalf("%v got err %v with statusCode %d, want err <EOF> with statusCode %d", s, err, s.statusCode, code)
|
||||
}
|
||||
|
||||
if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 {
|
||||
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate)
|
||||
}
|
||||
ct.CloseStream(s, nil)
|
||||
// Test server behavior for violation of connection flow control window size restriction.
|
||||
//
|
||||
|
@ -508,11 +512,14 @@ func TestServerWithMisbehavedClient(t *testing.T) {
|
|||
for {
|
||||
s, err := ct.NewStream(context.Background(), callHdr)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to open stream: %v", err)
|
||||
}
|
||||
<-cc.writableChan
|
||||
// Write will fail when connection flow control window runs out.
|
||||
if err := cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)); err != nil {
|
||||
// The server tears down the connection.
|
||||
break
|
||||
}
|
||||
<-cc.writableChan
|
||||
cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
|
||||
cc.writableChan <- 0
|
||||
}
|
||||
ct.Close()
|
||||
|
@ -551,6 +558,9 @@ func TestClientWithMisbehavedServer(t *testing.T) {
|
|||
t.Fatalf("Got err %v and the status code %d, want <EOF> and the code %d", err, s.statusCode, codes.Internal)
|
||||
}
|
||||
conn.CloseStream(s, err)
|
||||
if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 {
|
||||
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate)
|
||||
}
|
||||
// Test the logic for the violation of the connection flow control window size restriction.
|
||||
//
|
||||
// Generate enough streams to drain the connection window.
|
||||
|
|
Загрузка…
Ссылка в новой задаче