diff --git a/graph/pull_v2.go b/graph/pull_v2.go index 8666ebc21a..bc8440d215 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -1,6 +1,7 @@ package graph import ( + "errors" "fmt" "io" "io/ioutil" @@ -108,6 +109,7 @@ type downloadInfo struct { layer distribution.ReadSeekCloser size int64 err chan error + out io.Writer // Download progress is written here. } type errVerification struct{} @@ -117,7 +119,7 @@ func (errVerification) Error() string { return "verification failed" } func (p *v2Puller) download(di *downloadInfo) { logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID) - out := p.config.OutStream + out := di.out if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil { if c != nil { @@ -191,7 +193,7 @@ func (p *v2Puller) download(di *downloadInfo) { di.err <- nil } -func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { +func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) { logrus.Debugf("Pulling tag from V2 registry: %q", tag) out := p.config.OutStream @@ -204,7 +206,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { if err != nil { return false, err } - verified, err := p.validateManifest(manifest, tag) + verified, err = p.validateManifest(manifest, tag) if err != nil { return false, err } @@ -212,6 +214,27 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { logrus.Printf("Image manifest for %s has been verified", taggedName) } + // By using a pipeWriter for each of the downloads to write their progress + // to, we can avoid an issue where this function returns an error but + // leaves behind running download goroutines. By splitting the writer + // with a pipe, we can close the pipe if there is any error, consequently + // causing each download to cancel due to an error writing to this pipe. + pipeReader, pipeWriter := io.Pipe() + go func() { + if _, err := io.Copy(out, pipeReader); err != nil { + logrus.Errorf("error copying from layer download progress reader: %s", err) + } + }() + defer func() { + if err != nil { + // All operations on the pipe are synchronous. This call will wait + // until all current readers/writers are done using the pipe then + // set the error. All successive reads/writes will return with this + // error. + pipeWriter.CloseWithError(errors.New("download canceled")) + } + }() + out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name())) downloads := make([]downloadInfo, len(manifest.FSLayers)) @@ -242,6 +265,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) { out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil)) downloads[i].err = make(chan error) + downloads[i].out = pipeWriter go p.download(&downloads[i]) } diff --git a/integration-cli/docker_cli_by_digest_test.go b/integration-cli/docker_cli_by_digest_test.go index ef04869667..9f2d3173ea 100644 --- a/integration-cli/docker_cli_by_digest_test.go +++ b/integration-cli/docker_cli_by_digest_test.go @@ -446,7 +446,7 @@ func (s *DockerRegistrySuite) TestPullFailsWithAlteredManifest(c *check.C) { imageReference := fmt.Sprintf("%s@%s", repoName, manifestDigest) out, exitStatus, _ := dockerCmdWithError("pull", imageReference) if exitStatus == 0 { - c.Fatalf("expected a zero exit status but got %d: %s", exitStatus, out) + c.Fatalf("expected a non-zero exit status but got %d: %s", exitStatus, out) } expectedErrorMsg := fmt.Sprintf("image verification failed for digest %s", manifestDigest)