From 953211a3e326599f1b85cc61bd106a6b6f400ac4 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 18 Sep 2015 00:14:42 +0000 Subject: [PATCH] pargzip: fix blocked goroutine on write error Change-Id: Ifdcc24ef0e04b855882dc21b31182d6492bc54c6 Reviewed-on: https://go-review.googlesource.com/14730 Reviewed-by: Andrew Gerrand --- pargzip/pargzip.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pargzip/pargzip.go b/pargzip/pargzip.go index ff533690..a71064d4 100644 --- a/pargzip/pargzip.go +++ b/pargzip/pargzip.go @@ -41,7 +41,8 @@ type Writer struct { w io.Writer bw *bufio.Writer - allWritten chan struct{} // when writing goroutine ends + allWritten chan struct{} // when writing goroutine ends + wasWriteErr chan struct{} // closed after 'err' set sem chan bool // semaphore bounding compressions in flight chunkc chan *writeChunk // closed on Close @@ -103,8 +104,9 @@ func (c *writeChunk) compress() (err error) { // Write. func NewWriter(w io.Writer) *Writer { return &Writer{ - w: w, - allWritten: make(chan struct{}), + w: w, + allWritten: make(chan struct{}), + wasWriteErr: make(chan struct{}), UseSystemGzip: true, ChunkSize: 1 << 20, @@ -122,6 +124,7 @@ func (w *Writer) init() { defer close(w.allWritten) for c := range w.chunkc { if err := w.writeCompressedChunk(c); err != nil { + close(w.wasWriteErr) return } } @@ -136,7 +139,12 @@ func (w *Writer) startChunk(p []byte) { donec: make(chan struct{}), } go c.compress() // receives from w.sem - w.chunkc <- c + select { + case w.chunkc <- c: + case <-w.wasWriteErr: + // Discard chunks that come after any chunk that failed + // to write. + } } func (w *Writer) writeCompressedChunk(c *writeChunk) (err error) {