diff --git a/graph/pools_test.go b/graph/pools_test.go new file mode 100644 index 0000000000..785a4bd122 --- /dev/null +++ b/graph/pools_test.go @@ -0,0 +1,41 @@ +package graph + +import "testing" + +func TestPools(t *testing.T) { + s := &TagStore{ + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), + } + + if _, err := s.poolAdd("pull", "test1"); err != nil { + t.Fatal(err) + } + if _, err := s.poolAdd("pull", "test2"); err != nil { + t.Fatal(err) + } + if _, err := s.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { + t.Fatalf("Expected `pull test1 is already in progress`") + } + if _, err := s.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { + t.Fatalf("Expected `pull test1 is already in progress`") + } + if _, err := s.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { + t.Fatalf("Expected `Unknown pool type`") + } + if err := s.poolRemove("pull", "test2"); err != nil { + t.Fatal(err) + } + if err := s.poolRemove("pull", "test2"); err != nil { + t.Fatal(err) + } + if err := s.poolRemove("pull", "test1"); err != nil { + t.Fatal(err) + } + if err := s.poolRemove("push", "test1"); err != nil { + t.Fatal(err) + } + if err := s.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { + t.Fatalf("Expected `Unknown pool type`") + } +} diff --git a/graph/pull.go b/graph/pull.go new file mode 100644 index 0000000000..b2c6f31504 --- /dev/null +++ b/graph/pull.go @@ -0,0 +1,300 @@ +package graph + +import ( + "fmt" + "io" + "net" + "net/url" + "strings" + "time" + + "github.com/docker/docker/engine" + "github.com/docker/docker/image" + "github.com/docker/docker/registry" + "github.com/docker/docker/utils" +) + +func (s *TagStore) CmdPull(job *engine.Job) engine.Status { + if n := len(job.Args); n != 1 && n != 2 { + return job.Errorf("Usage: %s IMAGE [TAG]", job.Name) + } + var ( + localName = job.Args[0] + tag string + sf = utils.NewStreamFormatter(job.GetenvBool("json")) + authConfig = ®istry.AuthConfig{} + metaHeaders map[string][]string + ) + if len(job.Args) > 1 { + tag = job.Args[1] + } + + job.GetenvJson("authConfig", authConfig) + job.GetenvJson("metaHeaders", &metaHeaders) + + c, err := s.poolAdd("pull", localName+":"+tag) + if err != nil { + if c != nil { + // Another pull of the same repository is already taking place; just wait for it to finish + job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName)) + <-c + return engine.StatusOK + } + return job.Error(err) + } + defer s.poolRemove("pull", localName+":"+tag) + + // Resolve the Repository name from fqn to endpoint + name + hostname, remoteName, err := registry.ResolveRepositoryName(localName) + if err != nil { + return job.Error(err) + } + + endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname) + if err != nil { + return job.Error(err) + } + + r, err := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true) + if err != nil { + return job.Error(err) + } + + if endpoint == registry.IndexServerAddress() { + // If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar" + localName = remoteName + } + + if err = s.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil { + return job.Error(err) + } + + return engine.StatusOK +} + +func (s *TagStore) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error { + out.Write(sf.FormatStatus("", "Pulling repository %s", localName)) + + repoData, err := r.GetRepositoryData(remoteName) + if err != nil { + if strings.Contains(err.Error(), "HTTP code: 404") { + return fmt.Errorf("Error: image %s not found", remoteName) + } else { + // Unexpected HTTP error + return err + } + } + + utils.Debugf("Retrieving the tag list") + tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens) + if err != nil { + utils.Errorf("%v", err) + return err + } + + for tag, id := range tagsList { + repoData.ImgList[id] = ®istry.ImgData{ + ID: id, + Tag: tag, + Checksum: "", + } + } + + utils.Debugf("Registering tags") + // If no tag has been specified, pull them all + if askedTag == "" { + for tag, id := range tagsList { + repoData.ImgList[id].Tag = tag + } + } else { + // Otherwise, check that the tag exists and use only that one + id, exists := tagsList[askedTag] + if !exists { + return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName) + } + repoData.ImgList[id].Tag = askedTag + } + + errors := make(chan error) + for _, image := range repoData.ImgList { + downloadImage := func(img *registry.ImgData) { + if askedTag != "" && img.Tag != askedTag { + utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID) + if parallel { + errors <- nil + } + return + } + + if img.Tag == "" { + utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID) + if parallel { + errors <- nil + } + return + } + + // ensure no two downloads of the same image happen at the same time + if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil { + if c != nil { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) + <-c + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil)) + } else { + utils.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err) + } + if parallel { + errors <- nil + } + return + } + defer s.poolRemove("pull", "img:"+img.ID) + + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil)) + success := false + var lastErr error + for _, ep := range repoData.Endpoints { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil)) + if err := s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil { + // It's not ideal that only the last error is returned, it would be better to concatenate the errors. + // As the error is also given to the output stream the user will see the error. + lastErr = err + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil)) + continue + } + success = true + break + } + if !success { + err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr) + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil)) + if parallel { + errors <- err + return + } + } + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil)) + + if parallel { + errors <- nil + } + } + + if parallel { + go downloadImage(image) + } else { + downloadImage(image) + } + } + if parallel { + var lastError error + for i := 0; i < len(repoData.ImgList); i++ { + if err := <-errors; err != nil { + lastError = err + } + } + if lastError != nil { + return lastError + } + + } + for tag, id := range tagsList { + if askedTag != "" && tag != askedTag { + continue + } + if err := s.Set(localName, tag, id, true); err != nil { + return err + } + } + + return nil +} + +func (s *TagStore) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error { + history, err := r.GetRemoteHistory(imgID, endpoint, token) + if err != nil { + return err + } + out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil)) + // FIXME: Try to stream the images? + // FIXME: Launch the getRemoteImage() in goroutines + + for i := len(history) - 1; i >= 0; i-- { + id := history[i] + + // ensure no two downloads of the same layer happen at the same time + if c, err := s.poolAdd("pull", "layer:"+id); err != nil { + utils.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) + <-c + } + defer s.poolRemove("pull", "layer:"+id) + + if !s.graph.Exists(id) { + out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil)) + var ( + imgJSON []byte + imgSize int + err error + img *image.Image + ) + retries := 5 + for j := 1; j <= retries; j++ { + imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token) + if err != nil && j == retries { + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) + return err + } else if err != nil { + time.Sleep(time.Duration(j) * 500 * time.Millisecond) + continue + } + img, err = image.NewImgJSON(imgJSON) + if err != nil && j == retries { + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) + return fmt.Errorf("Failed to parse json: %s", err) + } else if err != nil { + time.Sleep(time.Duration(j) * 500 * time.Millisecond) + continue + } else { + break + } + } + + for j := 1; j <= retries; j++ { + // Get the layer + status := "Pulling fs layer" + if j > 1 { + status = fmt.Sprintf("Pulling fs layer [retries: %d]", j) + } + out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil)) + layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize)) + if uerr, ok := err.(*url.Error); ok { + err = uerr.Err + } + if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { + time.Sleep(time.Duration(j) * 500 * time.Millisecond) + continue + } else if err != nil { + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) + return err + } + defer layer.Close() + + err = s.graph.Register(imgJSON, + utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), + img) + if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { + time.Sleep(time.Duration(j) * 500 * time.Millisecond) + continue + } else if err != nil { + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil)) + return err + } else { + break + } + } + } + out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil)) + + } + return nil +} diff --git a/graph/push.go b/graph/push.go new file mode 100644 index 0000000000..8cfa6c2b40 --- /dev/null +++ b/graph/push.go @@ -0,0 +1,249 @@ +package graph + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path" + + "github.com/docker/docker/archive" + "github.com/docker/docker/engine" + "github.com/docker/docker/registry" + "github.com/docker/docker/utils" +) + +// Retrieve the all the images to be uploaded in the correct order +func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) { + var ( + imageList []string + imagesSeen map[string]bool = make(map[string]bool) + tagsByImage map[string][]string = make(map[string][]string) + ) + + for tag, id := range localRepo { + if requestedTag != "" && requestedTag != tag { + continue + } + var imageListForThisTag []string + + tagsByImage[id] = append(tagsByImage[id], tag) + + for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() { + if err != nil { + return nil, nil, err + } + + if imagesSeen[img.ID] { + // This image is already on the list, we can ignore it and all its parents + break + } + + imagesSeen[img.ID] = true + imageListForThisTag = append(imageListForThisTag, img.ID) + } + + // reverse the image list for this tag (so the "most"-parent image is first) + for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 { + imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i] + } + + // append to main image list + imageList = append(imageList, imageListForThisTag...) + } + if len(imageList) == 0 { + return nil, nil, fmt.Errorf("No images found for the requested repository / tag") + } + utils.Debugf("Image list: %v", imageList) + utils.Debugf("Tags by image: %v", tagsByImage) + + return imageList, tagsByImage, nil +} + +func (s *TagStore) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error { + out = utils.NewWriteFlusher(out) + utils.Debugf("Local repo: %s", localRepo) + imgList, tagsByImage, err := s.getImageList(localRepo, tag) + if err != nil { + return err + } + + out.Write(sf.FormatStatus("", "Sending image list")) + + var ( + repoData *registry.RepositoryData + imageIndex []*registry.ImgData + ) + + for _, imgId := range imgList { + if tags, exists := tagsByImage[imgId]; exists { + // If an image has tags you must add an entry in the image index + // for each tag + for _, tag := range tags { + imageIndex = append(imageIndex, ®istry.ImgData{ + ID: imgId, + Tag: tag, + }) + } + } else { + // If the image does not have a tag it still needs to be sent to the + // registry with an empty tag so that it is accociated with the repository + imageIndex = append(imageIndex, ®istry.ImgData{ + ID: imgId, + Tag: "", + }) + + } + } + + utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo) + for _, data := range imageIndex { + utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag) + } + + // Register all the images in a repository with the registry + // If an image is not in this list it will not be associated with the repository + repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil) + if err != nil { + return err + } + + nTag := 1 + if tag == "" { + nTag = len(localRepo) + } + for _, ep := range repoData.Endpoints { + out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag)) + + for _, imgId := range imgList { + if r.LookupRemoteImage(imgId, ep, repoData.Tokens) { + out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) + } else { + if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil { + // FIXME: Continue on error? + return err + } + } + + for _, tag := range tagsByImage[imgId] { + out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag)) + + if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil { + return err + } + } + } + } + + if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil { + return err + } + + return nil +} + +func (s *TagStore) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) { + out = utils.NewWriteFlusher(out) + jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json")) + if err != nil { + return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err) + } + out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil)) + + imgData := ®istry.ImgData{ + ID: imgID, + } + + // Send the json + if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil { + if err == registry.ErrAlreadyExists { + out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil)) + return "", nil + } + return "", err + } + + layerData, err := s.graph.TempLayerArchive(imgID, archive.Uncompressed, sf, out) + if err != nil { + return "", fmt.Errorf("Failed to generate layer archive: %s", err) + } + defer os.RemoveAll(layerData.Name()) + + // Send the layer + utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size) + + checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw) + if err != nil { + return "", err + } + imgData.Checksum = checksum + imgData.ChecksumPayload = checksumPayload + // Send the checksum + if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil { + return "", err + } + + out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil)) + return imgData.Checksum, nil +} + +// FIXME: Allow to interrupt current push when new push of same image is done. +func (s *TagStore) CmdPush(job *engine.Job) engine.Status { + if n := len(job.Args); n != 1 { + return job.Errorf("Usage: %s IMAGE", job.Name) + } + var ( + localName = job.Args[0] + sf = utils.NewStreamFormatter(job.GetenvBool("json")) + authConfig = ®istry.AuthConfig{} + metaHeaders map[string][]string + ) + + tag := job.Getenv("tag") + job.GetenvJson("authConfig", authConfig) + job.GetenvJson("metaHeaders", &metaHeaders) + if _, err := s.poolAdd("push", localName); err != nil { + return job.Error(err) + } + defer s.poolRemove("push", localName) + + // Resolve the Repository name from fqn to endpoint + name + hostname, remoteName, err := registry.ResolveRepositoryName(localName) + if err != nil { + return job.Error(err) + } + + endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname) + if err != nil { + return job.Error(err) + } + + img, err := s.graph.Get(localName) + r, err2 := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false) + if err2 != nil { + return job.Error(err2) + } + + if err != nil { + reposLen := 1 + if tag == "" { + reposLen = len(s.Repositories[localName]) + } + job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen)) + // If it fails, try to get the repository + if localRepo, exists := s.Repositories[localName]; exists { + if err := s.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil { + return job.Error(err) + } + return engine.StatusOK + } + return job.Error(err) + } + + var token []string + job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName)) + if _, err := s.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil { + return job.Error(err) + } + return engine.StatusOK +} diff --git a/graph/service.go b/graph/service.go index 74d6226d0a..ef48c0b064 100644 --- a/graph/service.go +++ b/graph/service.go @@ -23,6 +23,8 @@ func (s *TagStore) Install(eng *engine.Engine) error { "viz": s.CmdViz, "load": s.CmdLoad, "import": s.CmdImport, + "pull": s.CmdPull, + "push": s.CmdPush, } { if err := eng.Register(name, handler); err != nil { return fmt.Errorf("Could not register %q: %v", name, err) diff --git a/graph/tags.go b/graph/tags.go index f2a0646acb..30176ae071 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -22,6 +22,10 @@ type TagStore struct { graph *Graph Repositories map[string]Repository sync.Mutex + // FIXME: move push/pull-related fields + // to a helper type + pullingPool map[string]chan struct{} + pushingPool map[string]chan struct{} } type Repository map[string]string @@ -35,6 +39,8 @@ func NewTagStore(path string, graph *Graph) (*TagStore, error) { path: abspath, graph: graph, Repositories: make(map[string]Repository), + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), } // Load the json file if it exists, otherwise create it. if err := store.reload(); os.IsNotExist(err) { @@ -263,3 +269,46 @@ func validateTagName(name string) error { } return nil } + +func (s *TagStore) poolAdd(kind, key string) (chan struct{}, error) { + s.Lock() + defer s.Unlock() + + if c, exists := s.pullingPool[key]; exists { + return c, fmt.Errorf("pull %s is already in progress", key) + } + if c, exists := s.pushingPool[key]; exists { + return c, fmt.Errorf("push %s is already in progress", key) + } + + c := make(chan struct{}) + switch kind { + case "pull": + s.pullingPool[key] = c + case "push": + s.pushingPool[key] = c + default: + return nil, fmt.Errorf("Unknown pool type") + } + return c, nil +} + +func (s *TagStore) poolRemove(kind, key string) error { + s.Lock() + defer s.Unlock() + switch kind { + case "pull": + if c, exists := s.pullingPool[key]; exists { + close(c) + delete(s.pullingPool, key) + } + case "push": + if c, exists := s.pushingPool[key]; exists { + close(c) + delete(s.pushingPool, key) + } + default: + return fmt.Errorf("Unknown pool type") + } + return nil +} diff --git a/server/image.go b/server/image.go index 5cde8dd63e..d508cb62f6 100644 --- a/server/image.go +++ b/server/image.go @@ -5,21 +5,15 @@ package server import ( - "fmt" "io" "io/ioutil" - "net" - "net/url" "os" "os/exec" - "path" "strings" - "time" "github.com/docker/docker/archive" "github.com/docker/docker/builder" "github.com/docker/docker/engine" - "github.com/docker/docker/image" "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/registry" "github.com/docker/docker/utils" @@ -104,566 +98,3 @@ func (srv *Server) Build(job *engine.Job) engine.Status { } return engine.StatusOK } - -func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error { - history, err := r.GetRemoteHistory(imgID, endpoint, token) - if err != nil { - return err - } - out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil)) - // FIXME: Try to stream the images? - // FIXME: Launch the getRemoteImage() in goroutines - - for i := len(history) - 1; i >= 0; i-- { - id := history[i] - - // ensure no two downloads of the same layer happen at the same time - if c, err := srv.poolAdd("pull", "layer:"+id); err != nil { - utils.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) - <-c - } - defer srv.poolRemove("pull", "layer:"+id) - - if !srv.daemon.Graph().Exists(id) { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil)) - var ( - imgJSON []byte - imgSize int - err error - img *image.Image - ) - retries := 5 - for j := 1; j <= retries; j++ { - imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token) - if err != nil && j == retries { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) - return err - } else if err != nil { - time.Sleep(time.Duration(j) * 500 * time.Millisecond) - continue - } - img, err = image.NewImgJSON(imgJSON) - if err != nil && j == retries { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) - return fmt.Errorf("Failed to parse json: %s", err) - } else if err != nil { - time.Sleep(time.Duration(j) * 500 * time.Millisecond) - continue - } else { - break - } - } - - for j := 1; j <= retries; j++ { - // Get the layer - status := "Pulling fs layer" - if j > 1 { - status = fmt.Sprintf("Pulling fs layer [retries: %d]", j) - } - out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil)) - layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize)) - if uerr, ok := err.(*url.Error); ok { - err = uerr.Err - } - if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { - time.Sleep(time.Duration(j) * 500 * time.Millisecond) - continue - } else if err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) - return err - } - defer layer.Close() - - err = srv.daemon.Graph().Register(imgJSON, - utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), - img) - if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { - time.Sleep(time.Duration(j) * 500 * time.Millisecond) - continue - } else if err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil)) - return err - } else { - break - } - } - } - out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil)) - - } - return nil -} - -func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error { - out.Write(sf.FormatStatus("", "Pulling repository %s", localName)) - - repoData, err := r.GetRepositoryData(remoteName) - if err != nil { - if strings.Contains(err.Error(), "HTTP code: 404") { - return fmt.Errorf("Error: image %s not found", remoteName) - } else { - // Unexpected HTTP error - return err - } - } - - utils.Debugf("Retrieving the tag list") - tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens) - if err != nil { - utils.Errorf("%v", err) - return err - } - - for tag, id := range tagsList { - repoData.ImgList[id] = ®istry.ImgData{ - ID: id, - Tag: tag, - Checksum: "", - } - } - - utils.Debugf("Registering tags") - // If no tag has been specified, pull them all - if askedTag == "" { - for tag, id := range tagsList { - repoData.ImgList[id].Tag = tag - } - } else { - // Otherwise, check that the tag exists and use only that one - id, exists := tagsList[askedTag] - if !exists { - return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName) - } - repoData.ImgList[id].Tag = askedTag - } - - errors := make(chan error) - for _, image := range repoData.ImgList { - downloadImage := func(img *registry.ImgData) { - if askedTag != "" && img.Tag != askedTag { - utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID) - if parallel { - errors <- nil - } - return - } - - if img.Tag == "" { - utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID) - if parallel { - errors <- nil - } - return - } - - // ensure no two downloads of the same image happen at the same time - if c, err := srv.poolAdd("pull", "img:"+img.ID); err != nil { - if c != nil { - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) - <-c - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil)) - } else { - utils.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err) - } - if parallel { - errors <- nil - } - return - } - defer srv.poolRemove("pull", "img:"+img.ID) - - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil)) - success := false - var lastErr error - for _, ep := range repoData.Endpoints { - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil)) - if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil { - // It's not ideal that only the last error is returned, it would be better to concatenate the errors. - // As the error is also given to the output stream the user will see the error. - lastErr = err - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil)) - continue - } - success = true - break - } - if !success { - err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr) - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil)) - if parallel { - errors <- err - return - } - } - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil)) - - if parallel { - errors <- nil - } - } - - if parallel { - go downloadImage(image) - } else { - downloadImage(image) - } - } - if parallel { - var lastError error - for i := 0; i < len(repoData.ImgList); i++ { - if err := <-errors; err != nil { - lastError = err - } - } - if lastError != nil { - return lastError - } - - } - for tag, id := range tagsList { - if askedTag != "" && tag != askedTag { - continue - } - if err := srv.daemon.Repositories().Set(localName, tag, id, true); err != nil { - return err - } - } - - return nil -} - -func (srv *Server) ImagePull(job *engine.Job) engine.Status { - if n := len(job.Args); n != 1 && n != 2 { - return job.Errorf("Usage: %s IMAGE [TAG]", job.Name) - } - var ( - localName = job.Args[0] - tag string - sf = utils.NewStreamFormatter(job.GetenvBool("json")) - authConfig = ®istry.AuthConfig{} - metaHeaders map[string][]string - ) - if len(job.Args) > 1 { - tag = job.Args[1] - } - - job.GetenvJson("authConfig", authConfig) - job.GetenvJson("metaHeaders", &metaHeaders) - - c, err := srv.poolAdd("pull", localName+":"+tag) - if err != nil { - if c != nil { - // Another pull of the same repository is already taking place; just wait for it to finish - job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName)) - <-c - return engine.StatusOK - } - return job.Error(err) - } - defer srv.poolRemove("pull", localName+":"+tag) - - // Resolve the Repository name from fqn to endpoint + name - hostname, remoteName, err := registry.ResolveRepositoryName(localName) - if err != nil { - return job.Error(err) - } - - endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname) - if err != nil { - return job.Error(err) - } - - r, err := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true) - if err != nil { - return job.Error(err) - } - - if endpoint == registry.IndexServerAddress() { - // If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar" - localName = remoteName - } - - if err = srv.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil { - return job.Error(err) - } - - return engine.StatusOK -} - -// Retrieve the all the images to be uploaded in the correct order -func (srv *Server) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) { - var ( - imageList []string - imagesSeen map[string]bool = make(map[string]bool) - tagsByImage map[string][]string = make(map[string][]string) - ) - - for tag, id := range localRepo { - if requestedTag != "" && requestedTag != tag { - continue - } - var imageListForThisTag []string - - tagsByImage[id] = append(tagsByImage[id], tag) - - for img, err := srv.daemon.Graph().Get(id); img != nil; img, err = img.GetParent() { - if err != nil { - return nil, nil, err - } - - if imagesSeen[img.ID] { - // This image is already on the list, we can ignore it and all its parents - break - } - - imagesSeen[img.ID] = true - imageListForThisTag = append(imageListForThisTag, img.ID) - } - - // reverse the image list for this tag (so the "most"-parent image is first) - for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 { - imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i] - } - - // append to main image list - imageList = append(imageList, imageListForThisTag...) - } - if len(imageList) == 0 { - return nil, nil, fmt.Errorf("No images found for the requested repository / tag") - } - utils.Debugf("Image list: %v", imageList) - utils.Debugf("Tags by image: %v", tagsByImage) - - return imageList, tagsByImage, nil -} - -func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error { - out = utils.NewWriteFlusher(out) - utils.Debugf("Local repo: %s", localRepo) - imgList, tagsByImage, err := srv.getImageList(localRepo, tag) - if err != nil { - return err - } - - out.Write(sf.FormatStatus("", "Sending image list")) - - var ( - repoData *registry.RepositoryData - imageIndex []*registry.ImgData - ) - - for _, imgId := range imgList { - if tags, exists := tagsByImage[imgId]; exists { - // If an image has tags you must add an entry in the image index - // for each tag - for _, tag := range tags { - imageIndex = append(imageIndex, ®istry.ImgData{ - ID: imgId, - Tag: tag, - }) - } - } else { - // If the image does not have a tag it still needs to be sent to the - // registry with an empty tag so that it is accociated with the repository - imageIndex = append(imageIndex, ®istry.ImgData{ - ID: imgId, - Tag: "", - }) - - } - } - - utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo) - for _, data := range imageIndex { - utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag) - } - - // Register all the images in a repository with the registry - // If an image is not in this list it will not be associated with the repository - repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil) - if err != nil { - return err - } - - nTag := 1 - if tag == "" { - nTag = len(localRepo) - } - for _, ep := range repoData.Endpoints { - out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag)) - - for _, imgId := range imgList { - if r.LookupRemoteImage(imgId, ep, repoData.Tokens) { - out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId))) - } else { - if _, err := srv.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil { - // FIXME: Continue on error? - return err - } - } - - for _, tag := range tagsByImage[imgId] { - out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag)) - - if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil { - return err - } - } - } - } - - if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil { - return err - } - - return nil -} - -func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) { - out = utils.NewWriteFlusher(out) - jsonRaw, err := ioutil.ReadFile(path.Join(srv.daemon.Graph().Root, imgID, "json")) - if err != nil { - return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err) - } - out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil)) - - imgData := ®istry.ImgData{ - ID: imgID, - } - - // Send the json - if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil { - if err == registry.ErrAlreadyExists { - out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil)) - return "", nil - } - return "", err - } - - layerData, err := srv.daemon.Graph().TempLayerArchive(imgID, archive.Uncompressed, sf, out) - if err != nil { - return "", fmt.Errorf("Failed to generate layer archive: %s", err) - } - defer os.RemoveAll(layerData.Name()) - - // Send the layer - utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size) - - checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw) - if err != nil { - return "", err - } - imgData.Checksum = checksum - imgData.ChecksumPayload = checksumPayload - // Send the checksum - if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil { - return "", err - } - - out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil)) - return imgData.Checksum, nil -} - -// FIXME: Allow to interrupt current push when new push of same image is done. -func (srv *Server) ImagePush(job *engine.Job) engine.Status { - if n := len(job.Args); n != 1 { - return job.Errorf("Usage: %s IMAGE", job.Name) - } - var ( - localName = job.Args[0] - sf = utils.NewStreamFormatter(job.GetenvBool("json")) - authConfig = ®istry.AuthConfig{} - metaHeaders map[string][]string - ) - - tag := job.Getenv("tag") - job.GetenvJson("authConfig", authConfig) - job.GetenvJson("metaHeaders", &metaHeaders) - if _, err := srv.poolAdd("push", localName); err != nil { - return job.Error(err) - } - defer srv.poolRemove("push", localName) - - // Resolve the Repository name from fqn to endpoint + name - hostname, remoteName, err := registry.ResolveRepositoryName(localName) - if err != nil { - return job.Error(err) - } - - endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname) - if err != nil { - return job.Error(err) - } - - img, err := srv.daemon.Graph().Get(localName) - r, err2 := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false) - if err2 != nil { - return job.Error(err2) - } - - if err != nil { - reposLen := 1 - if tag == "" { - reposLen = len(srv.daemon.Repositories().Repositories[localName]) - } - job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen)) - // If it fails, try to get the repository - if localRepo, exists := srv.daemon.Repositories().Repositories[localName]; exists { - if err := srv.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil { - return job.Error(err) - } - return engine.StatusOK - } - return job.Error(err) - } - - var token []string - job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName)) - if _, err := srv.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil { - return job.Error(err) - } - return engine.StatusOK -} - -func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) { - srv.Lock() - defer srv.Unlock() - - if c, exists := srv.pullingPool[key]; exists { - return c, fmt.Errorf("pull %s is already in progress", key) - } - if c, exists := srv.pushingPool[key]; exists { - return c, fmt.Errorf("push %s is already in progress", key) - } - - c := make(chan struct{}) - switch kind { - case "pull": - srv.pullingPool[key] = c - case "push": - srv.pushingPool[key] = c - default: - return nil, fmt.Errorf("Unknown pool type") - } - return c, nil -} - -func (srv *Server) poolRemove(kind, key string) error { - srv.Lock() - defer srv.Unlock() - switch kind { - case "pull": - if c, exists := srv.pullingPool[key]; exists { - close(c) - delete(srv.pullingPool, key) - } - case "push": - if c, exists := srv.pushingPool[key]; exists { - close(c) - delete(srv.pushingPool, key) - } - default: - return fmt.Errorf("Unknown pool type") - } - return nil -} diff --git a/server/init.go b/server/init.go index 3738d0c541..15eeb1cf15 100644 --- a/server/init.go +++ b/server/init.go @@ -33,8 +33,6 @@ func InitServer(job *engine.Job) engine.Status { for name, handler := range map[string]engine.Handler{ "build": srv.Build, - "pull": srv.ImagePull, - "push": srv.ImagePush, } { if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil { return job.Error(err) @@ -59,10 +57,8 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) return nil, err } srv := &Server{ - Eng: eng, - daemon: daemon, - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), + Eng: eng, + daemon: daemon, } return srv, nil } diff --git a/server/server.go b/server/server.go index 21b271d87c..1b77e5efe1 100644 --- a/server/server.go +++ b/server/server.go @@ -30,9 +30,7 @@ import ( type Server struct { sync.RWMutex - daemon *daemon.Daemon - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} - Eng *engine.Engine - tasks sync.WaitGroup + daemon *daemon.Daemon + Eng *engine.Engine + tasks sync.WaitGroup } diff --git a/server/server_unit_test.go b/server/server_unit_test.go deleted file mode 100644 index 16f06c145e..0000000000 --- a/server/server_unit_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package server - -import "testing" - -func TestPools(t *testing.T) { - srv := &Server{ - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), - } - - if _, err := srv.poolAdd("pull", "test1"); err != nil { - t.Fatal(err) - } - if _, err := srv.poolAdd("pull", "test2"); err != nil { - t.Fatal(err) - } - if _, err := srv.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { - t.Fatalf("Expected `pull test1 is already in progress`") - } - if _, err := srv.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { - t.Fatalf("Expected `pull test1 is already in progress`") - } - if _, err := srv.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { - t.Fatalf("Expected `Unknown pool type`") - } - if err := srv.poolRemove("pull", "test2"); err != nil { - t.Fatal(err) - } - if err := srv.poolRemove("pull", "test2"); err != nil { - t.Fatal(err) - } - if err := srv.poolRemove("pull", "test1"); err != nil { - t.Fatal(err) - } - if err := srv.poolRemove("push", "test1"); err != nil { - t.Fatal(err) - } - if err := srv.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { - t.Fatalf("Expected `Unknown pool type`") - } -}