From d80c4244d322fe0a9cbdd996d23e37fb5b089544 Mon Sep 17 00:00:00 2001 From: Josh Hawn Date: Wed, 5 Aug 2015 17:47:37 -0700 Subject: [PATCH] [graph] Use a pipe for downloads to write progress The process of pulling an image spawns a new goroutine for each layer in the image manifest. If any of these downloads fail we would stop everything and return the error, even though other goroutines would still be running and writing output through a progress reader which is attached to an http response writer. Since the request handler had already returned from the first error, the http server panics when one of these download goroutines makes a write to the response writer buffer. This patch prevents this crash in the daemon http server by waiting for all of the download goroutines to complete, even if one of them fails. Only then does it return, terminating the request handler. Docker-DCO-1.1-Signed-off-by: Josh Hawn (github: jlhawn) --- graph/pull_v2.go | 30 ++++++++++++++++++-- integration-cli/docker_cli_by_digest_test.go | 2 +- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/graph/pull_v2.go b/graph/pull_v2.go index 8666ebc21a..bc8440d215 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -1,6 +1,7 @@ package graph import ( + "errors" "fmt" "io" "io/ioutil" @@ -108,6 +109,7 @@ type downloadInfo struct { layer distribution.ReadSeekCloser size int64 err chan error + out io.Writer // Download progress is written here. } type errVerification struct{} @@ -117,7 +119,7 @@ func (errVerification) Error() string { return "verification failed" } func (p *v2Puller) download(di *downloadInfo) { logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID) - out := p.config.OutStream + out := di.out if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil { if c != nil { @@ -191,7 +193,7 @@ func (p *v2Puller) download(di *downloadInfo) { di.err <- nil } -func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { +func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) { logrus.Debugf("Pulling tag from V2 registry: %q", tag) out := p.config.OutStream @@ -204,7 +206,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { if err != nil { return false, err } - verified, err := p.validateManifest(manifest, tag) + verified, err = p.validateManifest(manifest, tag) if err != nil { return false, err } @@ -212,6 +214,27 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { logrus.Printf("Image manifest for %s has been verified", taggedName) } + // By using a pipeWriter for each of the downloads to write their progress + // to, we can avoid an issue where this function returns an error but + // leaves behind running download goroutines. By splitting the writer + // with a pipe, we can close the pipe if there is any error, consequently + // causing each download to cancel due to an error writing to this pipe. + pipeReader, pipeWriter := io.Pipe() + go func() { + if _, err := io.Copy(out, pipeReader); err != nil { + logrus.Errorf("error copying from layer download progress reader: %s", err) + } + }() + defer func() { + if err != nil { + // All operations on the pipe are synchronous. This call will wait + // until all current readers/writers are done using the pipe then + // set the error. All successive reads/writes will return with this + // error. + pipeWriter.CloseWithError(errors.New("download canceled")) + } + }() + out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name())) downloads := make([]downloadInfo, len(manifest.FSLayers)) @@ -242,6 +265,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil)) downloads[i].err = make(chan error) + downloads[i].out = pipeWriter go p.download(&downloads[i]) } diff --git a/integration-cli/docker_cli_by_digest_test.go b/integration-cli/docker_cli_by_digest_test.go index ef04869667..9f2d3173ea 100644 --- a/integration-cli/docker_cli_by_digest_test.go +++ b/integration-cli/docker_cli_by_digest_test.go @@ -446,7 +446,7 @@ func (s *DockerRegistrySuite) TestPullFailsWithAlteredManifest(c *check.C) { imageReference := fmt.Sprintf("%s@%s", repoName, manifestDigest) out, exitStatus, _ := dockerCmdWithError("pull", imageReference) if exitStatus == 0 { - c.Fatalf("expected a zero exit status but got %d: %s", exitStatus, out) + c.Fatalf("expected a non-zero exit status but got %d: %s", exitStatus, out) } expectedErrorMsg := fmt.Sprintf("image verification failed for digest %s", manifestDigest)