pargzip: fix blocked goroutine on write error

Change-Id: Ifdcc24ef0e04b855882dc21b31182d6492bc54c6
Reviewed-on: https://go-review.googlesource.com/14730
Reviewed-by: Andrew Gerrand <adg@golang.org>
This commit is contained in:
Brad Fitzpatrick 2015-09-18 00:14:42 +00:00 коммит произвёл Brad Fitzpatrick
Родитель 8a7ceb75fa
Коммит 953211a3e3
1 изменённых файлов: 12 добавлений и 4 удалений

Просмотреть файл

@ -41,7 +41,8 @@ type Writer struct {
w io.Writer w io.Writer
bw *bufio.Writer bw *bufio.Writer
allWritten chan struct{} // when writing goroutine ends allWritten chan struct{} // when writing goroutine ends
wasWriteErr chan struct{} // closed after 'err' set
sem chan bool // semaphore bounding compressions in flight sem chan bool // semaphore bounding compressions in flight
chunkc chan *writeChunk // closed on Close chunkc chan *writeChunk // closed on Close
@ -103,8 +104,9 @@ func (c *writeChunk) compress() (err error) {
// Write. // Write.
func NewWriter(w io.Writer) *Writer { func NewWriter(w io.Writer) *Writer {
return &Writer{ return &Writer{
w: w, w: w,
allWritten: make(chan struct{}), allWritten: make(chan struct{}),
wasWriteErr: make(chan struct{}),
UseSystemGzip: true, UseSystemGzip: true,
ChunkSize: 1 << 20, ChunkSize: 1 << 20,
@ -122,6 +124,7 @@ func (w *Writer) init() {
defer close(w.allWritten) defer close(w.allWritten)
for c := range w.chunkc { for c := range w.chunkc {
if err := w.writeCompressedChunk(c); err != nil { if err := w.writeCompressedChunk(c); err != nil {
close(w.wasWriteErr)
return return
} }
} }
@ -136,7 +139,12 @@ func (w *Writer) startChunk(p []byte) {
donec: make(chan struct{}), donec: make(chan struct{}),
} }
go c.compress() // receives from w.sem go c.compress() // receives from w.sem
w.chunkc <- c select {
case w.chunkc <- c:
case <-w.wasWriteErr:
// Discard chunks that come after any chunk that failed
// to write.
}
} }
func (w *Writer) writeCompressedChunk(c *writeChunk) (err error) { func (w *Writer) writeCompressedChunk(c *writeChunk) (err error) {