Update graph to use vendored distribution client for the v2 codepath

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
Signed-off-by: Tibor Vass <tibor@docker.com>
This commit is contained in:
Derek McGowan 2015-02-12 10:23:22 -08:00 committed by Tibor Vass
Parent 276c640be4
Commit 19515a7ad8
25 changed files with 1822 additions and 2286 deletions


@@ -21,7 +21,7 @@ import (
//
// Usage: docker login SERVER
func (cli *DockerCli) CmdLogin(args ...string) error {
cmd := cli.Subcmd("login", []string{"[SERVER]"}, "Register or log in to a Docker registry server, if no server is\nspecified \""+registry.IndexServerAddress()+"\" is the default.", true)
cmd := cli.Subcmd("login", []string{"[SERVER]"}, "Register or log in to a Docker registry server, if no server is\nspecified \""+registry.INDEXSERVER+"\" is the default.", true)
cmd.Require(flag.Max, 1)
var username, password, email string
@@ -32,7 +32,7 @@ func (cli *DockerCli) CmdLogin(args ...string) error {
cmd.ParseFlags(args, true)
-serverAddress := registry.IndexServerAddress()
+serverAddress := registry.INDEXSERVER
if len(cmd.Args()) > 0 {
serverAddress = cmd.Arg(0)
}


@@ -13,12 +13,12 @@ import (
//
// Usage: docker logout [SERVER]
func (cli *DockerCli) CmdLogout(args ...string) error {
cmd := cli.Subcmd("logout", []string{"[SERVER]"}, "Log out from a Docker registry, if no server is\nspecified \""+registry.IndexServerAddress()+"\" is the default.", true)
cmd := cli.Subcmd("logout", []string{"[SERVER]"}, "Log out from a Docker registry, if no server is\nspecified \""+registry.INDEXSERVER+"\" is the default.", true)
cmd.Require(flag.Max, 1)
cmd.ParseFlags(args, true)
-serverAddress := registry.IndexServerAddress()
+serverAddress := registry.INDEXSERVER
if len(cmd.Args()) > 0 {
serverAddress = cmd.Arg(0)
}


@@ -74,7 +74,7 @@ func (daemon *Daemon) SystemInfo() (*types.Info, error) {
NEventsListener: daemon.EventsService.SubscribersCount(),
KernelVersion: kernelVersion,
OperatingSystem: operatingSystem,
-IndexServerAddress: registry.IndexServerAddress(),
+IndexServerAddress: registry.INDEXSERVER,
RegistryConfig: daemon.RegistryService.Config,
InitSha1: dockerversion.INITSHA1,
InitPath: initPath,


@@ -1,178 +0,0 @@
package graph
import (
"encoding/json"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/docker/registry"
"github.com/docker/docker/trust"
"github.com/docker/libtrust"
)
// loadManifest loads a manifest from a byte array and verifies its content,
// returning the local digest, the manifest itself, and whether or not it
// was verified. If ref is a digest rather than a tag, it will be treated as
// the local digest. An error will be returned if the signature verification
// fails, the local digest verification fails, or, if provided, the remote
// digest verification fails. The boolean return will be false without an
// error only when the signature trust check fails.
func (s *TagStore) loadManifest(manifestBytes []byte, ref string, remoteDigest digest.Digest) (digest.Digest, *registry.ManifestData, bool, error) {
payload, keys, err := unpackSignedManifest(manifestBytes)
if err != nil {
return "", nil, false, fmt.Errorf("error unpacking manifest: %v", err)
}
// TODO(stevvooe): It would be a lot better here to build up a stack of
// verifiers, then push the bytes one time for signatures and digests, but
// the manifests are typically small, so this optimization is not worth
// hacking this code without further refactoring.
var localDigest digest.Digest
// Verify the local digest, if present in ref. ParseDigest will validate
// that the ref is a digest and verify against that if present. Otherwise
// (on error), we simply compute the localDigest and proceed.
if dgst, err := digest.ParseDigest(ref); err == nil {
// verify the manifest against local ref
if err := verifyDigest(dgst, payload); err != nil {
return "", nil, false, fmt.Errorf("verifying local digest: %v", err)
}
localDigest = dgst
} else {
// We don't have a local digest, since we are working from a tag.
// Compute the digest of the payload and return that.
logrus.Debugf("provided manifest reference %q is not a digest: %v", ref, err)
localDigest, err = digest.FromBytes(payload)
if err != nil {
// near impossible
logrus.Errorf("error calculating local digest during tag pull: %v", err)
return "", nil, false, err
}
}
// verify against the remote digest, if available
if remoteDigest != "" {
if err := verifyDigest(remoteDigest, payload); err != nil {
return "", nil, false, fmt.Errorf("verifying remote digest: %v", err)
}
}
var manifest registry.ManifestData
if err := json.Unmarshal(payload, &manifest); err != nil {
return "", nil, false, fmt.Errorf("error unmarshalling manifest: %s", err)
}
// validate the contents of the manifest
if err := validateManifest(&manifest); err != nil {
return "", nil, false, err
}
var verified bool
verified, err = s.verifyTrustedKeys(manifest.Name, keys)
if err != nil {
return "", nil, false, fmt.Errorf("error verifying trusted keys: %v", err)
}
return localDigest, &manifest, verified, nil
}
// unpackSignedManifest takes the raw, signed manifest bytes, unpacks the jws
// and returns the payload and public keys used to sign the manifest.
// Signatures are verified for authenticity but not against the trust store.
func unpackSignedManifest(p []byte) ([]byte, []libtrust.PublicKey, error) {
sig, err := libtrust.ParsePrettySignature(p, "signatures")
if err != nil {
return nil, nil, fmt.Errorf("error parsing payload: %s", err)
}
keys, err := sig.Verify()
if err != nil {
return nil, nil, fmt.Errorf("error verifying payload: %s", err)
}
payload, err := sig.Payload()
if err != nil {
return nil, nil, fmt.Errorf("error retrieving payload: %s", err)
}
return payload, keys, nil
}
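For reference, a minimal sketch of the JWS round trip that unpackSignedManifest reverses, using only libtrust calls that appear in this file and in the manifest tests below; the payload literal is hypothetical:

package main

import (
	"fmt"

	"github.com/docker/libtrust"
)

func main() {
	payload := []byte(`{"schemaVersion":1,"name":"testapp"}`)

	// Sign the payload, as the manifest tests do.
	pk, err := libtrust.GenerateECP256PrivateKey()
	if err != nil {
		panic(err)
	}
	sig, err := libtrust.NewJSONSignature(payload)
	if err != nil {
		panic(err)
	}
	if err := sig.Sign(pk); err != nil {
		panic(err)
	}
	signedBytes, err := sig.PrettySignature("signatures")
	if err != nil {
		panic(err)
	}

	// Unpack it again, exactly as unpackSignedManifest does.
	parsed, err := libtrust.ParsePrettySignature(signedBytes, "signatures")
	if err != nil {
		panic(err)
	}
	keys, err := parsed.Verify()
	if err != nil {
		panic(err)
	}
	recovered, err := parsed.Payload()
	if err != nil {
		panic(err)
	}
	fmt.Printf("recovered %d payload bytes signed by %d key(s)\n", len(recovered), len(keys))
}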
// verifyTrustedKeys checks the keys provided against the trust store,
// ensuring that the provided keys are trusted for the namespace. The keys
// provided to this method must come from the signatures provided as part of
// the manifest JWS package, obtained from unpackSignedManifest or libtrust.
func (s *TagStore) verifyTrustedKeys(namespace string, keys []libtrust.PublicKey) (verified bool, err error) {
if namespace[0] != '/' {
namespace = "/" + namespace
}
for _, key := range keys {
b, err := key.MarshalJSON()
if err != nil {
return false, fmt.Errorf("error marshalling public key: %s", err)
}
// Check key has read/write permission (0x03)
v, err := s.trustService.CheckKey(namespace, b, 0x03)
if err != nil {
vErr, ok := err.(trust.NotVerifiedError)
if !ok {
return false, fmt.Errorf("error running key check: %s", err)
}
logrus.Debugf("Key check result: %v", vErr)
}
verified = v
}
if verified {
logrus.Debug("Key check result: verified")
}
return
}
// verifyDigest checks the contents of p against the provided digest. Note
// that for manifests, this is the signed payload and not the raw bytes with
// signatures.
func verifyDigest(dgst digest.Digest, p []byte) error {
if err := dgst.Validate(); err != nil {
return fmt.Errorf("error validating digest %q: %v", dgst, err)
}
verifier, err := digest.NewDigestVerifier(dgst)
if err != nil {
// There are not many ways this can go wrong: if it does, it's
// fatal. Likely, the cause would be poor validation of the
// incoming reference.
return fmt.Errorf("error creating verifier for digest %q: %v", dgst, err)
}
if _, err := verifier.Write(p); err != nil {
return fmt.Errorf("error writing payload to digest verifier (verifier target %q): %v", dgst, err)
}
if !verifier.Verified() {
return fmt.Errorf("verification against digest %q failed", dgst)
}
return nil
}
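As a quick reference, the digest round trip used by loadManifest and verifyDigest, assuming the vendored github.com/docker/distribution/digest package (note that FromBytes returns an error in this vintage of the package):

package main

import (
	"fmt"

	"github.com/docker/distribution/digest"
)

func main() {
	payload := []byte(`{"schemaVersion":1}`)

	// Compute a digest, as loadManifest does when pulling by tag.
	dgst, err := digest.FromBytes(payload)
	if err != nil {
		panic(err)
	}

	// Verify the same payload against it, as verifyDigest does.
	verifier, err := digest.NewDigestVerifier(dgst)
	if err != nil {
		panic(err)
	}
	if _, err := verifier.Write(payload); err != nil {
		panic(err)
	}
	fmt.Println("verified:", verifier.Verified()) // true for an untampered payload
}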
func validateManifest(manifest *registry.ManifestData) error {
if manifest.SchemaVersion != 1 {
return fmt.Errorf("unsupported schema version: %d", manifest.SchemaVersion)
}
if len(manifest.FSLayers) != len(manifest.History) {
return fmt.Errorf("length of history not equal to number of layers")
}
if len(manifest.FSLayers) == 0 {
return fmt.Errorf("no FSLayers in manifest")
}
return nil
}


@@ -1,293 +0,0 @@
package graph
import (
"encoding/json"
"fmt"
"os"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
"github.com/docker/libtrust"
)
const (
testManifestImageName = "testapp"
testManifestImageID = "d821b739e8834ec89ac4469266c3d11515da88fdcbcbdddcbcddb636f54fdde9"
testManifestImageIDShort = "d821b739e883"
testManifestTag = "manifesttest"
)
func (s *TagStore) newManifest(localName, remoteName, tag string) ([]byte, error) {
manifest := &registry.ManifestData{
Name: remoteName,
Tag: tag,
SchemaVersion: 1,
}
localRepo, err := s.Get(localName)
if err != nil {
return nil, err
}
if localRepo == nil {
return nil, fmt.Errorf("Repo does not exist: %s", localName)
}
// Get the top-most layer id which the tag points to
layerId, exists := localRepo[tag]
if !exists {
return nil, fmt.Errorf("Tag does not exist for %s: %s", localName, tag)
}
layersSeen := make(map[string]bool)
layer, err := s.graph.Get(layerId)
if err != nil {
return nil, err
}
manifest.Architecture = layer.Architecture
manifest.FSLayers = make([]*registry.FSLayer, 0, 4)
manifest.History = make([]*registry.ManifestHistory, 0, 4)
var metadata runconfig.Config
if layer.Config != nil {
metadata = *layer.Config
}
for ; layer != nil; layer, err = s.graph.GetParent(layer) {
if err != nil {
return nil, err
}
if layersSeen[layer.ID] {
break
}
if layer.Config != nil && metadata.Image != layer.ID {
err = runconfig.Merge(&metadata, layer.Config)
if err != nil {
return nil, err
}
}
dgst, err := s.graph.GetDigest(layer.ID)
if err == ErrDigestNotSet {
archive, err := s.graph.TarLayer(layer)
if err != nil {
return nil, err
}
defer archive.Close()
dgst, err = digest.FromReader(archive)
if err != nil {
return nil, err
}
// Save checksum value
if err := s.graph.SetDigest(layer.ID, dgst); err != nil {
return nil, err
}
} else if err != nil {
return nil, fmt.Errorf("Error getting image checksum: %s", err)
}
jsonData, err := s.graph.RawJSON(layer.ID)
if err != nil {
return nil, fmt.Errorf("Cannot retrieve the path for {%s}: %s", layer.ID, err)
}
manifest.FSLayers = append(manifest.FSLayers, &registry.FSLayer{BlobSum: dgst.String()})
layersSeen[layer.ID] = true
manifest.History = append(manifest.History, &registry.ManifestHistory{V1Compatibility: string(jsonData)})
}
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return nil, err
}
return manifestBytes, nil
}
func TestManifestTarsumCache(t *testing.T) {
tmp, err := utils.TestDirectory("")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
store := mkTestTagStore(tmp, t)
defer store.graph.driver.Cleanup()
archive, err := fakeTar()
if err != nil {
t.Fatal(err)
}
img := &Image{ID: testManifestImageID}
if err := store.graph.Register(img, archive); err != nil {
t.Fatal(err)
}
if err := store.Tag(testManifestImageName, testManifestTag, testManifestImageID, false); err != nil {
t.Fatal(err)
}
if _, err := store.graph.GetDigest(testManifestImageID); err == nil {
t.Fatalf("Non-empty checksum file after register")
} else if err != ErrDigestNotSet {
t.Fatal(err)
}
// Generate manifest
payload, err := store.newManifest(testManifestImageName, testManifestImageName, testManifestTag)
if err != nil {
t.Fatal(err)
}
manifestChecksum, err := store.graph.GetDigest(testManifestImageID)
if err != nil {
t.Fatal(err)
}
var manifest registry.ManifestData
if err := json.Unmarshal(payload, &manifest); err != nil {
t.Fatalf("error unmarshalling manifest: %s", err)
}
if len(manifest.FSLayers) != 1 {
t.Fatalf("Unexpected number of layers, expecting 1: %d", len(manifest.FSLayers))
}
if manifest.FSLayers[0].BlobSum != manifestChecksum.String() {
t.Fatalf("Unexpected blob sum, expecting %q, got %q", manifestChecksum, manifest.FSLayers[0].BlobSum)
}
if len(manifest.History) != 1 {
t.Fatalf("Unexpected number of layer history, expecting 1: %d", len(manifest.History))
}
v1compat, err := store.graph.RawJSON(img.ID)
if err != nil {
t.Fatal(err)
}
if manifest.History[0].V1Compatibility != string(v1compat) {
t.Fatalf("Unexpected json value\nExpected:\n%s\nActual:\n%s", v1compat, manifest.History[0].V1Compatibility)
}
}
// TestManifestDigestCheck ensures that loadManifest properly verifies the
// remote and local digest.
func TestManifestDigestCheck(t *testing.T) {
tmp, err := utils.TestDirectory("")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
store := mkTestTagStore(tmp, t)
defer store.graph.driver.Cleanup()
archive, err := fakeTar()
if err != nil {
t.Fatal(err)
}
img := &Image{ID: testManifestImageID}
if err := store.graph.Register(img, archive); err != nil {
t.Fatal(err)
}
if err := store.Tag(testManifestImageName, testManifestTag, testManifestImageID, false); err != nil {
t.Fatal(err)
}
if _, err := store.graph.GetDigest(testManifestImageID); err == nil {
t.Fatalf("Non-empty checksum file after register")
} else if err != ErrDigestNotSet {
t.Fatal(err)
}
// Generate manifest
payload, err := store.newManifest(testManifestImageName, testManifestImageName, testManifestTag)
if err != nil {
t.Fatalf("unexpected error generating test manifest: %v", err)
}
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating private key: %v", err)
}
sig, err := libtrust.NewJSONSignature(payload)
if err != nil {
t.Fatalf("error creating signature: %v", err)
}
if err := sig.Sign(pk); err != nil {
t.Fatalf("error signing manifest bytes: %v", err)
}
signedBytes, err := sig.PrettySignature("signatures")
if err != nil {
t.Fatalf("error getting signed bytes: %v", err)
}
dgst, err := digest.FromBytes(payload)
if err != nil {
t.Fatalf("error getting digest of manifest: %v", err)
}
// use this as the "bad" digest
zeroDigest, err := digest.FromBytes([]byte{})
if err != nil {
t.Fatalf("error making zero digest: %v", err)
}
// Remote and local match, everything should look good
local, _, _, err := store.loadManifest(signedBytes, dgst.String(), dgst)
if err != nil {
t.Fatalf("unexpected error verifying local and remote digest: %v", err)
}
if local != dgst {
t.Fatalf("local digest not correctly calculated: %v", err)
}
// remote and no local, since pulling by tag
local, _, _, err = store.loadManifest(signedBytes, "tag", dgst)
if err != nil {
t.Fatalf("unexpected error verifying tag pull and remote digest: %v", err)
}
if local != dgst {
t.Fatalf("local digest not correctly calculated: %v", err)
}
// remote and differing local, this is the most important to fail
local, _, _, err = store.loadManifest(signedBytes, zeroDigest.String(), dgst)
if err == nil {
t.Fatalf("error expected when verifying with differing local digest")
}
// no remote, no local (by tag)
local, _, _, err = store.loadManifest(signedBytes, "tag", "")
if err != nil {
t.Fatalf("unexpected error verifying manifest without remote digest: %v", err)
}
if local != dgst {
t.Fatalf("local digest not correctly calculated: %v", err)
}
// no remote, with local
local, _, _, err = store.loadManifest(signedBytes, dgst.String(), "")
if err != nil {
t.Fatalf("unexpected error verifying manifest without remote digest: %v", err)
}
if local != dgst {
t.Fatalf("local digest not correctly calculated: %v", err)
}
// bad remote, we fail the check.
local, _, _, err = store.loadManifest(signedBytes, dgst.String(), zeroDigest)
if err == nil {
t.Fatalf("error expected when verifying with differing remote digest")
}
}


@@ -3,20 +3,10 @@ package graph
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/url"
"os"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)
@@ -27,10 +17,38 @@ type ImagePullConfig struct {
OutStream io.Writer
}
type Puller interface {
// Pull tries to pull the image referenced by `tag`
// Pull returns an error, if any, and a boolean indicating whether Pull should be retried on the next configured endpoint.
//
// TODO(tiborvass): have Pull() take a reference to repository + tag, so that the puller itself is repository-agnostic.
Pull(tag string) (fallback bool, err error)
}
func NewPuller(s *TagStore, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, sf *streamformatter.StreamFormatter) (Puller, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Puller{
TagStore: s,
endpoint: endpoint,
config: imagePullConfig,
sf: sf,
repoInfo: repoInfo,
}, nil
case registry.APIVersion1:
return &v1Puller{
TagStore: s,
endpoint: endpoint,
config: imagePullConfig,
sf: sf,
repoInfo: repoInfo,
}, nil
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
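Because the hunk below interleaves removed and added lines, here is a condensed, hypothetical restatement of the new endpoint loop in (*TagStore).Pull, showing how the fallback boolean from Puller.Pull interacts with the ErrNoSupport bookkeeping explained in the comments further down (the function name is invented for illustration, not committed code):

func pullAcrossEndpoints(pullers []Puller, tag string) error {
	var (
		lastErr error
		// Once a non-ErrNoSupport error is seen, later ErrNoSupport
		// errors are discarded as less informative.
		discardNoSupportErrors bool
	)
	for _, p := range pullers {
		fallback, err := p.Pull(tag)
		if err == nil {
			return nil // success on this endpoint
		}
		if !fallback {
			return err // hard failure: stop trying endpoints
		}
		if _, ok := err.(registry.ErrNoSupport); !ok {
			discardNoSupportErrors = true
			lastErr = err
		} else if !discardNoSupportErrors {
			lastErr = err
		}
	}
	if lastErr == nil {
		lastErr = fmt.Errorf("no endpoints supported the request")
	}
	return lastErr
}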
func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConfig) error {
var (
sf = streamformatter.NewJSONStreamFormatter()
)
var sf = streamformatter.NewJSONStreamFormatter()
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := s.registryService.ResolveRepository(image)
@@ -38,424 +56,74 @@ func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConf
return err
}
// makes sure name is not empty or `scratch`
if err := validateRepoName(repoInfo.LocalName); err != nil {
return err
}
c, err := s.poolAdd("pull", utils.ImageReference(repoInfo.LocalName, tag))
endpoints, err := s.registryService.LookupEndpoints(repoInfo.CanonicalName)
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
imagePullConfig.OutStream.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
<-c
return nil
}
return err
}
defer s.poolRemove("pull", utils.ImageReference(repoInfo.LocalName, tag))
logName := repoInfo.LocalName
if tag != "" {
logName = utils.ImageReference(logName, tag)
}
// Attempt pulling official content from a provided v2 mirror
if repoInfo.Index.Official {
v2mirrorEndpoint, v2mirrorRepoInfo, err := configureV2Mirror(repoInfo, s.registryService)
if err != nil {
logrus.Errorf("Error configuring mirrors: %s", err)
return err
}
var (
lastErr error
if v2mirrorEndpoint != nil {
logrus.Debugf("Attempting to pull from v2 mirror: %s", v2mirrorEndpoint.URL)
return s.pullFromV2Mirror(v2mirrorEndpoint, v2mirrorRepoInfo, imagePullConfig, tag, sf, logName)
}
}
logrus.Debugf("pulling image from host %q with remote name %q", repoInfo.Index.Name, repoInfo.RemoteName)
endpoint, err := repoInfo.GetEndpoint(imagePullConfig.MetaHeaders)
if err != nil {
return err
}
// TODO(tiborvass): reuse client from endpoint?
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
registry.NewTransport(registry.ReceiveTimeout, endpoint.IsSecure),
registry.DockerHeaders(imagePullConfig.MetaHeaders)...,
// discardNoSupportErrors is used to track whether an endpoint encountered an error of type registry.ErrNoSupport
// By default it is false, which means that if a ErrNoSupport error is encountered, it will be saved in lastErr.
// As soon as another kind of error is encountered, discardNoSupportErrors is set to true, avoiding the saving of
// any subsequent ErrNoSupport errors in lastErr.
// It's needed for pull-by-digest on v1 endpoints: if there are only v1 endpoints configured, the error should be
// returned and displayed, but if there was a v2 endpoint which supports pull-by-digest, then the last relevant
// errors are the ones from v2 endpoints, not v1.
discardNoSupportErrors bool
)
client := registry.HTTPClient(tr)
r, err := registry.NewSession(client, imagePullConfig.AuthConfig, endpoint)
if err != nil {
return err
}
for _, endpoint := range endpoints {
logrus.Debugf("Trying to pull %s from %s %s", repoInfo.LocalName, endpoint.URL, endpoint.Version)
if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) {
if repoInfo.Official {
s.trustService.UpdateBase()
if !endpoint.Mirror && (endpoint.Official || endpoint.Version == registry.APIVersion2) {
if repoInfo.Official {
s.trustService.UpdateBase()
}
}
logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
if err := s.pullV2Repository(r, imagePullConfig.OutStream, repoInfo, tag, sf); err == nil {
s.eventsService.Log("pull", logName, "")
return nil
} else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
logrus.Errorf("Error from V2 registry: %s", err)
}
logrus.Debug("image does not exist on v2 registry, falling back to v1")
}
if utils.DigestReference(tag) {
return fmt.Errorf("pulling with digest reference failed from v2 registry")
}
logrus.Debugf("pulling v1 repository with local name %q", repoInfo.LocalName)
if err = s.pullRepository(r, imagePullConfig.OutStream, repoInfo, tag, sf); err != nil {
return err
}
s.eventsService.Log("pull", logName, "")
return nil
}
func makeMirrorRepoInfo(repoInfo *registry.RepositoryInfo, mirror string) *registry.RepositoryInfo {
mirrorRepo := &registry.RepositoryInfo{
RemoteName: repoInfo.RemoteName,
LocalName: repoInfo.LocalName,
CanonicalName: repoInfo.CanonicalName,
Official: false,
Index: &registry.IndexInfo{
Official: false,
Secure: repoInfo.Index.Secure,
Name: mirror,
Mirrors: []string{},
},
}
return mirrorRepo
}
func configureV2Mirror(repoInfo *registry.RepositoryInfo, s *registry.Service) (*registry.Endpoint, *registry.RepositoryInfo, error) {
mirrors := repoInfo.Index.Mirrors
if len(mirrors) == 0 {
// no mirrors configured
return nil, nil, nil
}
v1MirrorCount := 0
var v2MirrorEndpoint *registry.Endpoint
var v2MirrorRepoInfo *registry.RepositoryInfo
for _, mirror := range mirrors {
mirrorRepoInfo := makeMirrorRepoInfo(repoInfo, mirror)
endpoint, err := registry.NewEndpoint(mirrorRepoInfo.Index, nil)
puller, err := NewPuller(s, endpoint, repoInfo, imagePullConfig, sf)
if err != nil {
logrus.Errorf("Unable to create endpoint for %s: %s", mirror, err)
lastErr = err
continue
}
if endpoint.Version == 2 {
if v2MirrorEndpoint == nil {
v2MirrorEndpoint = endpoint
v2MirrorRepoInfo = mirrorRepoInfo
} else {
// > 1 v2 mirrors given
return nil, nil, fmt.Errorf("multiple v2 mirrors configured")
}
} else {
v1MirrorCount++
}
}
if v1MirrorCount == len(mirrors) {
// OK, but mirrors are v1
return nil, nil, nil
}
if v2MirrorEndpoint != nil && v1MirrorCount == 0 {
// OK, 1 v2 mirror specified
return v2MirrorEndpoint, v2MirrorRepoInfo, nil
}
if v2MirrorEndpoint != nil && v1MirrorCount > 0 {
return nil, nil, fmt.Errorf("v1 and v2 mirrors configured")
}
// No endpoint could be established with the given mirror configurations
// Fallback to pulling from the hub as per v1 behavior.
return nil, nil, nil
}
func (s *TagStore) pullFromV2Mirror(mirrorEndpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo,
imagePullConfig *ImagePullConfig, tag string, sf *streamformatter.StreamFormatter, logName string) error {
tr := transport.NewTransport(
registry.NewTransport(registry.ReceiveTimeout, mirrorEndpoint.IsSecure),
registry.DockerHeaders(imagePullConfig.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
mirrorSession, err := registry.NewSession(client, &cliconfig.AuthConfig{}, mirrorEndpoint)
if err != nil {
return err
}
logrus.Debugf("Pulling v2 repository with local name %q from %s", repoInfo.LocalName, mirrorEndpoint.URL)
if err := s.pullV2Repository(mirrorSession, imagePullConfig.OutStream, repoInfo, tag, sf); err != nil {
return err
}
s.eventsService.Log("pull", logName, "")
return nil
}
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, askedTag string, sf *streamformatter.StreamFormatter) error {
out.Write(sf.FormatStatus("", "Pulling repository %s", repoInfo.CanonicalName))
repoData, err := r.GetRepositoryData(repoInfo.RemoteName)
if err != nil {
if strings.Contains(err.Error(), "HTTP code: 404") {
return fmt.Errorf("Error: image %s not found", utils.ImageReference(repoInfo.RemoteName, askedTag))
}
// Unexpected HTTP error
return err
}
logrus.Debugf("Retrieving the tag list")
tagsList := make(map[string]string)
if askedTag == "" {
tagsList, err = r.GetRemoteTags(repoData.Endpoints, repoInfo.RemoteName)
} else {
var tagId string
tagId, err = r.GetRemoteTag(repoData.Endpoints, repoInfo.RemoteName, askedTag)
tagsList[askedTag] = tagId
}
if err != nil {
if err == registry.ErrRepoNotFound && askedTag != "" {
return fmt.Errorf("Tag %s not found in repository %s", askedTag, repoInfo.CanonicalName)
}
logrus.Errorf("unable to get remote tags: %s", err)
return err
}
for tag, id := range tagsList {
repoData.ImgList[id] = &registry.ImgData{
ID: id,
Tag: tag,
Checksum: "",
}
}
logrus.Debugf("Registering tags")
// If no tag has been specified, pull them all
if askedTag == "" {
for tag, id := range tagsList {
repoData.ImgList[id].Tag = tag
}
} else {
// Otherwise, check that the tag exists and use only that one
id, exists := tagsList[askedTag]
if !exists {
return fmt.Errorf("Tag %s not found in repository %s", askedTag, repoInfo.CanonicalName)
}
repoData.ImgList[id].Tag = askedTag
}
errors := make(chan error)
layersDownloaded := false
for _, image := range repoData.ImgList {
downloadImage := func(img *registry.ImgData) {
if askedTag != "" && img.Tag != askedTag {
errors <- nil
return
}
if img.Tag == "" {
logrus.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
errors <- nil
return
}
// ensure no two downloads of the same image happen at the same time
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
} else {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
if fallback, err := puller.Pull(tag); err != nil {
if fallback {
if _, ok := err.(registry.ErrNoSupport); !ok {
// Because we found an error that's not ErrNoSupport, discard all subsequent ErrNoSupport errors.
discardNoSupportErrors = true
// save the current error
lastErr = err
} else if !discardNoSupportErrors {
// Save the ErrNoSupport error, because it's either the first error or all encountered errors
// were also ErrNoSupport errors.
lastErr = err
}
errors <- nil
return
continue
}
defer s.poolRemove("pull", "img:"+img.ID)
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, repoInfo.CanonicalName), nil))
success := false
var lastErr, err error
var isDownloaded bool
for _, ep := range repoInfo.Index.Mirrors {
// Ensure endpoint is v1
ep = ep + "v1/"
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
if isDownloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
// Don't report errors when pulling from mirrors.
logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err)
continue
}
layersDownloaded = layersDownloaded || isDownloaded
success = true
break
}
if !success {
for _, ep := range repoData.Endpoints {
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
if isDownloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
// As the error is also given to the output stream the user will see the error.
lastErr = err
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err), nil))
continue
}
layersDownloaded = layersDownloaded || isDownloaded
success = true
break
}
}
if !success {
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, repoInfo.CanonicalName, lastErr)
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil))
errors <- err
return
}
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
errors <- nil
}
go downloadImage(image)
}
var lastError error
for i := 0; i < len(repoData.ImgList); i++ {
if err := <-errors; err != nil {
lastError = err
}
}
if lastError != nil {
return lastError
}
for tag, id := range tagsList {
if askedTag != "" && tag != askedTag {
continue
}
if err := s.Tag(repoInfo.LocalName, tag, id, true); err != nil {
logrus.Debugf("Not continuing with error: %v", err)
return err
}
s.eventsService.Log("pull", logName, "")
return nil
}
requestedTag := repoInfo.LocalName
if len(askedTag) > 0 {
requestedTag = utils.ImageReference(repoInfo.LocalName, askedTag)
if lastErr == nil {
lastErr = fmt.Errorf("no endpoints found for %s", image)
}
WriteStatus(requestedTag, out, sf, layersDownloaded)
return nil
}
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *streamformatter.StreamFormatter) (bool, error) {
history, err := r.GetRemoteHistory(imgID, endpoint)
if err != nil {
return false, err
}
out.Write(sf.FormatProgress(stringid.TruncateID(imgID), "Pulling dependent layers", nil))
// FIXME: Try to stream the images?
// FIXME: Launch the getRemoteImage() in goroutines
layersDownloaded := false
for i := len(history) - 1; i >= 0; i-- {
id := history[i]
// ensure no two downloads of the same layer happen at the same time
if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
<-c
}
defer s.poolRemove("pull", "layer:"+id)
if !s.graph.Exists(id) {
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil))
var (
imgJSON []byte
imgSize int
err error
img *Image
)
retries := 5
for j := 1; j <= retries; j++ {
imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint)
if err != nil && j == retries {
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
return layersDownloaded, err
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
}
img, err = NewImgJSON(imgJSON)
layersDownloaded = true
if err != nil && j == retries {
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err)
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else {
break
}
}
for j := 1; j <= retries; j++ {
// Get the layer
status := "Pulling fs layer"
if j > 1 {
status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
}
out.Write(sf.FormatProgress(stringid.TruncateID(id), status, nil))
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, int64(imgSize))
if uerr, ok := err.(*url.Error); ok {
err = uerr.Err
}
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else if err != nil {
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
return layersDownloaded, err
}
layersDownloaded = true
defer layer.Close()
err = s.graph.Register(img,
progressreader.New(progressreader.Config{
In: layer,
Out: out,
Formatter: sf,
Size: imgSize,
NewLines: false,
ID: stringid.TruncateID(id),
Action: "Downloading",
}))
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else if err != nil {
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil))
return layersDownloaded, err
} else {
break
}
}
}
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil))
}
return layersDownloaded, nil
return lastErr
}
func WriteStatus(requestedTag string, out io.Writer, sf *streamformatter.StreamFormatter, layersDownloaded bool) {
@@ -465,273 +133,3 @@ func WriteStatus(requestedTag string, out io.Writer, sf *streamformatter.StreamF
out.Write(sf.FormatStatus("", "Status: Image is up to date for %s", requestedTag))
}
}
func (s *TagStore) pullV2Repository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter) error {
endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
if err != nil {
if repoInfo.Index.Official {
logrus.Debugf("Unable to pull from V2 registry, falling back to v1: %s", err)
return ErrV2RegistryUnavailable
}
return fmt.Errorf("error getting registry endpoint: %s", err)
}
auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, true)
if err != nil {
return fmt.Errorf("error getting authorization: %s", err)
}
if !auth.CanAuthorizeV2() {
return ErrV2RegistryUnavailable
}
var layersDownloaded bool
if tag == "" {
logrus.Debugf("Pulling tag list from V2 registry for %s", repoInfo.CanonicalName)
tags, err := r.GetV2RemoteTags(endpoint, repoInfo.RemoteName, auth)
if err != nil {
return err
}
if len(tags) == 0 {
return registry.ErrDoesNotExist
}
for _, t := range tags {
if downloaded, err := s.pullV2Tag(r, out, endpoint, repoInfo, t, sf, auth); err != nil {
return err
} else if downloaded {
layersDownloaded = true
}
}
} else {
if downloaded, err := s.pullV2Tag(r, out, endpoint, repoInfo, tag, sf, auth); err != nil {
return err
} else if downloaded {
layersDownloaded = true
}
}
requestedTag := repoInfo.LocalName
if len(tag) > 0 {
requestedTag = utils.ImageReference(repoInfo.LocalName, tag)
}
WriteStatus(requestedTag, out, sf, layersDownloaded)
return nil
}
func (s *TagStore) pullV2Tag(r *registry.Session, out io.Writer, endpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter, auth *registry.RequestAuthorization) (bool, error) {
logrus.Debugf("Pulling tag from V2 registry: %q", tag)
remoteDigest, manifestBytes, err := r.GetV2ImageManifest(endpoint, repoInfo.RemoteName, tag, auth)
if err != nil {
return false, err
}
// loadManifest ensures that the manifest payload has the expected digest
// if the tag is a digest reference.
localDigest, manifest, verified, err := s.loadManifest(manifestBytes, tag, remoteDigest)
if err != nil {
return false, fmt.Errorf("error verifying manifest: %s", err)
}
if verified {
logrus.Printf("Image manifest for %s has been verified", utils.ImageReference(repoInfo.CanonicalName, tag))
}
out.Write(sf.FormatStatus(tag, "Pulling from %s", repoInfo.CanonicalName))
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
imgJSON []byte
img *Image
digest digest.Digest
tmpFile *os.File
length int64
downloaded bool
err chan error
}
downloads := make([]downloadInfo, len(manifest.FSLayers))
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
var (
sumStr = manifest.FSLayers[i].BlobSum
imgJSON = []byte(manifest.History[i].V1Compatibility)
)
img, err := NewImgJSON(imgJSON)
if err != nil {
return false, fmt.Errorf("failed to parse json: %s", err)
}
downloads[i].img = img
// Check if exists
if s.graph.Exists(img.ID) {
logrus.Debugf("Image already exists: %s", img.ID)
continue
}
dgst, err := digest.ParseDigest(sumStr)
if err != nil {
return false, err
}
downloads[i].digest = dgst
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
downloadFunc := func(di *downloadInfo) error {
logrus.Debugf("pulling blob %q to V1 img %s", sumStr, img.ID)
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
} else {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
}
} else {
defer s.poolRemove("pull", "img:"+img.ID)
tmpFile, err := ioutil.TempFile("", "GetV2ImageBlob")
if err != nil {
return err
}
r, l, err := r.GetV2ImageBlobReader(endpoint, repoInfo.RemoteName, di.digest, auth)
if err != nil {
return err
}
defer r.Close()
verifier, err := digest.NewDigestVerifier(di.digest)
if err != nil {
return err
}
if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
In: ioutil.NopCloser(io.TeeReader(r, verifier)),
Out: out,
Formatter: sf,
Size: int(l),
NewLines: false,
ID: stringid.TruncateID(img.ID),
Action: "Downloading",
})); err != nil {
return fmt.Errorf("unable to copy v2 image blob data: %s", err)
}
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Verifying Checksum", nil))
if !verifier.Verified() {
return fmt.Errorf("image layer digest verification failed for %q", di.digest)
}
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
logrus.Debugf("Downloaded %s to tempfile %s", img.ID, tmpFile.Name())
di.tmpFile = tmpFile
di.length = l
di.downloaded = true
}
di.imgJSON = imgJSON
return nil
}
downloads[i].err = make(chan error)
go func(di *downloadInfo) {
di.err <- downloadFunc(di)
}(&downloads[i])
}
var tagUpdated bool
for i := len(downloads) - 1; i >= 0; i-- {
d := &downloads[i]
if d.err != nil {
if err := <-d.err; err != nil {
return false, err
}
}
if d.downloaded {
// if tmpFile is empty assume download and extracted elsewhere
defer os.Remove(d.tmpFile.Name())
defer d.tmpFile.Close()
d.tmpFile.Seek(0, 0)
if d.tmpFile != nil {
err = s.graph.Register(d.img,
progressreader.New(progressreader.Config{
In: d.tmpFile,
Out: out,
Formatter: sf,
Size: int(d.length),
ID: stringid.TruncateID(d.img.ID),
Action: "Extracting",
}))
if err != nil {
return false, err
}
if err := s.graph.SetDigest(d.img.ID, d.digest); err != nil {
return false, err
}
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
}
out.Write(sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil))
tagUpdated = true
} else {
out.Write(sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil))
}
}
// Check for new tag if no layers downloaded
if !tagUpdated {
repo, err := s.Get(repoInfo.LocalName)
if err != nil {
return false, err
}
if repo != nil {
if _, exists := repo[tag]; !exists {
tagUpdated = true
}
} else {
tagUpdated = true
}
}
if verified && tagUpdated {
out.Write(sf.FormatStatus(utils.ImageReference(repoInfo.CanonicalName, tag), "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
}
if localDigest != remoteDigest { // this is not a verification check.
// NOTE(stevvooe): This is a very defensive branch and should never
// happen, since all manifest digest implementations use the same
// algorithm.
logrus.WithFields(
logrus.Fields{
"local": localDigest,
"remote": remoteDigest,
}).Debugf("local digest does not match remote")
out.Write(sf.FormatStatus("", "Remote Digest: %s", remoteDigest))
}
out.Write(sf.FormatStatus("", "Digest: %s", localDigest))
if tag == localDigest.String() {
// TODO(stevvooe): Ideally, we should always set the digest so we can
// use the digest whether we pull by it or not. Unfortunately, the tag
// store treats the digest as a separate tag, meaning there may be an
// untagged digest image that would seem to be dangling by a user.
if err = s.SetDigest(repoInfo.LocalName, localDigest.String(), downloads[0].img.ID); err != nil {
return false, err
}
}
if !utils.DigestReference(tag) {
// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
if err = s.Tag(repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
return false, err
}
}
return tagUpdated, nil
}

graph/pull_v1.go (new file, 316 lines)

@@ -0,0 +1,316 @@
package graph
import (
"errors"
"fmt"
"net"
"net/url"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)
type v1Puller struct {
*TagStore
endpoint registry.APIEndpoint
config *ImagePullConfig
sf *streamformatter.StreamFormatter
repoInfo *registry.RepositoryInfo
session *registry.Session
}
func (p *v1Puller) Pull(tag string) (fallback bool, err error) {
if utils.DigestReference(tag) {
// Allowing fallback, because HTTPS v1 is before HTTP v2
return true, registry.ErrNoSupport{errors.New("Cannot pull by digest with v1 registry")}
}
tlsConfig, err := p.registryService.TlsConfig(p.repoInfo.Index.Name)
if err != nil {
return false, err
}
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
// TODO(tiborvass): was ReceiveTimeout
registry.NewTransport(tlsConfig),
registry.DockerHeaders(p.config.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
if err != nil {
logrus.Debugf("Could not get v1 endpoint: %v", err)
return true, err
}
p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
if err != nil {
// TODO(dmcgowan): Check if should fallback
logrus.Debugf("Fallback from error: %s", err)
return true, err
}
if err := p.pullRepository(tag); err != nil {
// TODO(dmcgowan): Check if should fallback
return false, err
}
return false, nil
}
func (p *v1Puller) pullRepository(askedTag string) error {
out := p.config.OutStream
out.Write(p.sf.FormatStatus("", "Pulling repository %s", p.repoInfo.CanonicalName))
repoData, err := p.session.GetRepositoryData(p.repoInfo.RemoteName)
if err != nil {
if strings.Contains(err.Error(), "HTTP code: 404") {
return fmt.Errorf("Error: image %s not found", utils.ImageReference(p.repoInfo.RemoteName, askedTag))
}
// Unexpected HTTP error
return err
}
logrus.Debugf("Retrieving the tag list")
tagsList := make(map[string]string)
if askedTag == "" {
tagsList, err = p.session.GetRemoteTags(repoData.Endpoints, p.repoInfo.RemoteName)
} else {
var tagId string
tagId, err = p.session.GetRemoteTag(repoData.Endpoints, p.repoInfo.RemoteName, askedTag)
tagsList[askedTag] = tagId
}
if err != nil {
if err == registry.ErrRepoNotFound && askedTag != "" {
return fmt.Errorf("Tag %s not found in repository %s", askedTag, p.repoInfo.CanonicalName)
}
logrus.Errorf("unable to get remote tags: %s", err)
return err
}
for tag, id := range tagsList {
repoData.ImgList[id] = &registry.ImgData{
ID: id,
Tag: tag,
Checksum: "",
}
}
logrus.Debugf("Registering tags")
// If no tag has been specified, pull them all
if askedTag == "" {
for tag, id := range tagsList {
repoData.ImgList[id].Tag = tag
}
} else {
// Otherwise, check that the tag exists and use only that one
id, exists := tagsList[askedTag]
if !exists {
return fmt.Errorf("Tag %s not found in repository %s", askedTag, p.repoInfo.CanonicalName)
}
repoData.ImgList[id].Tag = askedTag
}
errors := make(chan error)
layersDownloaded := false
for _, image := range repoData.ImgList {
downloadImage := func(img *registry.ImgData) {
if askedTag != "" && img.Tag != askedTag {
errors <- nil
return
}
if img.Tag == "" {
logrus.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
errors <- nil
return
}
// ensure no two downloads of the same image happen at the same time
if c, err := p.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
} else {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
}
errors <- nil
return
}
defer p.poolRemove("pull", "img:"+img.ID)
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil))
success := false
var lastErr, err error
var isDownloaded bool
for _, ep := range p.repoInfo.Index.Mirrors {
ep += "v1/"
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil))
if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil {
// Don't report errors when pulling from mirrors.
logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err)
continue
}
layersDownloaded = layersDownloaded || isDownloaded
success = true
break
}
if !success {
for _, ep := range repoData.Endpoints {
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil))
if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil {
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
// As the error is also given to the output stream the user will see the error.
lastErr = err
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil))
continue
}
layersDownloaded = layersDownloaded || isDownloaded
success = true
break
}
}
if !success {
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName, lastErr)
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil))
errors <- err
return
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
errors <- nil
}
go downloadImage(image)
}
var lastError error
for i := 0; i < len(repoData.ImgList); i++ {
if err := <-errors; err != nil {
lastError = err
}
}
if lastError != nil {
return lastError
}
for tag, id := range tagsList {
if askedTag != "" && tag != askedTag {
continue
}
if err := p.Tag(p.repoInfo.LocalName, tag, id, true); err != nil {
return err
}
}
requestedTag := p.repoInfo.LocalName
if len(askedTag) > 0 {
requestedTag = utils.ImageReference(p.repoInfo.LocalName, askedTag)
}
WriteStatus(requestedTag, out, p.sf, layersDownloaded)
return nil
}
func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, error) {
history, err := p.session.GetRemoteHistory(imgID, endpoint)
if err != nil {
return false, err
}
out := p.config.OutStream
out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Pulling dependent layers", nil))
// FIXME: Try to stream the images?
// FIXME: Launch the getRemoteImage() in goroutines
layersDownloaded := false
for i := len(history) - 1; i >= 0; i-- {
id := history[i]
// ensure no two downloads of the same layer happen at the same time
if c, err := p.poolAdd("pull", "layer:"+id); err != nil {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
<-c
}
defer p.poolRemove("pull", "layer:"+id)
if !p.graph.Exists(id) {
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil))
var (
imgJSON []byte
imgSize int
err error
img *Image
)
retries := 5
for j := 1; j <= retries; j++ {
imgJSON, imgSize, err = p.session.GetRemoteImageJSON(id, endpoint)
if err != nil && j == retries {
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
return layersDownloaded, err
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
}
img, err = NewImgJSON(imgJSON)
layersDownloaded = true
if err != nil && j == retries {
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err)
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else {
break
}
}
for j := 1; j <= retries; j++ {
// Get the layer
status := "Pulling fs layer"
if j > 1 {
status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil))
layer, err := p.session.GetRemoteImageLayer(img.ID, endpoint, int64(imgSize))
if uerr, ok := err.(*url.Error); ok {
err = uerr.Err
}
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else if err != nil {
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
return layersDownloaded, err
}
layersDownloaded = true
defer layer.Close()
err = p.graph.Register(img,
progressreader.New(progressreader.Config{
In: layer,
Out: out,
Formatter: p.sf,
Size: imgSize,
NewLines: false,
ID: stringid.TruncateID(id),
Action: "Downloading",
}))
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else if err != nil {
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil))
return layersDownloaded, err
} else {
break
}
}
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil))
}
return layersDownloaded, nil
}
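The metadata and layer fetches above retry up to five times, sleeping j * 500ms between attempts. A generic, hypothetical helper capturing that linear-backoff policy (not part of the commit):

// withLinearRetry runs op up to retries times, sleeping attempt*step
// between failures, and returns the last error if every attempt fails.
func withLinearRetry(retries int, step time.Duration, op func() error) error {
	var err error
	for j := 1; j <= retries; j++ {
		if err = op(); err == nil {
			return nil
		}
		if j < retries {
			time.Sleep(time.Duration(j) * step)
		}
	}
	return err
}

Usage would mirror the loops above, e.g. err := withLinearRetry(5, 500*time.Millisecond, func() error { _, _, err := p.session.GetRemoteImageJSON(id, endpoint); return err }).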

graph/pull_v2.go (new file, 384 lines)

@@ -0,0 +1,384 @@
package graph
import (
"fmt"
"io"
"io/ioutil"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/trust"
"github.com/docker/docker/utils"
"github.com/docker/libtrust"
)
type v2Puller struct {
*TagStore
endpoint registry.APIEndpoint
config *ImagePullConfig
sf *streamformatter.StreamFormatter
repoInfo *registry.RepositoryInfo
repo distribution.Repository
}
func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
// TODO(tiborvass): was ReceiveTimeout
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
if err := p.pullV2Repository(tag); err != nil {
if registry.ContinueOnError(err) {
logrus.Debugf("Error trying v2 registry: %v", err)
return true, err
}
return false, err
}
return false, nil
}
func (p *v2Puller) pullV2Repository(tag string) (err error) {
var tags []string
taggedName := p.repoInfo.LocalName
if len(tag) > 0 {
tags = []string{tag}
taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
} else {
var err error
tags, err = p.repo.Manifests().Tags()
if err != nil {
return err
}
}
c, err := p.poolAdd("pull", taggedName)
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)
<-c
return nil
}
return err
}
defer p.poolRemove("pull", taggedName)
var layersDownloaded bool
for _, tag := range tags {
// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
pulledNew, err := p.pullV2Tag(tag, taggedName)
if err != nil {
return err
}
layersDownloaded = layersDownloaded || pulledNew
}
WriteStatus(taggedName, p.config.OutStream, p.sf, layersDownloaded)
return nil
}
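The poolAdd/poolRemove pair used above de-duplicates concurrent pulls of the same key; its real implementation lives elsewhere in the package. As a rough, hypothetical sketch of the contract (the first caller proceeds, a duplicate caller receives a channel to wait on):

package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mu   sync.Mutex
	jobs map[string]chan struct{}
}

// add returns (nil, nil) for the first caller of a key; a concurrent
// duplicate gets the first caller's channel plus an error and should wait.
func (p *pool) add(key string) (chan struct{}, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.jobs[key]; ok {
		return c, fmt.Errorf("%s is already being pulled", key)
	}
	c := make(chan struct{})
	p.jobs[key] = c
	return nil, nil
}

// remove closes the channel, releasing every waiter.
func (p *pool) remove(key string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.jobs[key]; ok {
		close(c)
		delete(p.jobs, key)
	}
}

func main() {
	p := &pool{jobs: make(map[string]chan struct{})}
	if _, err := p.add("pull/img:abc"); err != nil {
		panic(err) // first caller always wins
	}
	c, err := p.add("pull/img:abc") // duplicate caller
	if err == nil {
		panic("expected duplicate to be rejected")
	}
	go p.remove("pull/img:abc") // first pull finishes
	<-c                         // duplicate just waits instead of re-pulling
	fmt.Println("duplicate pull waited for the first to finish")
}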
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
img *Image
tmpFile *os.File
digest digest.Digest
layer distribution.ReadSeekCloser
size int64
err chan error
verified bool
}
type errVerification struct{}
func (errVerification) Error() string { return "verification failed" }
func (p *v2Puller) download(di *downloadInfo) {
logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
out := p.config.OutStream
if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil {
if c != nil {
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
} else {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", di.img.ID, err)
}
di.err <- nil
return
}
defer p.poolRemove("pull", "img:"+di.img.ID)
tmpFile, err := ioutil.TempFile("", "GetImageBlob")
if err != nil {
di.err <- err
return
}
blobs := p.repo.Blobs(nil)
desc, err := blobs.Stat(nil, di.digest)
if err != nil {
logrus.Debugf("Error statting layer: %v", err)
di.err <- err
return
}
di.size = desc.Length
layerDownload, err := blobs.Open(nil, di.digest)
if err != nil {
logrus.Debugf("Error fetching layer: %v", err)
di.err <- err
return
}
defer layerDownload.Close()
verifier, err := digest.NewDigestVerifier(di.digest)
if err != nil {
di.err <- err
return
}
reader := progressreader.New(progressreader.Config{
In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
Out: out,
Formatter: p.sf,
Size: int(di.size),
NewLines: false,
ID: stringid.TruncateID(di.img.ID),
Action: "Downloading",
})
io.Copy(tmpFile, reader)
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil))
di.verified = verifier.Verified()
if !di.verified {
logrus.Infof("Image verification failed for layer %s", di.digest)
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name())
di.tmpFile = tmpFile
di.layer = layerDownload
di.err <- nil
}
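The download/extract split here follows a fan-out shape: one goroutine per layer streams the blob through a digest verifier and reports on its err channel, and the caller later drains the channels in order. A stripped-down sketch of that shape, omitting the temp file and progressreader wrapping (the blob bytes are a stand-in):

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"

	"github.com/docker/distribution/digest"
)

type layerJob struct {
	blob     []byte        // stand-in for the remote layer stream
	dgst     digest.Digest // expected content digest
	verified bool
	err      chan error
}

func download(j *layerJob) {
	verifier, err := digest.NewDigestVerifier(j.dgst)
	if err != nil {
		j.err <- err
		return
	}
	// Hash while streaming, as the puller does with io.TeeReader(layerDownload, verifier).
	if _, err := io.Copy(ioutil.Discard, io.TeeReader(bytes.NewReader(j.blob), verifier)); err != nil {
		j.err <- err
		return
	}
	j.verified = verifier.Verified()
	j.err <- nil
}

func main() {
	blob := []byte("layer-bytes")
	dgst, err := digest.FromBytes(blob)
	if err != nil {
		panic(err)
	}
	jobs := []*layerJob{{blob: blob, dgst: dgst, err: make(chan error)}}
	for _, j := range jobs {
		go download(j) // fan out: one worker per layer
	}
	for _, j := range jobs { // drain in order, like the extract loop below
		if err := <-j.err; err != nil {
			panic(err)
		}
		fmt.Println("layer verified:", j.verified)
	}
}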
func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
logrus.Debugf("Pulling tag from V2 registry: %q", tag)
out := p.config.OutStream
manifest, err := p.repo.Manifests().GetByTag(tag)
if err != nil {
return false, err
}
verified, err := p.validateManifest(manifest, tag)
if err != nil {
return false, err
}
if verified {
logrus.Printf("Image manifest for %s has been verified", taggedName)
}
out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
downloads := make([]downloadInfo, len(manifest.FSLayers))
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
img, err := NewImgJSON([]byte(manifest.History[i].V1Compatibility))
if err != nil {
logrus.Debugf("error getting image v1 json: %v", err)
return false, err
}
downloads[i].img = img
downloads[i].digest = manifest.FSLayers[i].BlobSum
// Check if exists
if p.graph.Exists(img.ID) {
logrus.Debugf("Image already exists: %s", img.ID)
continue
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
downloads[i].err = make(chan error)
go p.download(&downloads[i])
}
var tagUpdated bool
for i := len(downloads) - 1; i >= 0; i-- {
d := &downloads[i]
if d.err != nil {
if err := <-d.err; err != nil {
return false, err
}
}
verified = verified && d.verified
if d.layer != nil {
// if tmpFile is empty assume download and extracted elsewhere
defer os.Remove(d.tmpFile.Name())
defer d.tmpFile.Close()
d.tmpFile.Seek(0, 0)
if d.tmpFile != nil {
reader := progressreader.New(progressreader.Config{
In: d.tmpFile,
Out: out,
Formatter: p.sf,
Size: int(d.size),
NewLines: false,
ID: stringid.TruncateID(d.img.ID),
Action: "Extracting",
})
err = p.graph.Register(d.img, reader)
if err != nil {
return false, err
}
if err := p.graph.SetDigest(d.img.ID, d.digest); err != nil {
return false, err
}
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil))
tagUpdated = true
} else {
out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil))
}
}
manifestDigest, err := digestFromManifest(manifest, p.repoInfo.LocalName)
if err != nil {
return false, err
}
// Check for new tag if no layers downloaded
if !tagUpdated {
repo, err := p.Get(p.repoInfo.LocalName)
if err != nil {
return false, err
}
if repo != nil {
if _, exists := repo[tag]; !exists {
tagUpdated = true
}
} else {
tagUpdated = true
}
}
if verified && tagUpdated {
out.Write(p.sf.FormatStatus(p.repo.Name()+":"+tag, "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
}
if utils.DigestReference(tag) {
// TODO(stevvooe): Ideally, we should always set the digest so we can
// use the digest whether we pull by it or not. Unfortunately, the tag
// store treats the digest as a separate tag, meaning there may be an
// untagged digest image that would seem to be dangling by a user.
if err = p.SetDigest(p.repoInfo.LocalName, tag, downloads[0].img.ID); err != nil {
return false, err
}
} else {
// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
if err = p.Tag(p.repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
return false, err
}
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
}
return tagUpdated, nil
}
// verifyTrustedKeys checks the keys provided against the trust store,
// ensuring that the provided keys are trusted for the namespace. The keys
// provided to this method must come from the signatures provided as part of
// the manifest JWS package, obtained from unpackSignedManifest or libtrust.
func (p *v2Puller) verifyTrustedKeys(namespace string, keys []libtrust.PublicKey) (verified bool, err error) {
if namespace[0] != '/' {
namespace = "/" + namespace
}
for _, key := range keys {
b, err := key.MarshalJSON()
if err != nil {
return false, fmt.Errorf("error marshalling public key: %s", err)
}
// Check that the key has read/write permission (0x03)
v, err := p.trustService.CheckKey(namespace, b, 0x03)
if err != nil {
vErr, ok := err.(trust.NotVerifiedError)
if !ok {
return false, fmt.Errorf("error running key check: %s", err)
}
logrus.Debugf("Key check result: %v", vErr)
}
verified = v
}
if verified {
logrus.Debug("Key check result: verified")
}
return
}
func (p *v2Puller) validateManifest(m *manifest.SignedManifest, tag string) (verified bool, err error) {
// TODO(tiborvass): what's the use case for having manifest == nil and err == nil? Shouldn't the error be "DoesNotExist"?
if m == nil {
return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
}
if m.SchemaVersion != 1 {
return false, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
}
if len(m.FSLayers) != len(m.History) {
return false, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
}
if len(m.FSLayers) == 0 {
return false, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
}
keys, err := manifest.Verify(m)
if err != nil {
return false, fmt.Errorf("error verifying manifest for tag %q: %v", tag, err)
}
verified, err = p.verifyTrustedKeys(m.Name, keys)
if err != nil {
return false, fmt.Errorf("error verifying manifest keys: %v", err)
}
localDigest, err := digest.ParseDigest(tag)
// if pull by digest, then verify
if err == nil {
verifier, err := digest.NewDigestVerifier(localDigest)
if err != nil {
return false, err
}
payload, err := m.Payload()
if err != nil {
return false, err
}
if _, err := verifier.Write(payload); err != nil {
return false, err
}
verified = verified && verifier.Verified()
}
return verified, nil
}
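The digest verification above is easy to get wrong, so here is a minimal standalone sketch of the same check, using only the digest and fmt packages this file already relies on; verifyPayload is a hypothetical helper, with the reference string and payload bytes standing in for the tag and m.Payload().

// verifyPayload mirrors the pull-by-digest path: parse the reference,
// build a verifier for it, and feed it the manifest payload.
func verifyPayload(ref string, payload []byte) (bool, error) {
    dgst, err := digest.ParseDigest(ref)
    if err != nil {
        return false, fmt.Errorf("not a digest reference: %v", err)
    }
    verifier, err := digest.NewDigestVerifier(dgst)
    if err != nil {
        return false, err
    }
    if _, err := verifier.Write(payload); err != nil {
        return false, err
    }
    return verifier.Verified(), nil
}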


@ -1,29 +1,15 @@
package graph
import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
"github.com/docker/libtrust"
)
var ErrV2RegistryUnavailable = errors.New("error v2 registry unavailable")
type ImagePushConfig struct {
MetaHeaders map[string][]string
AuthConfig *cliconfig.AuthConfig
@ -31,468 +17,41 @@ type ImagePushConfig struct {
OutStream io.Writer
}
// Retrieve all the images to be uploaded, in the correct order
func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
var (
imageList []string
imagesSeen = make(map[string]bool)
tagsByImage = make(map[string][]string)
)
for tag, id := range localRepo {
if requestedTag != "" && requestedTag != tag {
// Include only the requested tag.
continue
}
if utils.DigestReference(tag) {
// Ignore digest references.
continue
}
var imageListForThisTag []string
tagsByImage[id] = append(tagsByImage[id], tag)
for img, err := s.graph.Get(id); img != nil; img, err = s.graph.GetParent(img) {
if err != nil {
return nil, nil, err
}
if imagesSeen[img.ID] {
// This image is already on the list; we can ignore it and all its parents
break
}
imagesSeen[img.ID] = true
imageListForThisTag = append(imageListForThisTag, img.ID)
}
// reverse the image list for this tag (so the root-most parent image comes first)
for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
}
// append to main image list
imageList = append(imageList, imageListForThisTag...)
}
if len(imageList) == 0 {
return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
}
logrus.Debugf("Image list: %v", imageList)
logrus.Debugf("Tags by image: %v", tagsByImage)
return imageList, tagsByImage, nil
type Pusher interface {
// Push tries to push the image configured at the creation of Pusher.
// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
//
// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
Push() (fallback bool, err error)
}
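To make the fallback contract concrete, here is a minimal sketch of how a caller is expected to consume it (the real loop lives in Push below); pushWithFallback is a hypothetical helper introduced only for illustration.

func pushWithFallback(pushers []Pusher) error {
    var lastErr error
    for _, p := range pushers {
        fallback, err := p.Push()
        if err == nil {
            return nil // pushed successfully, stop trying endpoints
        }
        if !fallback {
            return err // hard failure, do not retry elsewhere
        }
        lastErr = err // benign failure, try the next endpoint
    }
    if lastErr == nil {
        lastErr = fmt.Errorf("no endpoints to push to")
    }
    return lastErr
}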
func (s *TagStore) getImageTags(localRepo map[string]string, askedTag string) ([]string, error) {
logrus.Debugf("Checking %s against %#v", askedTag, localRepo)
if len(askedTag) > 0 {
if _, ok := localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
return nil, fmt.Errorf("Tag does not exist: %s", askedTag)
}
return []string{askedTag}, nil
func (s *TagStore) NewPusher(endpoint registry.APIEndpoint, localRepo Repository, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig, sf *streamformatter.StreamFormatter) (Pusher, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Pusher{
TagStore: s,
endpoint: endpoint,
localRepo: localRepo,
repoInfo: repoInfo,
config: imagePushConfig,
sf: sf,
}, nil
case registry.APIVersion1:
return &v1Pusher{
TagStore: s,
endpoint: endpoint,
localRepo: localRepo,
repoInfo: repoInfo,
config: imagePushConfig,
sf: sf,
}, nil
}
var tags []string
for tag := range localRepo {
if !utils.DigestReference(tag) {
tags = append(tags, tag)
}
}
return tags, nil
}
// createImageIndex returns an index of an image's layer IDs and tags.
func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
var imageIndex []*registry.ImgData
for _, id := range images {
if tags, hasTags := tags[id]; hasTags {
// If an image has tags you must add an entry in the image index
// for each tag
for _, tag := range tags {
imageIndex = append(imageIndex, &registry.ImgData{
ID: id,
Tag: tag,
})
}
continue
}
// If the image does not have a tag it still needs to be sent to the
// registry with an empty tag so that it is associated with the repository
imageIndex = append(imageIndex, &registry.ImgData{
ID: id,
Tag: "",
})
}
return imageIndex
}
type imagePushData struct {
id string
endpoint string
tokens []string
}
// lookupImageOnEndpoint checks the specified endpoint to see if an image exists
// and, if it is absent, sends the image ID to the channel of images to be pushed.
func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *streamformatter.StreamFormatter,
images chan imagePushData, imagesToPush chan string) {
defer wg.Done()
for image := range images {
if err := r.LookupRemoteImage(image.id, image.endpoint); err != nil {
logrus.Errorf("Error in LookupRemoteImage: %s", err)
imagesToPush <- image.id
continue
}
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", stringid.TruncateID(image.id)))
}
}
func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
tags map[string][]string, repo *registry.RepositoryData, sf *streamformatter.StreamFormatter, r *registry.Session) error {
workerCount := len(imageIDs)
// start a maximum of 5 workers to check if images exist on the specified endpoint.
if workerCount > 5 {
workerCount = 5
}
var (
wg = &sync.WaitGroup{}
imageData = make(chan imagePushData, workerCount*2)
imagesToPush = make(chan string, workerCount*2)
pushes = make(chan map[string]struct{}, 1)
)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
}
// start a goroutine that consumes the images to push
go func() {
shouldPush := make(map[string]struct{})
for id := range imagesToPush {
shouldPush[id] = struct{}{}
}
pushes <- shouldPush
}()
for _, id := range imageIDs {
imageData <- imagePushData{
id: id,
endpoint: endpoint,
tokens: repo.Tokens,
}
}
// close the channel to notify the workers that there will be no more images to check.
close(imageData)
wg.Wait()
close(imagesToPush)
// wait for all the images that require pushes to be collected into a consumable map.
shouldPush := <-pushes
// finish by pushing any images and tags to the endpoint. The order in which the
// images are pushed matters, which is why we still iterate over the ordered list of imageIDs.
for _, id := range imageIDs {
if _, push := shouldPush[id]; push {
if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
// FIXME: Continue on error?
return err
}
}
for _, tag := range tags[id] {
out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
if err := r.PushRegistryTag(remoteName, id, tag, endpoint); err != nil {
return err
}
}
}
return nil
}
// pushRepository pushes layers that do not already exist on the registry.
func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
repoInfo *registry.RepositoryInfo, localRepo map[string]string,
tag string, sf *streamformatter.StreamFormatter) error {
logrus.Debugf("Local repo: %s", localRepo)
out = ioutils.NewWriteFlusher(out)
imgList, tags, err := s.getImageList(localRepo, tag)
if err != nil {
return err
}
out.Write(sf.FormatStatus("", "Sending image list"))
imageIndex := s.createImageIndex(imgList, tags)
logrus.Debugf("Preparing to push %s with the following images and tags", localRepo)
for _, data := range imageIndex {
logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
}
// Register all the images in a repository with the registry
// If an image is not in this list it will not be associated with the repository
repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
if err != nil {
return err
}
nTag := 1
if tag == "" {
nTag = len(localRepo)
}
out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
// push the repository to each of the endpoints, skipping images that already exist there.
for _, endpoint := range repoData.Endpoints {
if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
return err
}
}
_, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
return err
}
func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *streamformatter.StreamFormatter) (checksum string, err error) {
out = ioutils.NewWriteFlusher(out)
jsonRaw, err := s.graph.RawJSON(imgID)
if err != nil {
return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
}
out.Write(sf.FormatProgress(stringid.TruncateID(imgID), "Pushing", nil))
imgData := &registry.ImgData{
ID: imgID,
}
// Send the json
if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
if err == registry.ErrAlreadyExists {
out.Write(sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
return "", nil
}
return "", err
}
layerData, err := s.graph.TempLayerArchive(imgID, sf, out)
if err != nil {
return "", fmt.Errorf("Failed to generate layer archive: %s", err)
}
defer os.RemoveAll(layerData.Name())
// Send the layer
logrus.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID,
progressreader.New(progressreader.Config{
In: layerData,
Out: out,
Formatter: sf,
Size: int(layerData.Size),
NewLines: false,
ID: stringid.TruncateID(imgData.ID),
Action: "Pushing",
}), ep, jsonRaw)
if err != nil {
return "", err
}
imgData.Checksum = checksum
imgData.ChecksumPayload = checksumPayload
// Send the checksum
if err := r.PushImageChecksumRegistry(imgData, ep); err != nil {
return "", err
}
out.Write(sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image successfully pushed", nil))
return imgData.Checksum, nil
}
func (s *TagStore) pushV2Repository(r *registry.Session, localRepo Repository, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter) error {
endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
if err != nil {
if repoInfo.Index.Official {
logrus.Debugf("Unable to push to V2 registry, falling back to v1: %s", err)
return ErrV2RegistryUnavailable
}
return fmt.Errorf("error getting registry endpoint: %s", err)
}
tags, err := s.getImageTags(localRepo, tag)
if err != nil {
return err
}
if len(tags) == 0 {
return fmt.Errorf("No tags to push for %s", repoInfo.LocalName)
}
auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, false)
if err != nil {
return fmt.Errorf("error getting authorization: %s", err)
}
if !auth.CanAuthorizeV2() {
return ErrV2RegistryUnavailable
}
for _, tag := range tags {
logrus.Debugf("Pushing repository: %s:%s", repoInfo.CanonicalName, tag)
layerId, exists := localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layer, err := s.graph.Get(layerId)
if err != nil {
return err
}
m := &registry.ManifestData{
SchemaVersion: 1,
Name: repoInfo.RemoteName,
Tag: tag,
Architecture: layer.Architecture,
}
var metadata runconfig.Config
if layer.Config != nil {
metadata = *layer.Config
}
layersSeen := make(map[string]bool)
layers := []*Image{}
for ; layer != nil; layer, err = s.graph.GetParent(layer) {
if err != nil {
return err
}
if layersSeen[layer.ID] {
break
}
layers = append(layers, layer)
layersSeen[layer.ID] = true
}
m.FSLayers = make([]*registry.FSLayer, len(layers))
m.History = make([]*registry.ManifestHistory, len(layers))
// Schema version 1 requires layer ordering from top to root
for i, layer := range layers {
logrus.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
if err := runconfig.Merge(&metadata, layer.Config); err != nil {
return err
}
}
jsonData, err := s.graph.RawJSON(layer.ID)
if err != nil {
return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
}
var exists bool
dgst, err := s.graph.GetDigest(layer.ID)
if err != nil {
if err != ErrDigestNotSet {
return fmt.Errorf("error getting image checksum: %s", err)
}
} else {
// Call mount blob
exists, err = r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, dgst, auth)
if err != nil {
out.Write(sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
return err
}
}
if !exists {
if pushDigest, err := s.pushV2Image(r, layer, endpoint, repoInfo.RemoteName, sf, out, auth); err != nil {
return err
} else if pushDigest != dgst {
// Cache new checksum
if err := s.graph.SetDigest(layer.ID, pushDigest); err != nil {
return err
}
dgst = pushDigest
}
} else {
out.Write(sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
}
m.FSLayers[i] = &registry.FSLayer{BlobSum: dgst.String()}
m.History[i] = &registry.ManifestHistory{V1Compatibility: string(jsonData)}
}
if err := validateManifest(m); err != nil {
return fmt.Errorf("invalid manifest: %s", err)
}
logrus.Debugf("Pushing %s:%s to v2 repository", repoInfo.LocalName, tag)
mBytes, err := json.MarshalIndent(m, "", " ")
if err != nil {
return err
}
js, err := libtrust.NewJSONSignature(mBytes)
if err != nil {
return err
}
if err = js.Sign(s.trustKey); err != nil {
return err
}
signedBody, err := js.PrettySignature("signatures")
if err != nil {
return err
}
logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", repoInfo.LocalName, tag, s.trustKey.KeyID())
// push the manifest
digest, err := r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, signedBody, mBytes, auth)
if err != nil {
return err
}
out.Write(sf.FormatStatus("", "Digest: %s", digest))
}
return nil
}
// pushV2Image pushes the image content to the v2 registry, first buffering the contents to disk
func (s *TagStore) pushV2Image(r *registry.Session, img *Image, endpoint *registry.Endpoint, imageName string, sf *streamformatter.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) (digest.Digest, error) {
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Buffering to Disk", nil))
image, err := s.graph.Get(img.ID)
if err != nil {
return "", err
}
arch, err := s.graph.TarLayer(image)
if err != nil {
return "", err
}
defer arch.Close()
tf, err := s.graph.newTempFile()
if err != nil {
return "", err
}
defer func() {
tf.Close()
os.Remove(tf.Name())
}()
size, dgst, err := bufferToFile(tf, arch)
// Send the layer
logrus.Debugf("rendered layer for %s of [%d] size", img.ID, size)
if err := r.PutV2ImageBlob(endpoint, imageName, dgst,
progressreader.New(progressreader.Config{
In: tf,
Out: out,
Formatter: sf,
Size: int(size),
NewLines: false,
ID: stringid.TruncateID(img.ID),
Action: "Pushing",
}), auth); err != nil {
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Image push failed", nil))
return "", err
}
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Image successfully pushed", nil))
return dgst, nil
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
// FIXME: Allow interrupting the current push when a new push of the same image is started.
func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {
var (
sf = streamformatter.NewJSONStreamFormatter()
)
var sf = streamformatter.NewJSONStreamFormatter()
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := s.registryService.ResolveRepository(localName)
@ -500,23 +59,7 @@ func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) erro
return err
}
if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
return err
}
defer s.poolRemove("push", repoInfo.LocalName)
endpoint, err := repoInfo.GetEndpoint(imagePushConfig.MetaHeaders)
if err != nil {
return err
}
// TODO(tiborvass): reuse client from endpoint?
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
registry.NewTransport(registry.NoTimeout, endpoint.IsSecure),
registry.DockerHeaders(imagePushConfig.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
r, err := registry.NewSession(client, imagePushConfig.AuthConfig, endpoint)
endpoints, err := s.registryService.LookupEndpoints(repoInfo.CanonicalName)
if err != nil {
return err
}
@ -534,23 +77,31 @@ func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) erro
return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)
}
if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
err := s.pushV2Repository(r, localRepo, imagePushConfig.OutStream, repoInfo, imagePushConfig.Tag, sf)
if err == nil {
s.eventsService.Log("push", repoInfo.LocalName, "")
return nil
var lastErr error
for _, endpoint := range endpoints {
logrus.Debugf("Trying to push %s to %s %s", repoInfo.CanonicalName, endpoint.URL, endpoint.Version)
pusher, err := s.NewPusher(endpoint, localRepo, repoInfo, imagePushConfig, sf)
if err != nil {
lastErr = err
continue
}
if fallback, err := pusher.Push(); err != nil {
if fallback {
lastErr = err
continue
}
logrus.Debugf("Not continuing with error: %v", err)
return err
}
if err != ErrV2RegistryUnavailable {
return fmt.Errorf("Error pushing to registry: %s", err)
}
logrus.Debug("V2 registry is unavailable, falling back on V1")
s.eventsService.Log("push", repoInfo.LocalName, "")
return nil
}
if err := s.pushRepository(r, imagePushConfig.OutStream, repoInfo, localRepo, imagePushConfig.Tag, sf); err != nil {
return err
if lastErr == nil {
lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.CanonicalName)
}
s.eventsService.Log("push", repoInfo.LocalName, "")
return nil
return lastErr
}

graph/push_v1.go (new file, 309 lines)

@ -0,0 +1,309 @@
package graph
import (
"fmt"
"io"
"os"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)
type v1Pusher struct {
*TagStore
endpoint registry.APIEndpoint
localRepo Repository
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
sf *streamformatter.StreamFormatter
session *registry.Session
out io.Writer
}
func (p *v1Pusher) Push() (fallback bool, err error) {
tlsConfig, err := p.registryService.TlsConfig(p.repoInfo.Index.Name)
if err != nil {
return false, err
}
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
// TODO(tiborvass): was NoTimeout
registry.NewTransport(tlsConfig),
registry.DockerHeaders(p.config.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
if err != nil {
logrus.Debugf("Could not get v1 endpoint: %v", err)
return true, err
}
p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
if err != nil {
// TODO(dmcgowan): Check if should fallback
return true, err
}
if err := p.pushRepository(p.config.Tag); err != nil {
// TODO(dmcgowan): Check if should fallback
return false, err
}
return false, nil
}
// Retrieve all the images to be uploaded, in the correct order
func (p *v1Pusher) getImageList(requestedTag string) ([]string, map[string][]string, error) {
var (
imageList []string
imagesSeen = make(map[string]bool)
tagsByImage = make(map[string][]string)
)
for tag, id := range p.localRepo {
if requestedTag != "" && requestedTag != tag {
// Include only the requested tag.
continue
}
if utils.DigestReference(tag) {
// Ignore digest references.
continue
}
var imageListForThisTag []string
tagsByImage[id] = append(tagsByImage[id], tag)
for img, err := p.graph.Get(id); img != nil; img, err = p.graph.GetParent(img) {
if err != nil {
return nil, nil, err
}
if imagesSeen[img.ID] {
// This image is already on the list; we can ignore it and all its parents
break
}
imagesSeen[img.ID] = true
imageListForThisTag = append(imageListForThisTag, img.ID)
}
// reverse the image list for this tag (so the root-most parent image comes first)
for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
}
// append to main image list
imageList = append(imageList, imageListForThisTag...)
}
if len(imageList) == 0 {
return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
}
logrus.Debugf("Image list: %v", imageList)
logrus.Debugf("Tags by image: %v", tagsByImage)
return imageList, tagsByImage, nil
}
// createImageIndex returns an index of an image's layer IDs and tags.
func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
var imageIndex []*registry.ImgData
for _, id := range images {
if tags, hasTags := tags[id]; hasTags {
// If an image has tags you must add an entry in the image index
// for each tag
for _, tag := range tags {
imageIndex = append(imageIndex, &registry.ImgData{
ID: id,
Tag: tag,
})
}
continue
}
// If the image does not have a tag it still needs to be sent to the
// registry with an empty tag so that it is associated with the repository
imageIndex = append(imageIndex, &registry.ImgData{
ID: id,
Tag: "",
})
}
return imageIndex
}
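A small worked example of the index shape, with hypothetical image IDs and tags:

// exampleImageIndex shows the two cases handled above: image "aaa"
// carries tag "latest", image "bbb" is untagged.
func exampleImageIndex(s *TagStore) []*registry.ImgData {
    images := []string{"aaa", "bbb"}
    tags := map[string][]string{"aaa": {"latest"}}
    // Yields {ID: "aaa", Tag: "latest"} and {ID: "bbb", Tag: ""}; the
    // empty tag keeps the untagged image associated with the repository.
    return s.createImageIndex(images, tags)
}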
type imagePushData struct {
id string
endpoint string
tokens []string
}
// lookupImageOnEndpoint checks the specified endpoint to see if an image exists
// and, if it is absent, sends the image ID to the channel of images to be pushed.
func (p *v1Pusher) lookupImageOnEndpoint(wg *sync.WaitGroup, images chan imagePushData, imagesToPush chan string) {
defer wg.Done()
for image := range images {
if err := p.session.LookupRemoteImage(image.id, image.endpoint); err != nil {
logrus.Errorf("Error in LookupRemoteImage: %s", err)
imagesToPush <- image.id
continue
}
p.out.Write(p.sf.FormatStatus("", "Image %s already pushed, skipping", stringid.TruncateID(image.id)))
}
}
func (p *v1Pusher) pushImageToEndpoint(endpoint string, imageIDs []string, tags map[string][]string, repo *registry.RepositoryData) error {
workerCount := len(imageIDs)
// start a maximum of 5 workers to check if images exist on the specified endpoint.
if workerCount > 5 {
workerCount = 5
}
var (
wg = &sync.WaitGroup{}
imageData = make(chan imagePushData, workerCount*2)
imagesToPush = make(chan string, workerCount*2)
pushes = make(chan map[string]struct{}, 1)
)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go p.lookupImageOnEndpoint(wg, imageData, imagesToPush)
}
// start a goroutine that consumes the images to push
go func() {
shouldPush := make(map[string]struct{})
for id := range imagesToPush {
shouldPush[id] = struct{}{}
}
pushes <- shouldPush
}()
for _, id := range imageIDs {
imageData <- imagePushData{
id: id,
endpoint: endpoint,
tokens: repo.Tokens,
}
}
// close the channel to notify the workers that there will be no more images to check.
close(imageData)
wg.Wait()
close(imagesToPush)
// wait for all the images that require pushes to be collected into a consumable map.
shouldPush := <-pushes
// finish by pushing any images and tags to the endpoint. The order in which the
// images are pushed matters, which is why we still iterate over the ordered list of imageIDs.
for _, id := range imageIDs {
if _, push := shouldPush[id]; push {
if _, err := p.pushImage(id, endpoint, repo.Tokens); err != nil {
// FIXME: Continue on error?
return err
}
}
for _, tag := range tags[id] {
p.out.Write(p.sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(id), endpoint+"repositories/"+p.repoInfo.RemoteName+"/tags/"+tag))
if err := p.session.PushRegistryTag(p.repoInfo.RemoteName, id, tag, endpoint); err != nil {
return err
}
}
}
return nil
}
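The shutdown ordering above (close the work channel, wait for the workers, then close the results channel) is the load-bearing part of this function. Below is a reduced sketch of the same fan-out/fan-in skeleton, with a hypothetical check callback standing in for the registry lookup.

func boundedCheck(ids []string, check func(string) bool) map[string]struct{} {
    const workers = 5
    in := make(chan string, workers*2)
    missing := make(chan string, workers*2)
    result := make(chan map[string]struct{}, 1)
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for id := range in {
                if !check(id) {
                    missing <- id // absent remotely, needs a push
                }
            }
        }()
    }
    go func() { // drain results concurrently so workers never block
        need := make(map[string]struct{})
        for id := range missing {
            need[id] = struct{}{}
        }
        result <- need
    }()
    for _, id := range ids {
        in <- id
    }
    close(in)      // no more work, workers drain and exit
    wg.Wait()      // all workers have stopped writing to missing
    close(missing) // safe to close, unblocks the collector
    return <-result
}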
// pushRepository pushes layers that do not already exist on the registry.
func (p *v1Pusher) pushRepository(tag string) error {
logrus.Debugf("Local repo: %s", p.localRepo)
p.out = ioutils.NewWriteFlusher(p.config.OutStream)
imgList, tags, err := p.getImageList(tag)
if err != nil {
return err
}
p.out.Write(p.sf.FormatStatus("", "Sending image list"))
imageIndex := p.createImageIndex(imgList, tags)
logrus.Debugf("Preparing to push %s with the following images and tags", p.localRepo)
for _, data := range imageIndex {
logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
}
if _, err := p.poolAdd("push", p.repoInfo.LocalName); err != nil {
return err
}
defer p.poolRemove("push", p.repoInfo.LocalName)
// Register all the images in a repository with the registry
// If an image is not in this list it will not be associated with the repository
repoData, err := p.session.PushImageJSONIndex(p.repoInfo.RemoteName, imageIndex, false, nil)
if err != nil {
return err
}
nTag := 1
if tag == "" {
nTag = len(p.localRepo)
}
p.out.Write(p.sf.FormatStatus("", "Pushing repository %s (%d tags)", p.repoInfo.CanonicalName, nTag))
// push the repository to each of the endpoints, skipping images that already exist there.
for _, endpoint := range repoData.Endpoints {
if err := p.pushImageToEndpoint(endpoint, imgList, tags, repoData); err != nil {
return err
}
}
_, err = p.session.PushImageJSONIndex(p.repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
return err
}
func (p *v1Pusher) pushImage(imgID, ep string, token []string) (checksum string, err error) {
jsonRaw, err := p.graph.RawJSON(imgID)
if err != nil {
return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
}
p.out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Pushing", nil))
imgData := &registry.ImgData{
ID: imgID,
}
// Send the json
if err := p.session.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
if err == registry.ErrAlreadyExists {
p.out.Write(p.sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
return "", nil
}
return "", err
}
layerData, err := p.graph.TempLayerArchive(imgID, p.sf, p.out)
if err != nil {
return "", fmt.Errorf("Failed to generate layer archive: %s", err)
}
defer os.RemoveAll(layerData.Name())
// Send the layer
logrus.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
checksum, checksumPayload, err := p.session.PushImageLayerRegistry(imgData.ID,
progressreader.New(progressreader.Config{
In: layerData,
Out: p.out,
Formatter: p.sf,
Size: int(layerData.Size),
NewLines: false,
ID: stringid.TruncateID(imgData.ID),
Action: "Pushing",
}), ep, jsonRaw)
if err != nil {
return "", err
}
imgData.Checksum = checksum
imgData.ChecksumPayload = checksumPayload
// Send the checksum
if err := p.session.PushImageChecksumRegistry(imgData, ep); err != nil {
return "", err
}
p.out.Write(p.sf.FormatProgress(stringid.TruncateID(imgData.ID), "Image successfully pushed", nil))
return imgData.Checksum, nil
}

graph/push_v2.go (new file, 254 lines)

@ -0,0 +1,254 @@
package graph
import (
"fmt"
"io/ioutil"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
)
type v2Pusher struct {
*TagStore
endpoint registry.APIEndpoint
localRepo Repository
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
sf *streamformatter.StreamFormatter
repo distribution.Repository
}
func (p *v2Pusher) Push() (fallback bool, err error) {
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
return false, p.pushV2Repository(p.config.Tag)
}
func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) {
logrus.Debugf("Checking %q against %#v", askedTag, p.localRepo)
if len(askedTag) > 0 {
if _, ok := p.localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
return nil, fmt.Errorf("Tag does not exist for %s", askedTag)
}
return []string{askedTag}, nil
}
var tags []string
for tag := range p.localRepo {
if !utils.DigestReference(tag) {
tags = append(tags, tag)
}
}
return tags, nil
}
func (p *v2Pusher) pushV2Repository(tag string) error {
localName := p.repoInfo.LocalName
if _, err := p.poolAdd("push", localName); err != nil {
return err
}
defer p.poolRemove("push", localName)
tags, err := p.getImageTags(tag)
if err != nil {
return fmt.Errorf("error getting tags for %s: %s", localName, err)
}
if len(tags) == 0 {
return fmt.Errorf("no tags to push for %s", localName)
}
for _, tag := range tags {
if err := p.pushV2Tag(tag); err != nil {
return err
}
}
return nil
}
func (p *v2Pusher) pushV2Tag(tag string) error {
logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)
layerId, exists := p.localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layersSeen := make(map[string]bool)
layer, err := p.graph.Get(layerId)
if err != nil {
return err
}
m := &manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: p.repo.Name(),
Tag: tag,
Architecture: layer.Architecture,
FSLayers: []manifest.FSLayer{},
History: []manifest.History{},
}
var metadata runconfig.Config
if layer != nil && layer.Config != nil {
metadata = *layer.Config
}
out := p.config.OutStream
for ; layer != nil; layer, err = p.graph.GetParent(layer) {
if err != nil {
return err
}
if layersSeen[layer.ID] {
break
}
logrus.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
if err := runconfig.Merge(&metadata, layer.Config); err != nil {
return err
}
}
jsonData, err := p.graph.RawJSON(layer.ID)
if err != nil {
return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
}
var exists bool
dgst, err := p.graph.GetDigest(layer.ID)
switch err {
case nil:
_, err := p.repo.Blobs(nil).Stat(nil, dgst)
switch err {
case nil:
exists = true
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
case distribution.ErrBlobUnknown:
// nop
default:
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
return err
}
case ErrDigestNotSet:
// nop
case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
return fmt.Errorf("error getting image checksum: %v", err)
}
// if the digest was empty or not saved, or if the blob does not exist on the
// remote repository, then push it.
if !exists {
if pushDigest, err := p.pushV2Image(p.repo.Blobs(nil), layer); err != nil {
return err
} else if pushDigest != dgst {
// Cache new checksum
if err := p.graph.SetDigest(layer.ID, pushDigest); err != nil {
return err
}
dgst = pushDigest
}
}
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
}
logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
signed, err := manifest.Sign(m, p.trustKey)
if err != nil {
return err
}
manifestDigest, err := digestFromManifest(signed, p.repo.Name())
if err != nil {
return err
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
}
return p.repo.Manifests().Put(signed)
}
func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *Image) (digest.Digest, error) {
out := p.config.OutStream
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Buffering to Disk", nil))
image, err := p.graph.Get(img.ID)
if err != nil {
return "", err
}
arch, err := p.graph.TarLayer(image)
if err != nil {
return "", err
}
tf, err := p.graph.newTempFile()
if err != nil {
return "", err
}
defer func() {
tf.Close()
os.Remove(tf.Name())
}()
size, dgst, err := bufferToFile(tf, arch)
if err != nil {
return "", err
}
// Send the layer
logrus.Debugf("rendered layer for %s of [%d] size", img.ID, size)
layerUpload, err := bs.Create(nil)
if err != nil {
return "", err
}
defer layerUpload.Close()
reader := progressreader.New(progressreader.Config{
In: ioutil.NopCloser(tf),
Out: out,
Formatter: p.sf,
Size: int(size),
NewLines: false,
ID: stringid.TruncateID(img.ID),
Action: "Pushing",
})
n, err := layerUpload.ReadFrom(reader)
if err != nil {
return "", err
}
if n != size {
return "", fmt.Errorf("short upload: only wrote %d of %d", n, size)
}
desc := distribution.Descriptor{Digest: dgst}
if _, err := layerUpload.Commit(nil, desc); err != nil {
return "", err
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Image successfully pushed", nil))
return dgst, nil
}

graph/registry.go (new file, 111 lines)

@ -0,0 +1,111 @@
package graph
import (
"errors"
"net"
"net/http"
"net/url"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/registry"
"golang.org/x/net/context"
)
type dumbCredentialStore struct {
auth *cliconfig.AuthConfig
}
func (dcs dumbCredentialStore) Basic(*url.URL) (string, string) {
return dcs.auth.Username, dcs.auth.Password
}
// NewV2Repository returns a repository client for the given endpoint (v2 only)
func NewV2Repository(repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint, metaHeaders http.Header, authConfig *cliconfig.AuthConfig) (distribution.Repository, error) {
ctx := context.Background()
repoName := repoInfo.CanonicalName
// If endpoint does not support CanonicalName, use the RemoteName instead
if endpoint.TrimHostname {
repoName = repoInfo.RemoteName
}
// TODO(dmcgowan): Call close idle connections when complete, use keep alive
base := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: endpoint.TLSConfig,
// TODO(dmcgowan): Call close idle connections when complete and use keep alive
DisableKeepAlives: true,
}
modifiers := registry.DockerHeaders(metaHeaders)
authTransport := transport.NewTransport(base, modifiers...)
pingClient := &http.Client{
Transport: authTransport,
Timeout: 5 * time.Second,
}
endpointStr := endpoint.URL + "/v2/"
req, err := http.NewRequest("GET", endpointStr, nil)
if err != nil {
return nil, err
}
resp, err := pingClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
versions := auth.APIVersions(resp, endpoint.VersionHeader)
if endpoint.VersionHeader != "" && len(endpoint.Versions) > 0 {
var foundVersion bool
for _, version := range endpoint.Versions {
for _, pingVersion := range versions {
if version == pingVersion {
foundVersion = true
}
}
}
if !foundVersion {
return nil, errors.New("endpoint does not support v2 API")
}
}
challengeManager := auth.NewSimpleChallengeManager()
if err := challengeManager.AddResponse(resp); err != nil {
return nil, err
}
creds := dumbCredentialStore{auth: authConfig}
tokenHandler := auth.NewTokenHandler(authTransport, creds, repoName, "push", "pull")
basicHandler := auth.NewBasicHandler(creds)
modifiers = append(modifiers, auth.NewAuthorizer(challengeManager, tokenHandler, basicHandler))
tr := transport.NewTransport(base, modifiers...)
return client.NewRepository(ctx, repoName, endpoint.URL, tr)
}
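Putting the pieces together, here is a hedged sketch of how a caller obtains a repository handle and fetches a manifest, as the v2 puller does; fetchManifest is a hypothetical helper and the nil metaHeaders are an assumption.

func fetchManifest(repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint, authConfig *cliconfig.AuthConfig, tag string) (*manifest.SignedManifest, error) {
    // Open the repository through the vendored distribution client.
    repo, err := NewV2Repository(repoInfo, endpoint, nil, authConfig)
    if err != nil {
        return nil, err
    }
    // Fetch the signed manifest for the tag over the v2 API.
    return repo.Manifests().GetByTag(tag)
}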
func digestFromManifest(m *manifest.SignedManifest, localName string) (digest.Digest, error) {
payload, err := m.Payload()
if err != nil {
logrus.Debugf("could not retrieve manifest payload: %v", err)
return "", err
}
manifestDigest, err := digest.FromBytes(payload)
if err != nil {
logrus.Infof("Could not compute manifest digest for %s:%s : %v", localName, m.Tag, err)
}
return manifestDigest, nil
}
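The same package-level helper works for any byte payload; a tiny sketch with a made-up manifest body:

func exampleDigest() {
    payload := []byte(`{"schemaVersion": 1}`) // hypothetical manifest body
    if dgst, err := digest.FromBytes(payload); err == nil {
        logrus.Debugf("computed digest: %s", dgst) // a sha256:... value
    }
}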


@ -104,7 +104,7 @@ func (s *DockerRegistrySuite) TestPullByDigestNoFallback(c *check.C) {
// pull from the registry using the <name>@<digest> reference
imageReference := fmt.Sprintf("%s@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", repoName)
out, _, err := dockerCmdWithError(c, "pull", imageReference)
if err == nil || !strings.Contains(out, "pulling with digest reference failed from v2 registry") {
if err == nil || !strings.Contains(out, "manifest unknown") {
c.Fatalf("expected non-zero exit status and correct error message when pulling non-existing image: %s", out)
}
}


@ -89,8 +89,6 @@ func (s *DockerSuite) TestPullImageOfficialNames(c *check.C) {
testRequires(c, Network)
names := []string{
"docker.io/hello-world",
"index.docker.io/hello-world",
"library/hello-world",
"docker.io/library/hello-world",
"index.docker.io/library/hello-world",


@ -125,7 +125,7 @@ func loginV1(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri
return "", fmt.Errorf("Server Error: Server Address not set.")
}
loginAgainstOfficialIndex := serverAddress == IndexServerAddress()
loginAgainstOfficialIndex := serverAddress == INDEXSERVER
// to avoid sending the server address to the server it should be removed before being marshalled
authCopy := *authConfig


@ -37,7 +37,7 @@ func setupTempConfigFile() (*cliconfig.ConfigFile, error) {
root = filepath.Join(root, cliconfig.CONFIGFILE)
configFile := cliconfig.NewConfigFile(root)
for _, registry := range []string{"testIndex", IndexServerAddress()} {
for _, registry := range []string{"testIndex", INDEXSERVER} {
configFile.AuthConfigs[registry] = cliconfig.AuthConfig{
Username: "docker-user",
Password: "docker-pass",
@ -82,7 +82,7 @@ func TestResolveAuthConfigIndexServer(t *testing.T) {
}
defer os.RemoveAll(configFile.Filename())
indexConfig := configFile.AuthConfigs[IndexServerAddress()]
indexConfig := configFile.AuthConfigs[INDEXSERVER]
officialIndex := &IndexInfo{
Official: true,
@ -92,10 +92,10 @@ func TestResolveAuthConfigIndexServer(t *testing.T) {
}
resolved := ResolveAuthConfig(configFile, officialIndex)
assertEqual(t, resolved, indexConfig, "Expected ResolveAuthConfig to return IndexServerAddress()")
assertEqual(t, resolved, indexConfig, "Expected ResolveAuthConfig to return INDEXSERVER")
resolved = ResolveAuthConfig(configFile, privateIndex)
assertNotEqual(t, resolved, indexConfig, "Expected ResolveAuthConfig to not return IndexServerAddress()")
assertNotEqual(t, resolved, indexConfig, "Expected ResolveAuthConfig to not return INDEXSERVER")
}
func TestResolveAuthConfigFullURL(t *testing.T) {
@ -120,7 +120,7 @@ func TestResolveAuthConfigFullURL(t *testing.T) {
Password: "baz-pass",
Email: "baz@example.com",
}
configFile.AuthConfigs[IndexServerAddress()] = officialAuth
configFile.AuthConfigs[INDEXSERVER] = officialAuth
expectedAuths := map[string]cliconfig.AuthConfig{
"registry.example.com": registryAuth,


@ -21,9 +21,16 @@ type Options struct {
}
const (
DEFAULT_NAMESPACE = "docker.io"
DEFAULT_V2_REGISTRY = "https://registry-1.docker.io"
DEFAULT_REGISTRY_VERSION_HEADER = "Docker-Distribution-Api-Version"
DEFAULT_V1_REGISTRY = "https://index.docker.io"
CERTS_DIR = "/etc/docker/certs.d"
// Only used for user auth + account creation
INDEXSERVER = "https://index.docker.io/v1/"
REGISTRYSERVER = "https://registry-1.docker.io/v2/"
REGISTRYSERVER = DEFAULT_V2_REGISTRY
INDEXSERVER = DEFAULT_V1_REGISTRY + "/v1/"
INDEXNAME = "docker.io"
// INDEXSERVER = "https://registry-stage.hub.docker.com/v1/"
@ -34,14 +41,6 @@ var (
emptyServiceConfig = NewServiceConfig(nil)
)
func IndexServerAddress() string {
return INDEXSERVER
}
func IndexServerName() string {
return INDEXNAME
}
// InstallFlags adds command-line options to the top-level flag parser for
// the current process.
func (options *Options) InstallFlags() {
@ -72,6 +71,7 @@ func (ipnet *netIPNet) UnmarshalJSON(b []byte) (err error) {
type ServiceConfig struct {
InsecureRegistryCIDRs []*netIPNet `json:"InsecureRegistryCIDRs"`
IndexConfigs map[string]*IndexInfo `json:"IndexConfigs"`
Mirrors []string
}
// NewServiceConfig returns a new instance of ServiceConfig
@ -93,6 +93,9 @@ func NewServiceConfig(options *Options) *ServiceConfig {
config := &ServiceConfig{
InsecureRegistryCIDRs: make([]*netIPNet, 0),
IndexConfigs: make(map[string]*IndexInfo, 0),
// Hack: Bypass setting the mirrors to IndexConfigs since they are going away
// and Mirrors are only for the official registry anyway.
Mirrors: options.Mirrors.GetAll(),
}
// Split --insecure-registry into CIDR and registry-specific settings.
for _, r := range options.InsecureRegistries.GetAll() {
@ -113,9 +116,9 @@ func NewServiceConfig(options *Options) *ServiceConfig {
}
// Configure public registry.
config.IndexConfigs[IndexServerName()] = &IndexInfo{
Name: IndexServerName(),
Mirrors: options.Mirrors.GetAll(),
config.IndexConfigs[INDEXNAME] = &IndexInfo{
Name: INDEXNAME,
Mirrors: config.Mirrors,
Secure: true,
Official: true,
}
@ -193,8 +196,8 @@ func ValidateMirror(val string) (string, error) {
// ValidateIndexName validates an index name.
func ValidateIndexName(val string) (string, error) {
// 'index.docker.io' => 'docker.io'
if val == "index."+IndexServerName() {
val = IndexServerName()
if val == "index."+INDEXNAME {
val = INDEXNAME
}
if strings.HasPrefix(val, "-") || strings.HasSuffix(val, "-") {
return "", fmt.Errorf("Invalid index name (%s). Cannot begin or end with a hyphen.", val)
@ -264,7 +267,7 @@ func (config *ServiceConfig) NewIndexInfo(indexName string) (*IndexInfo, error)
// index as the AuthConfig key, and uses the (host)name[:port] for private indexes.
func (index *IndexInfo) GetAuthConfigKey() string {
if index.Official {
return IndexServerAddress()
return INDEXSERVER
}
return index.Name
}
@ -277,7 +280,7 @@ func splitReposName(reposName string) (string, string) {
!strings.Contains(nameParts[0], ":") && nameParts[0] != "localhost") {
// This is a Docker Index repos (ex: samalba/hipache or ubuntu)
// 'docker.io'
indexName = IndexServerName()
indexName = INDEXNAME
remoteName = reposName
} else {
indexName = nameParts[0]


@ -1,6 +1,7 @@
package registry
import (
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
@ -12,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/pkg/tlsconfig"
)
// for mocking in unit tests
@ -44,7 +46,9 @@ func scanForAPIVersion(address string) (string, APIVersion) {
// NewEndpoint parses the given address to return a registry endpoint.
func NewEndpoint(index *IndexInfo, metaHeaders http.Header) (*Endpoint, error) {
// *TODO: Allow per-registry configuration of endpoints.
endpoint, err := newEndpoint(index.GetAuthConfigKey(), index.Secure, metaHeaders)
tlsConfig := tlsconfig.ServerDefault
tlsConfig.InsecureSkipVerify = !index.Secure
endpoint, err := newEndpoint(index.GetAuthConfigKey(), &tlsConfig, metaHeaders)
if err != nil {
return nil, err
}
@ -82,7 +86,7 @@ func validateEndpoint(endpoint *Endpoint) error {
return nil
}
func newEndpoint(address string, secure bool, metaHeaders http.Header) (*Endpoint, error) {
func newEndpoint(address string, tlsConfig *tls.Config, metaHeaders http.Header) (*Endpoint, error) {
var (
endpoint = new(Endpoint)
trimmedAddress string
@ -93,13 +97,16 @@ func newEndpoint(address string, secure bool, metaHeaders http.Header) (*Endpoin
address = "https://" + address
}
endpoint.IsSecure = (tlsConfig == nil || !tlsConfig.InsecureSkipVerify)
trimmedAddress, endpoint.Version = scanForAPIVersion(address)
if endpoint.URL, err = url.Parse(trimmedAddress); err != nil {
return nil, err
}
endpoint.IsSecure = secure
tr := NewTransport(ConnectTimeout, endpoint.IsSecure)
// TODO(tiborvass): make sure a ConnectTimeout transport is used
tr := NewTransport(tlsConfig)
endpoint.client = HTTPClient(transport.NewTransport(tr, DockerHeaders(metaHeaders)...))
return endpoint, nil
}
@ -166,7 +173,7 @@ func (e *Endpoint) Ping() (RegistryInfo, error) {
func (e *Endpoint) pingV1() (RegistryInfo, error) {
logrus.Debugf("attempting v1 ping for registry endpoint %s", e)
if e.String() == IndexServerAddress() {
if e.String() == INDEXSERVER {
// Skip the check, we know this one is valid
// (and we never want to fallback to http in case of error)
return RegistryInfo{Standalone: false}, nil


@ -12,14 +12,14 @@ func TestEndpointParse(t *testing.T) {
str string
expected string
}{
{IndexServerAddress(), IndexServerAddress()},
{INDEXSERVER, INDEXSERVER},
{"http://0.0.0.0:5000/v1/", "http://0.0.0.0:5000/v1/"},
{"http://0.0.0.0:5000/v2/", "http://0.0.0.0:5000/v2/"},
{"http://0.0.0.0:5000", "http://0.0.0.0:5000/v0/"},
{"0.0.0.0:5000", "https://0.0.0.0:5000/v0/"},
}
for _, td := range testData {
e, err := newEndpoint(td.str, false, nil)
e, err := newEndpoint(td.str, nil, nil)
if err != nil {
t.Errorf("%q: %s", td.str, err)
}
@ -60,7 +60,7 @@ func TestValidateEndpointAmbiguousAPIVersion(t *testing.T) {
testEndpoint := Endpoint{
URL: testServerURL,
Version: APIVersionUnknown,
client: HTTPClient(NewTransport(ConnectTimeout, false)),
client: HTTPClient(NewTransport(nil)),
}
if err = validateEndpoint(&testEndpoint); err != nil {


@ -2,25 +2,20 @@ package registry
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/autogen/dockerversion"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/pkg/timeoutconn"
"github.com/docker/docker/pkg/tlsconfig"
"github.com/docker/docker/pkg/useragent"
)
@ -57,135 +52,13 @@ func init() {
dockerUserAgent = useragent.AppendVersions("", httpVersion...)
}
type httpsRequestModifier struct {
mu sync.Mutex
tlsConfig *tls.Config
}
// DRAGONS(tiborvass): If someone wonders why we set tlsconfig in a roundtripper,
// it's to match the current behavior in master: we generate the certpool on
// every single request. It's not great, but it allows people to just put certs
// in /etc/docker/certs.d/.../ and have docker "pick them up" immediately. An
// fsnotify implementation would be preferable, but that was out of scope of
// this refactoring.
func (m *httpsRequestModifier) ModifyRequest(req *http.Request) error {
var (
roots *x509.CertPool
certs []tls.Certificate
hostDir string
)
if req.URL.Scheme == "https" {
hasFile := func(files []os.FileInfo, name string) bool {
for _, f := range files {
if f.Name() == name {
return true
}
}
return false
}
if runtime.GOOS == "windows" {
hostDir = path.Join(os.TempDir(), "/docker/certs.d", req.URL.Host)
} else {
hostDir = path.Join("/etc/docker/certs.d", req.URL.Host)
}
logrus.Debugf("hostDir: %s", hostDir)
fs, err := ioutil.ReadDir(hostDir)
if err != nil && !os.IsNotExist(err) {
return err
}
for _, f := range fs {
if strings.HasSuffix(f.Name(), ".crt") {
if roots == nil {
roots = x509.NewCertPool()
}
logrus.Debugf("crt: %s", hostDir+"/"+f.Name())
data, err := ioutil.ReadFile(filepath.Join(hostDir, f.Name()))
if err != nil {
return err
}
roots.AppendCertsFromPEM(data)
}
if strings.HasSuffix(f.Name(), ".cert") {
certName := f.Name()
keyName := certName[:len(certName)-5] + ".key"
logrus.Debugf("cert: %s", hostDir+"/"+f.Name())
if !hasFile(fs, keyName) {
return fmt.Errorf("Missing key %s for certificate %s", keyName, certName)
}
cert, err := tls.LoadX509KeyPair(filepath.Join(hostDir, certName), path.Join(hostDir, keyName))
if err != nil {
return err
}
certs = append(certs, cert)
}
if strings.HasSuffix(f.Name(), ".key") {
keyName := f.Name()
certName := keyName[:len(keyName)-4] + ".cert"
logrus.Debugf("key: %s", hostDir+"/"+f.Name())
if !hasFile(fs, certName) {
return fmt.Errorf("Missing certificate %s for key %s", certName, keyName)
}
}
}
m.mu.Lock()
m.tlsConfig.RootCAs = roots
m.tlsConfig.Certificates = certs
m.mu.Unlock()
}
return nil
}
func NewTransport(timeout TimeoutType, secure bool) http.RoundTripper {
tlsConfig := &tls.Config{
// Avoid fallback to SSL protocols < TLS1.0
MinVersion: tls.VersionTLS10,
InsecureSkipVerify: !secure,
CipherSuites: tlsconfig.DefaultServerAcceptedCiphers,
}
tr := &http.Transport{
DisableKeepAlives: true,
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConfig,
}
switch timeout {
case ConnectTimeout:
tr.Dial = func(proto string, addr string) (net.Conn, error) {
// Set the connect timeout to 30 seconds to allow for slower connection
// times...
d := net.Dialer{Timeout: 30 * time.Second, DualStack: true}
conn, err := d.Dial(proto, addr)
if err != nil {
return nil, err
}
// Set the recv timeout to 10 seconds
conn.SetDeadline(time.Now().Add(10 * time.Second))
return conn, nil
}
case ReceiveTimeout:
tr.Dial = func(proto string, addr string) (net.Conn, error) {
d := net.Dialer{DualStack: true}
conn, err := d.Dial(proto, addr)
if err != nil {
return nil, err
}
conn = timeoutconn.New(conn, 1*time.Minute)
return conn, nil
func hasFile(files []os.FileInfo, name string) bool {
for _, f := range files {
if f.Name() == name {
return true
}
}
return false
}
if secure {
// note: httpsTransport also handles http transport
// but for HTTPS, it sets up the certs
return transport.NewTransport(tr, &httpsRequestModifier{tlsConfig: tlsConfig})
}
return tr
// DockerHeaders returns request modifiers that ensure requests have
@ -202,10 +75,6 @@ func DockerHeaders(metaHeaders http.Header) []transport.RequestModifier {
}
func HTTPClient(transport http.RoundTripper) *http.Client {
if transport == nil {
transport = NewTransport(ConnectTimeout, true)
}
return &http.Client{
Transport: transport,
CheckRedirect: AddRequiredHeadersToRedirectedRequests,
@ -245,3 +114,52 @@ func AddRequiredHeadersToRedirectedRequests(req *http.Request, via []*http.Reque
}
return nil
}
func shouldV2Fallback(err errcode.Error) bool {
logrus.Debugf("v2 error: %T %v", err, err)
switch err.Code {
case v2.ErrorCodeUnauthorized, v2.ErrorCodeManifestUnknown:
return true
}
return false
}
type ErrNoSupport struct{ Err error }
func (e ErrNoSupport) Error() string {
if e.Err == nil {
return "not supported"
}
return e.Err.Error()
}
func ContinueOnError(err error) bool {
switch v := err.(type) {
case errcode.Errors:
return ContinueOnError(v[0])
case ErrNoSupport:
return ContinueOnError(v.Err)
case errcode.Error:
return shouldV2Fallback(v)
}
return false
}
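A sketch of the intended call pattern, assuming hypothetical tryV2 and tryV1 closures that attempt the operation against each protocol:

func withV1Fallback(tryV2, tryV1 func() error) error {
    if err := tryV2(); err != nil {
        if ContinueOnError(err) {
            // benign v2 failure (unauthorized, manifest unknown, or
            // unsupported endpoint): fall back to the v1 protocol
            return tryV1()
        }
        return err // hard failure, surface it instead of masking it
    }
    return nil
}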
func NewTransport(tlsConfig *tls.Config) *http.Transport {
if tlsConfig == nil {
var cfg = tlsconfig.ServerDefault
tlsConfig = &cfg
}
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
// TODO(dmcgowan): Call close idle connections when complete and use keep alive
DisableKeepAlives: true,
}
}
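For completeness, the three building blocks in this file compose as follows; a minimal sketch of the client construction the v1 pusher performs, where newRegistryClient is a hypothetical name:

func newRegistryClient(tlsConfig *tls.Config, metaHeaders http.Header) *http.Client {
    base := NewTransport(tlsConfig)                                   // dialer and TLS setup
    tr := transport.NewTransport(base, DockerHeaders(metaHeaders)...) // user agent and meta headers
    return HTTPClient(tr)                                             // redirect header handling
}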


@ -165,7 +165,7 @@ func makeHttpsIndex(req string) *IndexInfo {
func makePublicIndex() *IndexInfo {
index := &IndexInfo{
Name: IndexServerAddress(),
Name: INDEXSERVER,
Secure: true,
Official: true,
}


@ -27,7 +27,7 @@ func spawnTestRegistrySession(t *testing.T) *Session {
if err != nil {
t.Fatal(err)
}
var tr http.RoundTripper = debugTransport{NewTransport(ReceiveTimeout, endpoint.IsSecure), t.Log}
var tr http.RoundTripper = debugTransport{NewTransport(nil), t.Log}
tr = transport.NewTransport(AuthTransport(tr, authConfig, false), DockerHeaders(nil)...)
client := HTTPClient(tr)
r, err := NewSession(client, authConfig, endpoint)
@ -332,7 +332,7 @@ func TestParseRepositoryInfo(t *testing.T) {
expectedRepoInfos := map[string]RepositoryInfo{
"fooo/bar": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "fooo/bar",
@ -342,7 +342,7 @@ func TestParseRepositoryInfo(t *testing.T) {
},
"library/ubuntu": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "library/ubuntu",
@ -352,7 +352,7 @@ func TestParseRepositoryInfo(t *testing.T) {
},
"nonlibrary/ubuntu": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "nonlibrary/ubuntu",
@ -362,7 +362,7 @@ func TestParseRepositoryInfo(t *testing.T) {
},
"ubuntu": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "library/ubuntu",
@ -372,7 +372,7 @@ func TestParseRepositoryInfo(t *testing.T) {
},
"other/library": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "other/library",
@ -480,9 +480,9 @@ func TestParseRepositoryInfo(t *testing.T) {
CanonicalName: "localhost/privatebase",
Official: false,
},
IndexServerName() + "/public/moonbase": {
INDEXNAME + "/public/moonbase": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "public/moonbase",
@ -490,19 +490,9 @@ func TestParseRepositoryInfo(t *testing.T) {
CanonicalName: "docker.io/public/moonbase",
Official: false,
},
"index." + IndexServerName() + "/public/moonbase": {
"index." + INDEXNAME + "/public/moonbase": {
Index: &IndexInfo{
Name: IndexServerName(),
Official: true,
},
RemoteName: "public/moonbase",
LocalName: "public/moonbase",
CanonicalName: "docker.io/public/moonbase",
Official: false,
},
IndexServerName() + "/public/moonbase": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "public/moonbase",
@ -512,7 +502,7 @@ func TestParseRepositoryInfo(t *testing.T) {
},
"ubuntu-12.04-base": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "library/ubuntu-12.04-base",
@ -520,9 +510,9 @@ func TestParseRepositoryInfo(t *testing.T) {
CanonicalName: "docker.io/library/ubuntu-12.04-base",
Official: true,
},
IndexServerName() + "/ubuntu-12.04-base": {
INDEXNAME + "/ubuntu-12.04-base": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "library/ubuntu-12.04-base",
@ -530,19 +520,9 @@ func TestParseRepositoryInfo(t *testing.T) {
CanonicalName: "docker.io/library/ubuntu-12.04-base",
Official: true,
},
IndexServerName() + "/ubuntu-12.04-base": {
"index." + INDEXNAME + "/ubuntu-12.04-base": {
Index: &IndexInfo{
Name: IndexServerName(),
Official: true,
},
RemoteName: "library/ubuntu-12.04-base",
LocalName: "ubuntu-12.04-base",
CanonicalName: "docker.io/library/ubuntu-12.04-base",
Official: true,
},
"index." + IndexServerName() + "/ubuntu-12.04-base": {
Index: &IndexInfo{
Name: IndexServerName(),
Name: INDEXNAME,
Official: true,
},
RemoteName: "library/ubuntu-12.04-base",
@ -585,14 +565,14 @@ func TestNewIndexInfo(t *testing.T) {
config := NewServiceConfig(nil)
noMirrors := make([]string, 0)
expectedIndexInfos := map[string]*IndexInfo{
IndexServerName(): {
Name: IndexServerName(),
INDEXNAME: {
Name: INDEXNAME,
Official: true,
Secure: true,
Mirrors: noMirrors,
},
"index." + IndexServerName(): {
Name: IndexServerName(),
"index." + INDEXNAME: {
Name: INDEXNAME,
Official: true,
Secure: true,
Mirrors: noMirrors,
@ -616,14 +596,14 @@ func TestNewIndexInfo(t *testing.T) {
config = makeServiceConfig(publicMirrors, []string{"example.com"})
expectedIndexInfos = map[string]*IndexInfo{
IndexServerName(): {
Name: IndexServerName(),
INDEXNAME: {
Name: INDEXNAME,
Official: true,
Secure: true,
Mirrors: publicMirrors,
},
"index." + IndexServerName(): {
Name: IndexServerName(),
"index." + INDEXNAME: {
Name: INDEXNAME,
Official: true,
Secure: true,
Mirrors: publicMirrors,
@ -880,7 +860,7 @@ func TestIsSecureIndex(t *testing.T) {
insecureRegistries []string
expected bool
}{
{IndexServerName(), nil, true},
{INDEXNAME, nil, true},
{"example.com", []string{}, true},
{"example.com", []string{"example.com"}, false},
{"localhost", []string{"localhost:5000"}, false},

View file

@ -1,9 +1,19 @@
package registry
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/pkg/tlsconfig"
)
type Service struct {
@ -25,7 +35,7 @@ func (s *Service) Auth(authConfig *cliconfig.AuthConfig) (string, error) {
addr := authConfig.ServerAddress
if addr == "" {
// Use the official registry address if not specified.
addr = IndexServerAddress()
addr = INDEXSERVER
}
index, err := s.ResolveIndex(addr)
if err != nil {
@ -69,3 +79,186 @@ func (s *Service) ResolveRepository(name string) (*RepositoryInfo, error) {
func (s *Service) ResolveIndex(name string) (*IndexInfo, error) {
return s.Config.NewIndexInfo(name)
}
type APIEndpoint struct {
Mirror bool
URL string
Version APIVersion
Official bool
TrimHostname bool
TLSConfig *tls.Config
VersionHeader string
Versions []auth.APIVersion
}
func (e APIEndpoint) ToV1Endpoint(metaHeaders http.Header) (*Endpoint, error) {
return newEndpoint(e.URL, e.TLSConfig, metaHeaders)
}
func (s *Service) TlsConfig(hostname string) (*tls.Config, error) {
// We construct a client TLS config from the server defaults;
// PreferredServerCipherSuites should have no effect here.
tlsConfig := tlsconfig.ServerDefault
isSecure := s.Config.isSecureIndex(hostname)
tlsConfig.InsecureSkipVerify = !isSecure
if isSecure {
hasFile := func(files []os.FileInfo, name string) bool {
for _, f := range files {
if f.Name() == name {
return true
}
}
return false
}
hostDir := filepath.Join(CERTS_DIR, hostname)
logrus.Debugf("hostDir: %s", hostDir)
fs, err := ioutil.ReadDir(hostDir)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
for _, f := range fs {
if strings.HasSuffix(f.Name(), ".crt") {
if tlsConfig.RootCAs == nil {
// TODO(dmcgowan): Copy system pool
tlsConfig.RootCAs = x509.NewCertPool()
}
logrus.Debugf("crt: %s", filepath.Join(hostDir, f.Name()))
data, err := ioutil.ReadFile(filepath.Join(hostDir, f.Name()))
if err != nil {
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
}
if strings.HasSuffix(f.Name(), ".cert") {
certName := f.Name()
keyName := certName[:len(certName)-5] + ".key"
logrus.Debugf("cert: %s", filepath.Join(hostDir, f.Name()))
if !hasFile(fs, keyName) {
return nil, fmt.Errorf("Missing key %s for certificate %s", keyName, certName)
}
cert, err := tls.LoadX509KeyPair(filepath.Join(hostDir, certName), filepath.Join(hostDir, keyName))
if err != nil {
return nil, err
}
tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
}
if strings.HasSuffix(f.Name(), ".key") {
keyName := f.Name()
certName := keyName[:len(keyName)-4] + ".cert"
logrus.Debugf("key: %s", filepath.Join(hostDir, f.Name()))
if !hasFile(fs, certName) {
return nil, fmt.Errorf("Missing certificate %s for key %s", certName, keyName)
}
}
}
}
return &tlsConfig, nil
}
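A hedged usage sketch (not part of the commit): TlsConfig builds a client TLS configuration from the per-host certificate directory under CERTS_DIR, treating *.crt files as CA roots and *.cert/*.key pairs as client certificates. The hostname and directory contents below are illustrative.

var s *Service // assume a Service with a populated ServiceConfig
// Given <CERTS_DIR>/myregistry.example.com:5000/ containing ca.crt,
// client.cert and client.key, the returned *tls.Config carries the CA pool
// and the client certificate; a host configured as insecure instead gets
// InsecureSkipVerify set to true.
tlsCfg, err := s.TlsConfig("myregistry.example.com:5000")
if err != nil {
	// errors include a *.cert file missing its matching *.key
}
_ = tlsCfg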
func (s *Service) LookupEndpoints(repoName string) (endpoints []APIEndpoint, err error) {
var cfg = tlsconfig.ServerDefault
tlsConfig := &cfg
if strings.HasPrefix(repoName, DEFAULT_NAMESPACE+"/") {
// v2 mirrors
for _, mirror := range s.Config.Mirrors {
endpoints = append(endpoints, APIEndpoint{
URL: mirror,
// assume mirrors support v2
Version: APIVersion2,
Mirror: true,
TrimHostname: true,
TLSConfig: tlsConfig,
})
}
// v2 registry
endpoints = append(endpoints, APIEndpoint{
URL: DEFAULT_V2_REGISTRY,
Version: APIVersion2,
Official: true,
TrimHostname: true,
TLSConfig: tlsConfig,
})
// v1 mirrors
// TODO(tiborvass): shouldn't we remove v1 mirrors from here, since v1 mirrors are kinda special?
for _, mirror := range s.Config.Mirrors {
endpoints = append(endpoints, APIEndpoint{
URL: mirror,
// assume mirrors support v1
Version: APIVersion1,
Mirror: true,
TrimHostname: true,
TLSConfig: tlsConfig,
})
}
// v1 registry
endpoints = append(endpoints, APIEndpoint{
URL: DEFAULT_V1_REGISTRY,
Version: APIVersion1,
Official: true,
TrimHostname: true,
TLSConfig: tlsConfig,
})
return endpoints, nil
}
slashIndex := strings.IndexRune(repoName, '/')
if slashIndex <= 0 {
return nil, fmt.Errorf("invalid repo name: missing '/': %s", repoName)
}
hostname := repoName[:slashIndex]
tlsConfig, err = s.TlsConfig(hostname)
if err != nil {
return nil, err
}
isSecure := !tlsConfig.InsecureSkipVerify
v2Versions := []auth.APIVersion{
{
Type: "registry",
Version: "2.0",
},
}
endpoints = []APIEndpoint{
{
URL: "https://" + hostname,
Version: APIVersion2,
TrimHostname: true,
TLSConfig: tlsConfig,
VersionHeader: DEFAULT_REGISTRY_VERSION_HEADER,
Versions: v2Versions,
},
{
URL: "https://" + hostname,
Version: APIVersion1,
TrimHostname: true,
TLSConfig: tlsConfig,
},
}
if !isSecure {
endpoints = append(endpoints, APIEndpoint{
URL: "http://" + hostname,
Version: APIVersion2,
TrimHostname: true,
// the TLSConfig is kept so InsecureSkipVerify marks this endpoint as intentionally insecure
TLSConfig: tlsConfig,
VersionHeader: DEFAULT_REGISTRY_VERSION_HEADER,
Versions: v2Versions,
}, APIEndpoint{
URL: "http://" + hostname,
Version: APIVersion1,
TrimHostname: true,
// the TLSConfig is kept so InsecureSkipVerify marks this endpoint as intentionally insecure
TLSConfig: tlsConfig,
})
}
return endpoints, nil
}
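A sketch (not part of the commit) of the orderings LookupEndpoints yields, following the code above; the repository names are illustrative and s is assumed to be an initialized *Service as in the earlier sketch.

// Default-namespace repositories get: configured mirrors as v2, then
// DEFAULT_V2_REGISTRY, then the same mirrors as v1, then DEFAULT_V1_REGISTRY.
eps, err := s.LookupEndpoints(DEFAULT_NAMESPACE + "/library/ubuntu")
if err != nil {
	// unreachable for this branch; kept for completeness
}
// Any other repository is resolved against the hostname before the first
// '/': https v2 and https v1 endpoints, plus http fallbacks when the host
// is configured as insecure.
eps, err = s.LookupEndpoints("myregistry.example.com:5000/team/app")
_, _ = eps, err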

View file

@ -98,7 +98,7 @@ func (tr *authTransport) RoundTrip(orig *http.Request) (*http.Response, error) {
return tr.RoundTripper.RoundTrip(orig)
}
req := transport.CloneRequest(orig)
req := cloneRequest(orig)
tr.mu.Lock()
tr.modReq[orig] = req
tr.mu.Unlock()
@ -164,12 +164,11 @@ func NewSession(client *http.Client, authConfig *cliconfig.AuthConfig, endpoint
// If we're working with a standalone private registry over HTTPS, send Basic Auth headers
// alongside all our requests.
if endpoint.VersionString(1) != IndexServerAddress() && endpoint.URL.Scheme == "https" {
if endpoint.VersionString(1) != INDEXSERVER && endpoint.URL.Scheme == "https" {
info, err := endpoint.Ping()
if err != nil {
return nil, err
}
if info.Standalone && authConfig != nil {
logrus.Debugf("Endpoint %s is eligible for private registry. Enabling decorator.", endpoint.String())
alwaysSetBasicAuth = true
@ -265,7 +264,7 @@ func (r *Session) GetRemoteImageLayer(imgID, registry string, imgSize int64) (io
if err != nil {
return nil, fmt.Errorf("Error while getting from the server: %v", err)
}
// TODO: why are we doing retries at this level?
// TODO(tiborvass): why are we doing retries at this level?
// These retries should be generic to both v1 and v2
for i := 1; i <= retries; i++ {
statusCode = 0
@ -432,7 +431,7 @@ func (r *Session) GetRepositoryData(remote string) (*RepositoryData, error) {
}
// Forge a better object from the retrieved data
imgsData := make(map[string]*ImgData)
imgsData := make(map[string]*ImgData, len(remoteChecksums))
for _, elem := range remoteChecksums {
imgsData[elem.ID] = elem
}

View file

@ -1,414 +0,0 @@
package registry
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/docker/pkg/httputils"
)
const DockerDigestHeader = "Docker-Content-Digest"
func getV2Builder(e *Endpoint) *v2.URLBuilder {
if e.URLBuilder == nil {
e.URLBuilder = v2.NewURLBuilder(e.URL)
}
return e.URLBuilder
}
func (r *Session) V2RegistryEndpoint(index *IndexInfo) (ep *Endpoint, err error) {
// TODO: check if we should use Mirror
if index.Official {
ep, err = newEndpoint(REGISTRYSERVER, true, nil)
if err != nil {
return
}
err = validateEndpoint(ep)
if err != nil {
return
}
} else if r.indexEndpoint.String() == index.GetAuthConfigKey() {
ep = r.indexEndpoint
} else {
ep, err = NewEndpoint(index, nil)
if err != nil {
return
}
}
ep.URLBuilder = v2.NewURLBuilder(ep.URL)
return
}
// GetV2Authorization gets the authorization needed to access the given image.
// If read-only access is requested, then the authorization may
// only be used for Get operations.
func (r *Session) GetV2Authorization(ep *Endpoint, imageName string, readOnly bool) (auth *RequestAuthorization, err error) {
scopes := []string{"pull"}
if !readOnly {
scopes = append(scopes, "push")
}
logrus.Debugf("Getting authorization for %s %s", imageName, scopes)
return NewRequestAuthorization(r.GetAuthConfig(true), ep, "repository", imageName, scopes), nil
}
//
// 1) Check if TarSum of each layer exists /v2/
// 1.a) if 200, continue
// 1.b) if 300, then push the
// 1.c) if anything else, err
// 2) PUT the created/signed manifest
//
// GetV2ImageManifest simply fetches the bytes of a manifest and the remote
// digest, if available in the response. Note that the application shouldn't
// rely on the untrusted remoteDigest, and should also verify against a
// locally provided digest, if applicable.
func (r *Session) GetV2ImageManifest(ep *Endpoint, imageName, tagName string, auth *RequestAuthorization) (remoteDigest digest.Digest, p []byte, err error) {
routeURL, err := getV2Builder(ep).BuildManifestURL(imageName, tagName)
if err != nil {
return "", nil, err
}
method := "GET"
logrus.Debugf("[registry] Calling %q %s", method, routeURL)
req, err := http.NewRequest(method, routeURL, nil)
if err != nil {
return "", nil, err
}
if err := auth.Authorize(req); err != nil {
return "", nil, err
}
res, err := r.client.Do(req)
if err != nil {
return "", nil, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
if res.StatusCode == 401 {
return "", nil, errLoginRequired
} else if res.StatusCode == 404 {
return "", nil, ErrDoesNotExist
}
return "", nil, httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to fetch for %s:%s", res.StatusCode, imageName, tagName), res)
}
p, err = ioutil.ReadAll(res.Body)
if err != nil {
return "", nil, fmt.Errorf("Error while reading the http response: %s", err)
}
dgstHdr := res.Header.Get(DockerDigestHeader)
if dgstHdr != "" {
remoteDigest, err = digest.ParseDigest(dgstHdr)
if err != nil {
// NOTE(stevvooe): Including the remote digest is optional. We
// don't need to verify against it, but it is good practice.
remoteDigest = ""
logrus.Debugf("error parsing remote digest when fetching %v: %v", routeURL, err)
}
}
return
}
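As the doc comment above advises, callers should verify the returned payload against a trusted local digest rather than the untrusted remote one. A minimal sketch using the digest API already used elsewhere in this file (localDgst is a placeholder for a locally known digest):

verifier, err := digest.NewDigestVerifier(localDgst)
if err != nil {
	return err
}
verifier.Write(p) // p: manifest bytes returned by GetV2ImageManifest
if !verifier.Verified() {
	return fmt.Errorf("manifest digest verification failed for %s", localDgst)
}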
// HeadV2ImageBlob has three possible outcomes:
// - the HEAD request succeeded (the blob already exists)
// - it failed with no error (continue and push the blob)
// - it failed with an error
func (r *Session) HeadV2ImageBlob(ep *Endpoint, imageName string, dgst digest.Digest, auth *RequestAuthorization) (bool, error) {
routeURL, err := getV2Builder(ep).BuildBlobURL(imageName, dgst)
if err != nil {
return false, err
}
method := "HEAD"
logrus.Debugf("[registry] Calling %q %s", method, routeURL)
req, err := http.NewRequest(method, routeURL, nil)
if err != nil {
return false, err
}
if err := auth.Authorize(req); err != nil {
return false, err
}
res, err := r.client.Do(req)
if err != nil {
return false, err
}
res.Body.Close() // close early, since we don't need a body on this call (yet)
switch {
case res.StatusCode >= 200 && res.StatusCode < 400:
// return something indicating no push needed
return true, nil
case res.StatusCode == 401:
return false, errLoginRequired
case res.StatusCode == 404:
// return something indicating blob push needed
return false, nil
}
return false, httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying head request for %s - %s", res.StatusCode, imageName, dgst), res)
}
func (r *Session) GetV2ImageBlob(ep *Endpoint, imageName string, dgst digest.Digest, blobWrtr io.Writer, auth *RequestAuthorization) error {
routeURL, err := getV2Builder(ep).BuildBlobURL(imageName, dgst)
if err != nil {
return err
}
method := "GET"
logrus.Debugf("[registry] Calling %q %s", method, routeURL)
req, err := http.NewRequest(method, routeURL, nil)
if err != nil {
return err
}
if err := auth.Authorize(req); err != nil {
return err
}
res, err := r.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
if res.StatusCode == 401 {
return errLoginRequired
}
return httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to pull %s blob", res.StatusCode, imageName), res)
}
_, err = io.Copy(blobWrtr, res.Body)
return err
}
func (r *Session) GetV2ImageBlobReader(ep *Endpoint, imageName string, dgst digest.Digest, auth *RequestAuthorization) (io.ReadCloser, int64, error) {
routeURL, err := getV2Builder(ep).BuildBlobURL(imageName, dgst)
if err != nil {
return nil, 0, err
}
method := "GET"
logrus.Debugf("[registry] Calling %q %s", method, routeURL)
req, err := http.NewRequest(method, routeURL, nil)
if err != nil {
return nil, 0, err
}
if err := auth.Authorize(req); err != nil {
return nil, 0, err
}
res, err := r.client.Do(req)
if err != nil {
return nil, 0, err
}
if res.StatusCode != 200 {
if res.StatusCode == 401 {
return nil, 0, errLoginRequired
}
return nil, 0, httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to pull %s blob - %s", res.StatusCode, imageName, dgst), res)
}
lenStr := res.Header.Get("Content-Length")
l, err := strconv.ParseInt(lenStr, 10, 64)
if err != nil {
return nil, 0, err
}
return res.Body, l, err
}
// PutV2ImageBlob pushes the blob to the server for storage.
// 'blobRdr' is an uncompressed reader of the blob to be pushed.
// The server will generate its own checksum calculation.
func (r *Session) PutV2ImageBlob(ep *Endpoint, imageName string, dgst digest.Digest, blobRdr io.Reader, auth *RequestAuthorization) error {
location, err := r.initiateBlobUpload(ep, imageName, auth)
if err != nil {
return err
}
method := "PUT"
logrus.Debugf("[registry] Calling %q %s", method, location)
req, err := http.NewRequest(method, location, ioutil.NopCloser(blobRdr))
if err != nil {
return err
}
queryParams := req.URL.Query()
queryParams.Add("digest", dgst.String())
req.URL.RawQuery = queryParams.Encode()
if err := auth.Authorize(req); err != nil {
return err
}
res, err := r.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 201 {
if res.StatusCode == 401 {
return errLoginRequired
}
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
logrus.Debugf("Unexpected response from server: %q %#v", errBody, res.Header)
return httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to push %s blob - %s", res.StatusCode, imageName, dgst), res)
}
return nil
}
// initiateBlobUpload gets the blob upload location for the given image name.
func (r *Session) initiateBlobUpload(ep *Endpoint, imageName string, auth *RequestAuthorization) (location string, err error) {
routeURL, err := getV2Builder(ep).BuildBlobUploadURL(imageName)
if err != nil {
return "", err
}
logrus.Debugf("[registry] Calling %q %s", "POST", routeURL)
req, err := http.NewRequest("POST", routeURL, nil)
if err != nil {
return "", err
}
if err := auth.Authorize(req); err != nil {
return "", err
}
res, err := r.client.Do(req)
if err != nil {
return "", err
}
if res.StatusCode != http.StatusAccepted {
if res.StatusCode == http.StatusUnauthorized {
return "", errLoginRequired
}
if res.StatusCode == http.StatusNotFound {
return "", ErrDoesNotExist
}
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", err
}
logrus.Debugf("Unexpected response from server: %q %#v", errBody, res.Header)
return "", httputils.NewHTTPRequestError(fmt.Sprintf("Server error: unexpected %d response status trying to initiate upload of %s", res.StatusCode, imageName), res)
}
if location = res.Header.Get("Location"); location == "" {
return "", fmt.Errorf("registry did not return a Location header for resumable blob upload for image %s", imageName)
}
return
}
// Finally Push the (signed) manifest of the blobs we've just pushed
func (r *Session) PutV2ImageManifest(ep *Endpoint, imageName, tagName string, signedManifest, rawManifest []byte, auth *RequestAuthorization) (digest.Digest, error) {
routeURL, err := getV2Builder(ep).BuildManifestURL(imageName, tagName)
if err != nil {
return "", err
}
method := "PUT"
logrus.Debugf("[registry] Calling %q %s", method, routeURL)
req, err := http.NewRequest(method, routeURL, bytes.NewReader(signedManifest))
if err != nil {
return "", err
}
if err := auth.Authorize(req); err != nil {
return "", err
}
res, err := r.client.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()
// All 2xx and 3xx responses can be accepted for a put.
if res.StatusCode >= 400 {
if res.StatusCode == 401 {
return "", errLoginRequired
}
errBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", err
}
logrus.Debugf("Unexpected response from server: %q %#v", errBody, res.Header)
return "", httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to push %s:%s manifest", res.StatusCode, imageName, tagName), res)
}
hdrDigest, err := digest.ParseDigest(res.Header.Get(DockerDigestHeader))
if err != nil {
return "", fmt.Errorf("invalid manifest digest from registry: %s", err)
}
dgstVerifier, err := digest.NewDigestVerifier(hdrDigest)
if err != nil {
return "", fmt.Errorf("invalid manifest digest from registry: %s", err)
}
dgstVerifier.Write(rawManifest)
if !dgstVerifier.Verified() {
computedDigest, _ := digest.FromBytes(rawManifest)
return "", fmt.Errorf("unable to verify manifest digest: registry has %q, computed %q", hdrDigest, computedDigest)
}
return hdrDigest, nil
}
type remoteTags struct {
Name string `json:"name"`
Tags []string `json:"tags"`
}
// Given a repository name, returns a JSON array of string tags
func (r *Session) GetV2RemoteTags(ep *Endpoint, imageName string, auth *RequestAuthorization) ([]string, error) {
routeURL, err := getV2Builder(ep).BuildTagsURL(imageName)
if err != nil {
return nil, err
}
method := "GET"
logrus.Debugf("[registry] Calling %q %s", method, routeURL)
req, err := http.NewRequest(method, routeURL, nil)
if err != nil {
return nil, err
}
if err := auth.Authorize(req); err != nil {
return nil, err
}
res, err := r.client.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
if res.StatusCode == 401 {
return nil, errLoginRequired
} else if res.StatusCode == 404 {
return nil, ErrDoesNotExist
}
return nil, httputils.NewHTTPRequestError(fmt.Sprintf("Server error: %d trying to fetch for %s", res.StatusCode, imageName), res)
}
var remote remoteTags
if err := json.NewDecoder(res.Body).Decode(&remote); err != nil {
return nil, fmt.Errorf("Error while decoding the http response: %s", err)
}
return remote.Tags, nil
}