diff --git a/stream.go b/stream.go index b23acaf3..da478938 100644 --- a/stream.go +++ b/stream.go @@ -542,7 +542,10 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) // We started another attempt already. continue } - if err == nil || err == io.EOF { + if err == io.EOF { + <-a.s.Done() + } + if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { onSuccess() cs.mu.Unlock() return err @@ -616,7 +619,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { cs.finish(err) } }() - // TODO: Check cs.sentLast and error if we already ended the stream. + if cs.sentLast { + return status.Errorf(codes.Internal, "SendMsg called after CloseSend") + } if !cs.desc.ClientStreams { cs.sentLast = true } diff --git a/transport/transport.go b/transport/transport.go index d409356f..62d6e6bb 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -334,7 +334,7 @@ func (s *Stream) Method() string { // Status returns the status received from the server. // Status can be read safely only after the stream has ended, -// that is, read or write has returned io.EOF. +// that is, after Done() is closed. func (s *Stream) Status() *status.Status { return s.status }