Merge pull request #917 from dotcloud/pull_pool

- Runtime: Forbid parralel push/pull for a single image/repo. Fixes #311
This commit is contained in:
Guillaume J. Charmes 2013-06-19 14:11:29 -07:00
Родитель a056f1deec fe204e6f48
Коммит 7e065aaacd
2 изменённых файлов: 62 добавлений и 6 удалений

Просмотреть файл

@ -65,7 +65,11 @@ func init() {
// Create the "Server"
srv := &Server{
runtime: runtime,
runtime: runtime,
enableCors: false,
lock: &sync.Mutex{},
pullingPool: make(map[string]struct{}),
pushingPool: make(map[string]struct{}),
}
// Retrieve the Image
if err := srv.ImagePull(unitTestImageName, "", "", os.Stdout, utils.NewStreamFormatter(false), nil); err != nil {

Просмотреть файл

@ -15,6 +15,7 @@ import (
"path"
"runtime"
"strings"
"sync"
)
func (srv *Server) DockerVersion() APIVersion {
@ -413,7 +414,47 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, local, re
return nil
}
func (srv *Server) poolAdd(kind, key string) error {
srv.lock.Lock()
defer srv.lock.Unlock()
if _, exists := srv.pullingPool[key]; exists {
return fmt.Errorf("%s %s is already in progress", key, kind)
}
switch kind {
case "pull":
srv.pullingPool[key] = struct{}{}
break
case "push":
srv.pushingPool[key] = struct{}{}
break
default:
return fmt.Errorf("Unkown pool type")
}
return nil
}
func (srv *Server) poolRemove(kind, key string) error {
switch kind {
case "pull":
delete(srv.pullingPool, key)
break
case "push":
delete(srv.pushingPool, key)
break
default:
return fmt.Errorf("Unkown pool type")
}
return nil
}
func (srv *Server) ImagePull(name, tag, endpoint string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error {
if err := srv.poolAdd("pull", name+":"+tag); err != nil {
return err
}
defer srv.poolRemove("pull", name+":"+tag)
r := registry.NewRegistry(srv.runtime.root, authConfig)
out = utils.NewWriteFlusher(out)
if endpoint != "" {
@ -430,7 +471,6 @@ func (srv *Server) ImagePull(name, tag, endpoint string, out io.Writer, sf *util
if err := srv.pullRepository(r, out, name, remote, tag, sf); err != nil {
return err
}
return nil
}
@ -605,7 +645,13 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgId,
return nil
}
// FIXME: Allow to interupt current push when new push of same image is done.
func (srv *Server) ImagePush(name, endpoint string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error {
if err := srv.poolAdd("push", name); err != nil {
return err
}
defer srv.poolRemove("push", name)
out = utils.NewWriteFlusher(out)
img, err := srv.runtime.graph.Get(name)
r := registry.NewRegistry(srv.runtime.root, authConfig)
@ -1000,14 +1046,20 @@ func NewServer(autoRestart, enableCors bool, dns ListOpts) (*Server, error) {
return nil, err
}
srv := &Server{
runtime: runtime,
enableCors: enableCors,
runtime: runtime,
enableCors: enableCors,
lock: &sync.Mutex{},
pullingPool: make(map[string]struct{}),
pushingPool: make(map[string]struct{}),
}
runtime.srv = srv
return srv, nil
}
type Server struct {
runtime *Runtime
enableCors bool
runtime *Runtime
enableCors bool
lock *sync.Mutex
pullingPool map[string]struct{}
pushingPool map[string]struct{}
}