From d9ed3165228b60cb89c31d0d66b99e01ab83eb3e Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 17 Apr 2015 14:32:18 -0700 Subject: [PATCH] Make API server datastructure Added daemon field to it, will use it later for acces to daemon from handlers Signed-off-by: Alexander Morozov --- api/server/server.go | 122 +++++++++++++++++++++-------------- api/server/server_linux.go | 32 ++++----- api/server/server_windows.go | 30 +++++---- api/server/tcp_socket.go | 4 +- api/server/unix_socket.go | 4 +- docker/daemon.go | 8 ++- integration/runtime_test.go | 28 ++++---- pkg/listenbuffer/buffer.go | 8 +-- 8 files changed, 131 insertions(+), 105 deletions(-) diff --git a/api/server/server.go b/api/server/server.go index d12bec9576..3146e09ed0 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -40,10 +40,6 @@ import ( "github.com/docker/docker/utils" ) -var ( - activationLock = make(chan struct{}) -) - type ServerConfig struct { Logging bool EnableCors bool @@ -57,6 +53,80 @@ type ServerConfig struct { TlsKey string } +type Server struct { + daemon *daemon.Daemon + cfg *ServerConfig + router *mux.Router + start chan struct{} + + // TODO: delete engine + eng *engine.Engine +} + +func New(cfg *ServerConfig, eng *engine.Engine) *Server { + r := createRouter( + eng, + cfg.Logging, + cfg.EnableCors, + cfg.CorsHeaders, + cfg.Version, + ) + return &Server{ + cfg: cfg, + router: r, + start: make(chan struct{}), + eng: eng, + } +} + +func (s *Server) SetDaemon(d *daemon.Daemon) { + s.daemon = d +} + +type serverCloser interface { + Serve() error + Close() error +} + +// ServeApi loops through all of the protocols sent in to docker and spawns +// off a go routine to setup a serving http.Server for each. +func (s *Server) ServeApi(protoAddrs []string) error { + var chErrors = make(chan error, len(protoAddrs)) + + for _, protoAddr := range protoAddrs { + protoAddrParts := strings.SplitN(protoAddr, "://", 2) + if len(protoAddrParts) != 2 { + return fmt.Errorf("bad format, expected PROTO://ADDR") + } + go func(proto, addr string) { + logrus.Infof("Listening for HTTP on %s (%s)", proto, addr) + srv, err := s.newServer(proto, addr) + if err != nil { + chErrors <- err + return + } + s.eng.OnShutdown(func() { + if err := srv.Close(); err != nil { + logrus.Error(err) + } + }) + if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") { + err = nil + } + chErrors <- err + }(protoAddrParts[0], protoAddrParts[1]) + } + + for i := 0; i < len(protoAddrs); i++ { + err := <-chErrors + if err != nil { + return err + } + } + + return nil +} + type HttpServer struct { srv *http.Server l net.Listener @@ -1632,50 +1702,6 @@ func allocateDaemonPort(addr string) error { return nil } -type Server interface { - Serve() error - Close() error -} - -// ServeApi loops through all of the protocols sent in to docker and spawns -// off a go routine to setup a serving http.Server for each. -func ServeApi(protoAddrs []string, conf *ServerConfig, eng *engine.Engine) error { - var chErrors = make(chan error, len(protoAddrs)) - - for _, protoAddr := range protoAddrs { - protoAddrParts := strings.SplitN(protoAddr, "://", 2) - if len(protoAddrParts) != 2 { - return fmt.Errorf("bad format, expected PROTO://ADDR") - } - go func() { - logrus.Infof("Listening for HTTP on %s (%s)", protoAddrParts[0], protoAddrParts[1]) - srv, err := NewServer(protoAddrParts[0], protoAddrParts[1], conf, eng) - if err != nil { - chErrors <- err - return - } - eng.OnShutdown(func() { - if err := srv.Close(); err != nil { - logrus.Error(err) - } - }) - if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") { - err = nil - } - chErrors <- err - }() - } - - for i := 0; i < len(protoAddrs); i++ { - err := <-chErrors - if err != nil { - return err - } - } - - return nil -} - func toBool(s string) bool { s = strings.ToLower(strings.TrimSpace(s)) return !(s == "" || s == "0" || s == "no" || s == "false" || s == "none") diff --git a/api/server/server_linux.go b/api/server/server_linux.go index 4d53a888ee..43f0eefe0e 100644 --- a/api/server/server_linux.go +++ b/api/server/server_linux.go @@ -8,22 +8,15 @@ import ( "net/http" "github.com/Sirupsen/logrus" - "github.com/docker/docker/engine" + "github.com/docker/docker/daemon" "github.com/docker/docker/pkg/systemd" ) -// NewServer sets up the required Server and does protocol specific checking. -func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Server, error) { +// newServer sets up the required serverCloser and does protocol specific checking. +func (s *Server) newServer(proto, addr string) (serverCloser, error) { var ( err error l net.Listener - r = createRouter( - eng, - conf.Logging, - conf.EnableCors, - conf.CorsHeaders, - conf.Version, - ) ) switch proto { case "fd": @@ -35,13 +28,13 @@ func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Serv // We don't want to start serving on these sockets until the // daemon is initialized and installed. Otherwise required handlers // won't be ready. - <-activationLock + <-s.start // Since ListenFD will return one or more sockets we have // to create a go func to spawn off multiple serves for i := range ls { listener := ls[i] go func() { - httpSrv := http.Server{Handler: r} + httpSrv := http.Server{Handler: s.router} chErrors <- httpSrv.Serve(listener) }() } @@ -52,17 +45,17 @@ func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Serv } return nil, nil case "tcp": - if !conf.TlsVerify { + if !s.cfg.TlsVerify { logrus.Warn("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\") } - if l, err = NewTcpSocket(addr, tlsConfigFromServerConfig(conf)); err != nil { + if l, err = NewTcpSocket(addr, tlsConfigFromServerConfig(s.cfg), s.start); err != nil { return nil, err } if err := allocateDaemonPort(addr); err != nil { return nil, err } case "unix": - if l, err = NewUnixSocket(addr, conf.SocketGroup); err != nil { + if l, err = NewUnixSocket(addr, s.cfg.SocketGroup, s.start); err != nil { return nil, err } default: @@ -71,19 +64,20 @@ func NewServer(proto, addr string, conf *ServerConfig, eng *engine.Engine) (Serv return &HttpServer{ &http.Server{ Addr: addr, - Handler: r, + Handler: s.router, }, l, }, nil } -func AcceptConnections() { +func (s *Server) AcceptConnections(d *daemon.Daemon) { // Tell the init daemon we are accepting requests + s.daemon = d go systemd.SdNotify("READY=1") // close the lock so the listeners start accepting connections select { - case <-activationLock: + case <-s.start: default: - close(activationLock) + close(s.start) } } diff --git a/api/server/server_windows.go b/api/server/server_windows.go index e6b23b97ea..c121bbd3e8 100644 --- a/api/server/server_windows.go +++ b/api/server/server_windows.go @@ -5,30 +5,24 @@ package server import ( "errors" "net" + "net/http" "github.com/Sirupsen/logrus" - "github.com/docker/docker/engine" + "github.com/docker/docker/daemon" ) // NewServer sets up the required Server and does protocol specific checking. -func NewServer(proto, addr string, job *engine.Job) (Server, error) { +func (s *Server) newServer(proto, addr string) (Server, error) { var ( err error l net.Listener - r = createRouter( - job.Eng, - job.GetenvBool("Logging"), - job.GetenvBool("EnableCors"), - job.Getenv("CorsHeaders"), - job.Getenv("Version"), - ) ) switch proto { case "tcp": - if !job.GetenvBool("TlsVerify") { + if !s.cfg.TlsVerify { logrus.Warn("/!\\ DON'T BIND ON ANY IP ADDRESS WITHOUT setting -tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\") } - if l, err = NewTcpSocket(addr, tlsConfigFromJob(job)); err != nil { + if l, err = NewTcpSocket(addr, tlsConfigFromServerConfig(s.cfg)); err != nil { return nil, err } if err := allocateDaemonPort(addr); err != nil { @@ -37,13 +31,21 @@ func NewServer(proto, addr string, job *engine.Job) (Server, error) { default: return nil, errors.New("Invalid protocol format. Windows only supports tcp.") } + return &HttpServer{ + &http.Server{ + Addr: addr, + Handler: s.router, + }, + l, + }, nil } -func AcceptConnections() { +func (s *Server) AcceptConnections(d *daemon.Daemon) { + s.daemon = d // close the lock so the listeners start accepting connections select { - case <-activationLock: + case <-s.start: default: - close(activationLock) + close(s.start) } } diff --git a/api/server/tcp_socket.go b/api/server/tcp_socket.go index 8454e0c58b..a1f57231a5 100644 --- a/api/server/tcp_socket.go +++ b/api/server/tcp_socket.go @@ -31,8 +31,8 @@ func tlsConfigFromServerConfig(conf *ServerConfig) *tlsConfig { } } -func NewTcpSocket(addr string, config *tlsConfig) (net.Listener, error) { - l, err := listenbuffer.NewListenBuffer("tcp", addr, activationLock) +func NewTcpSocket(addr string, config *tlsConfig, activate <-chan struct{}) (net.Listener, error) { + l, err := listenbuffer.NewListenBuffer("tcp", addr, activate) if err != nil { return nil, err } diff --git a/api/server/unix_socket.go b/api/server/unix_socket.go index e472efd0a4..157005da6f 100644 --- a/api/server/unix_socket.go +++ b/api/server/unix_socket.go @@ -12,13 +12,13 @@ import ( "github.com/docker/libcontainer/user" ) -func NewUnixSocket(path, group string) (net.Listener, error) { +func NewUnixSocket(path, group string, activate <-chan struct{}) (net.Listener, error) { if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) { return nil, err } mask := syscall.Umask(0777) defer syscall.Umask(mask) - l, err := listenbuffer.NewListenBuffer("unix", path, activationLock) + l, err := listenbuffer.NewListenBuffer("unix", path, activate) if err != nil { return nil, err } diff --git a/docker/daemon.go b/docker/daemon.go index 0fe10de65b..0602ddf654 100644 --- a/docker/daemon.go +++ b/docker/daemon.go @@ -105,12 +105,14 @@ func mainDaemon() { TlsKey: *flKey, } + api := apiserver.New(serverConfig, eng) + // The serve API routine never exits unless an error occurs // We need to start it as a goroutine and wait on it so // daemon doesn't exit serveAPIWait := make(chan error) go func() { - if err := apiserver.ServeApi(flHosts, serverConfig, eng); err != nil { + if err := api.ServeApi(flHosts); err != nil { logrus.Errorf("ServeAPI error: %v", err) serveAPIWait <- err return @@ -143,8 +145,8 @@ func mainDaemon() { b.Install() // after the daemon is done setting up we can tell the api to start - // accepting connections - apiserver.AcceptConnections() + // accepting connections with specified daemon + api.AcceptConnections(d) // Daemon is fully initialized and handling API traffic // Wait for serve API job to complete diff --git a/integration/runtime_test.go b/integration/runtime_test.go index cd9be89a0b..beb15b8747 100644 --- a/integration/runtime_test.go +++ b/integration/runtime_test.go @@ -156,6 +156,8 @@ func spawnGlobalDaemon() { globalEngine = eng globalDaemon = mkDaemonFromEngine(eng, t) + serverConfig := &apiserver.ServerConfig{Logging: true} + api := apiserver.New(serverConfig, eng) // Spawn a Daemon go func() { logrus.Debugf("Spawning global daemon for integration tests") @@ -164,8 +166,7 @@ func spawnGlobalDaemon() { Host: testDaemonAddr, } - serverConfig := &apiserver.ServerConfig{Logging: true} - if err := apiserver.ServeApi([]string{listenURL.String()}, serverConfig, eng); err != nil { + if err := api.ServeApi([]string{listenURL.String()}); err != nil { logrus.Fatalf("Unable to spawn the test daemon: %s", err) } }() @@ -174,7 +175,7 @@ func spawnGlobalDaemon() { // FIXME: use inmem transports instead of tcp time.Sleep(time.Second) - apiserver.AcceptConnections() + api.AcceptConnections(getDaemon(eng)) } func spawnLegitHttpsDaemon() { @@ -204,6 +205,15 @@ func spawnHttpsDaemon(addr, cacert, cert, key string) *engine.Engine { eng := newTestEngine(t, true, root) + serverConfig := &apiserver.ServerConfig{ + Logging: true, + Tls: true, + TlsVerify: true, + TlsCa: cacert, + TlsCert: cert, + TlsKey: key, + } + api := apiserver.New(serverConfig, eng) // Spawn a Daemon go func() { logrus.Debugf("Spawning https daemon for integration tests") @@ -211,15 +221,7 @@ func spawnHttpsDaemon(addr, cacert, cert, key string) *engine.Engine { Scheme: testDaemonHttpsProto, Host: addr, } - serverConfig := &apiserver.ServerConfig{ - Logging: true, - Tls: true, - TlsVerify: true, - TlsCa: cacert, - TlsCert: cert, - TlsKey: key, - } - if err := apiserver.ServeApi([]string{listenURL.String()}, serverConfig, eng); err != nil { + if err := api.ServeApi([]string{listenURL.String()}); err != nil { logrus.Fatalf("Unable to spawn the test daemon: %s", err) } }() @@ -227,7 +229,7 @@ func spawnHttpsDaemon(addr, cacert, cert, key string) *engine.Engine { // Give some time to ListenAndServer to actually start time.Sleep(time.Second) - apiserver.AcceptConnections() + api.AcceptConnections(getDaemon(eng)) return eng } diff --git a/pkg/listenbuffer/buffer.go b/pkg/listenbuffer/buffer.go index 6e3656d2c4..97d622c15f 100644 --- a/pkg/listenbuffer/buffer.go +++ b/pkg/listenbuffer/buffer.go @@ -32,7 +32,7 @@ import "net" // NewListenBuffer returns a net.Listener listening on addr with the protocol // passed. The channel passed is used to activate the listenbuffer when the // caller is ready to accept connections. -func NewListenBuffer(proto, addr string, activate chan struct{}) (net.Listener, error) { +func NewListenBuffer(proto, addr string, activate <-chan struct{}) (net.Listener, error) { wrapped, err := net.Listen(proto, addr) if err != nil { return nil, err @@ -46,9 +46,9 @@ func NewListenBuffer(proto, addr string, activate chan struct{}) (net.Listener, // defaultListener is the buffered wrapper around the net.Listener type defaultListener struct { - wrapped net.Listener // The net.Listener wrapped by listenbuffer - ready bool // Whether the listenbuffer has been activated - activate chan struct{} // Channel to control activation of the listenbuffer + wrapped net.Listener // The net.Listener wrapped by listenbuffer + ready bool // Whether the listenbuffer has been activated + activate <-chan struct{} // Channel to control activation of the listenbuffer } // Close closes the wrapped socket.