From 8a756f417ee4925bad6c8526fabda05ab8a6c041 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 20 Nov 2013 13:51:05 -0800 Subject: [PATCH] wait on pull already in progress --- server.go | 51 ++++++++++++++++++++++++--------------------- server_unit_test.go | 35 +++++++++++-------------------- 2 files changed, 39 insertions(+), 47 deletions(-) diff --git a/server.go b/server.go index 10d3ecb2f8..0143e17835 100644 --- a/server.go +++ b/server.go @@ -732,9 +732,9 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin id := history[i] // ensure no two downloads of the same layer happen at the same time - if err := srv.poolAdd("pull", "layer:"+id); err != nil { + if c, err := srv.poolAdd("pull", "layer:"+id); err != nil { utils.Errorf("Image (id: %s) pull is already running, skipping: %v", id, err) - return nil + <-c } defer srv.poolRemove("pull", "layer:"+id) @@ -829,7 +829,7 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName } // ensure no two downloads of the same image happen at the same time - if err := srv.poolAdd("pull", "img:"+img.ID); err != nil { + if _, err := srv.poolAdd("pull", "img:"+img.ID); err != nil { utils.Errorf("Image (id: %s) pull is already running, skipping: %v", img.ID, err) if parallel { errors <- nil @@ -900,38 +900,41 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName return nil } -func (srv *Server) poolAdd(kind, key string) error { +func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) { srv.Lock() defer srv.Unlock() - if _, exists := srv.pullingPool[key]; exists { - return fmt.Errorf("pull %s is already in progress", key) + if c, exists := srv.pullingPool[key]; exists { + return c, fmt.Errorf("pull %s is already in progress", key) } - if _, exists := srv.pushingPool[key]; exists { - return fmt.Errorf("push %s is already in progress", key) + if c, exists := srv.pushingPool[key]; exists { + return c, fmt.Errorf("push %s is already in progress", key) } + c := make(chan struct{}) switch kind { case "pull": - srv.pullingPool[key] = struct{}{} - break + srv.pullingPool[key] = c case "push": - srv.pushingPool[key] = struct{}{} - break + srv.pushingPool[key] = c default: - return fmt.Errorf("Unknown pool type") + return nil, fmt.Errorf("Unknown pool type") } - return nil + return c, nil } func (srv *Server) poolRemove(kind, key string) error { switch kind { case "pull": - delete(srv.pullingPool, key) - break + if c, exists := srv.pullingPool[key]; exists { + close(c) + delete(srv.pullingPool, key) + } case "push": - delete(srv.pushingPool, key) - break + if c, exists := srv.pushingPool[key]; exists { + close(c) + delete(srv.pushingPool, key) + } default: return fmt.Errorf("Unknown pool type") } @@ -943,7 +946,7 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut if err != nil { return err } - if err := srv.poolAdd("pull", localName+":"+tag); err != nil { + if _, err := srv.poolAdd("pull", localName+":"+tag); err != nil { return err } defer srv.poolRemove("pull", localName+":"+tag) @@ -1138,7 +1141,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, // FIXME: Allow to interrupt current push when new push of same image is done. func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string) error { - if err := srv.poolAdd("push", localName); err != nil { + if _, err := srv.poolAdd("push", localName); err != nil { return err } defer srv.poolRemove("push", localName) @@ -1769,8 +1772,8 @@ func NewServer(eng *engine.Engine, config *DaemonConfig) (*Server, error) { srv := &Server{ Eng: eng, runtime: runtime, - pullingPool: make(map[string]struct{}), - pushingPool: make(map[string]struct{}), + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events listeners: make(map[string]chan utils.JSONMessage), reqFactory: nil, @@ -1807,8 +1810,8 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage { type Server struct { sync.Mutex runtime *Runtime - pullingPool map[string]struct{} - pushingPool map[string]struct{} + pullingPool map[string]chan struct{} + pushingPool map[string]chan struct{} events []utils.JSONMessage listeners map[string]chan utils.JSONMessage reqFactory *utils.HTTPRequestFactory diff --git a/server_unit_test.go b/server_unit_test.go index a51e2ddff5..4c0d24ca1d 100644 --- a/server_unit_test.go +++ b/server_unit_test.go @@ -8,49 +8,38 @@ import ( func TestPools(t *testing.T) { srv := &Server{ - pullingPool: make(map[string]struct{}), - pushingPool: make(map[string]struct{}), + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), } - err := srv.poolAdd("pull", "test1") - if err != nil { + if _, err := srv.poolAdd("pull", "test1"); err != nil { t.Fatal(err) } - err = srv.poolAdd("pull", "test2") - if err != nil { + if _, err := srv.poolAdd("pull", "test2"); err != nil { t.Fatal(err) } - err = srv.poolAdd("push", "test1") - if err == nil || err.Error() != "pull test1 is already in progress" { + if _, err := srv.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { t.Fatalf("Expected `pull test1 is already in progress`") } - err = srv.poolAdd("pull", "test1") - if err == nil || err.Error() != "pull test1 is already in progress" { + if _, err := srv.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { t.Fatalf("Expected `pull test1 is already in progress`") } - err = srv.poolAdd("wait", "test3") - if err == nil || err.Error() != "Unknown pool type" { + if _, err := srv.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { t.Fatalf("Expected `Unknown pool type`") } - - err = srv.poolRemove("pull", "test2") - if err != nil { + if err := srv.poolRemove("pull", "test2"); err != nil { t.Fatal(err) } - err = srv.poolRemove("pull", "test2") - if err != nil { + if err := srv.poolRemove("pull", "test2"); err != nil { t.Fatal(err) } - err = srv.poolRemove("pull", "test1") - if err != nil { + if err := srv.poolRemove("pull", "test1"); err != nil { t.Fatal(err) } - err = srv.poolRemove("push", "test1") - if err != nil { + if err := srv.poolRemove("push", "test1"); err != nil { t.Fatal(err) } - err = srv.poolRemove("wait", "test3") - if err == nil || err.Error() != "Unknown pool type" { + if err := srv.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { t.Fatalf("Expected `Unknown pool type`") } }