Коммит
d017580d29
12
stream.go
12
stream.go
|
@ -257,7 +257,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||
if err != nil {
|
||||
cs.finish(err)
|
||||
}
|
||||
if err == nil || err == io.EOF {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if err == io.EOF {
|
||||
// Specialize the process for server streaming. SendMesg is only called
|
||||
// once when creating the stream object. io.EOF needs to be skipped when
|
||||
// the rpc is early finished (before the stream object is created.).
|
||||
// TODO: It is probably better to move this into the generated code.
|
||||
if !cs.desc.ClientStreams && cs.desc.ServerStreams {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
if _, ok := err.(transport.ConnectionError); !ok {
|
||||
|
|
|
@ -433,14 +433,21 @@ func TestMaxStreams(t *testing.T) {
|
|||
}
|
||||
done := make(chan struct{})
|
||||
ch := make(chan int)
|
||||
ready := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
ch <- 0
|
||||
select {
|
||||
case ch <- 0:
|
||||
case <-ready:
|
||||
return
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
close(done)
|
||||
return
|
||||
case <-ready:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -467,6 +474,7 @@ func TestMaxStreams(t *testing.T) {
|
|||
}
|
||||
cc.mu.Unlock()
|
||||
}
|
||||
close(ready)
|
||||
// Close the pending stream so that the streams quota becomes available for the next new stream.
|
||||
ct.CloseStream(s, nil)
|
||||
select {
|
||||
|
@ -690,7 +698,8 @@ func TestClientWithMisbehavedServer(t *testing.T) {
|
|||
Host: "localhost",
|
||||
Method: "foo.MaxFrame",
|
||||
}
|
||||
for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ {
|
||||
// Make the server flood the traffic to violate flow control window size of the connection.
|
||||
for {
|
||||
s, err := ct.NewStream(context.Background(), callHdr)
|
||||
if err != nil {
|
||||
break
|
||||
|
|
Загрузка…
Ссылка в новой задаче