diff --git a/stream.go b/stream.go index a182e077..4c3136c9 100644 --- a/stream.go +++ b/stream.go @@ -257,7 +257,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { cs.finish(err) } - if err == nil || err == io.EOF { + if err == nil { + return + } + if err == io.EOF { + // Specialize the process for server streaming. SendMesg is only called + // once when creating the stream object. io.EOF needs to be skipped when + // the rpc is early finished (before the stream object is created.). + // TODO: It is probably better to move this into the generated code. + if !cs.desc.ClientStreams && cs.desc.ServerStreams { + err = nil + } return } if _, ok := err.(transport.ConnectionError); !ok { diff --git a/transport/transport_test.go b/transport/transport_test.go index ce015da2..5a517e0b 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -433,14 +433,21 @@ func TestMaxStreams(t *testing.T) { } done := make(chan struct{}) ch := make(chan int) + ready := make(chan struct{}) go func() { for { select { case <-time.After(5 * time.Millisecond): - ch <- 0 + select { + case ch <- 0: + case <-ready: + return + } case <-time.After(5 * time.Second): close(done) return + case <-ready: + return } } }() @@ -467,6 +474,7 @@ func TestMaxStreams(t *testing.T) { } cc.mu.Unlock() } + close(ready) // Close the pending stream so that the streams quota becomes available for the next new stream. ct.CloseStream(s, nil) select { @@ -690,7 +698,8 @@ func TestClientWithMisbehavedServer(t *testing.T) { Host: "localhost", Method: "foo.MaxFrame", } - for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ { + // Make the server flood the traffic to violate flow control window size of the connection. + for { s, err := ct.NewStream(context.Background(), callHdr) if err != nil { break