diff --git a/api/server/server.go b/api/server/server.go index abec1c6334..737541c231 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -1068,14 +1068,11 @@ func (s *Server) postContainersWait(version version.Version, w http.ResponseWrit return fmt.Errorf("Missing parameter") } - name := vars["name"] - cont, err := s.daemon.Get(name) + status, err := s.daemon.ContainerWait(vars["name"], -1*time.Second) if err != nil { return err } - status, _ := cont.WaitStop(-1 * time.Second) - return writeJSON(w, http.StatusOK, &types.ContainerWaitResponse{ StatusCode: status, }) @@ -1109,50 +1106,33 @@ func (s *Server) postContainersAttach(version version.Version, w http.ResponseWr return fmt.Errorf("Missing parameter") } - cont, err := s.daemon.Get(vars["name"]) - if err != nil { - return err - } - inStream, outStream, err := hijackServer(w) if err != nil { return err } defer closeStreams(inStream, outStream) - var errStream io.Writer - if _, ok := r.Header["Upgrade"]; ok { fmt.Fprintf(outStream, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n") } else { fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") } - if !cont.Config.Tty && version.GreaterThanOrEqualTo("1.6") { - errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) - outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) - } else { - errStream = outStream - } - logs := boolValue(r, "logs") - stream := boolValue(r, "stream") - - var stdin io.ReadCloser - var stdout, stderr io.Writer - - if boolValue(r, "stdin") { - stdin = inStream - } - if boolValue(r, "stdout") { - stdout = outStream - } - if boolValue(r, "stderr") { - stderr = errStream + attachWithLogsConfig := &daemon.ContainerAttachWithLogsConfig{ + InStream: inStream, + OutStream: outStream, + UseStdin: boolValue(r, "stdin"), + UseStdout: boolValue(r, "stdout"), + UseStderr: boolValue(r, "stderr"), + Logs: boolValue(r, "logs"), + Stream: boolValue(r, "stream"), + Multiplex: version.GreaterThanOrEqualTo("1.6"), } - if err := cont.AttachWithLogs(stdin, stdout, stderr, logs, stream); err != nil { + if err := s.daemon.ContainerAttachWithLogs(vars["name"], attachWithLogsConfig); err != nil { fmt.Fprintf(outStream, "Error attaching: %s\n", err) } + return nil } @@ -1163,17 +1143,19 @@ func (s *Server) wsContainersAttach(version version.Version, w http.ResponseWrit if vars == nil { return fmt.Errorf("Missing parameter") } - cont, err := s.daemon.Get(vars["name"]) - if err != nil { - return err - } h := websocket.Handler(func(ws *websocket.Conn) { defer ws.Close() - logs := r.Form.Get("logs") != "" - stream := r.Form.Get("stream") != "" - if err := cont.AttachWithLogs(ws, ws, ws, logs, stream); err != nil { + wsAttachWithLogsConfig := &daemon.ContainerWsAttachWithLogsConfig{ + InStream: ws, + OutStream: ws, + ErrStream: ws, + Logs: boolValue(r, "logs"), + Stream: boolValue(r, "stream"), + } + + if err := s.daemon.ContainerWsAttachWithLogs(vars["name"], wsAttachWithLogsConfig); err != nil { logrus.Errorf("Error attaching websocket: %s", err) } }) diff --git a/daemon/attach.go b/daemon/attach.go index b2b8d09067..5193cf101d 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -1,229 +1,61 @@ package daemon import ( - "encoding/json" "io" - "os" - "sync" - "time" - "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/jsonlog" - "github.com/docker/docker/pkg/promise" + "github.com/docker/docker/pkg/stdcopy" ) -func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error { - if logs { - cLog, err := c.ReadLog("json") - if err != nil && os.IsNotExist(err) { - // Legacy logs - logrus.Debugf("Old logs format") - if stdout != nil { - cLog, err := c.ReadLog("stdout") - if err != nil { - logrus.Errorf("Error reading logs (stdout): %s", err) - } else if _, err := io.Copy(stdout, cLog); err != nil { - logrus.Errorf("Error streaming logs (stdout): %s", err) - } - } - if stderr != nil { - cLog, err := c.ReadLog("stderr") - if err != nil { - logrus.Errorf("Error reading logs (stderr): %s", err) - } else if _, err := io.Copy(stderr, cLog); err != nil { - logrus.Errorf("Error streaming logs (stderr): %s", err) - } - } - } else if err != nil { - logrus.Errorf("Error reading logs (json): %s", err) - } else { - dec := json.NewDecoder(cLog) - for { - l := &jsonlog.JSONLog{} - - if err := dec.Decode(l); err == io.EOF { - break - } else if err != nil { - logrus.Errorf("Error streaming logs: %s", err) - break - } - if l.Stream == "stdout" && stdout != nil { - io.WriteString(stdout, l.Log) - } - if l.Stream == "stderr" && stderr != nil { - io.WriteString(stderr, l.Log) - } - } - } - } - - //stream - if stream { - var stdinPipe io.ReadCloser - if stdin != nil { - r, w := io.Pipe() - go func() { - defer w.Close() - defer logrus.Debugf("Closing buffered stdin pipe") - io.Copy(w, stdin) - }() - stdinPipe = r - } - <-c.Attach(stdinPipe, stdout, stderr) - // If we are in stdinonce mode, wait for the process to end - // otherwise, simply return - if c.Config.StdinOnce && !c.Config.Tty { - c.WaitStop(-1 * time.Second) - } - } - return nil +type ContainerAttachWithLogsConfig struct { + InStream io.ReadCloser + OutStream io.Writer + UseStdin, UseStdout, UseStderr bool + Logs, Stream bool + Multiplex bool } -func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { - return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr) +func (daemon *Daemon) ContainerAttachWithLogs(name string, c *ContainerAttachWithLogsConfig) error { + container, err := daemon.Get(name) + if err != nil { + return err + } + + var errStream io.Writer + + if !container.Config.Tty && c.Multiplex { + errStream = stdcopy.NewStdWriter(c.OutStream, stdcopy.Stderr) + c.OutStream = stdcopy.NewStdWriter(c.OutStream, stdcopy.Stdout) + } else { + errStream = c.OutStream + } + + var stdin io.ReadCloser + var stdout, stderr io.Writer + + if c.UseStdin { + stdin = c.InStream + } + if c.UseStdout { + stdout = c.OutStream + } + if c.UseStderr { + stderr = errStream + } + + return container.AttachWithLogs(stdin, stdout, stderr, c.Logs, c.Stream) } -func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { - var ( - cStdout, cStderr io.ReadCloser - cStdin io.WriteCloser - wg sync.WaitGroup - errors = make(chan error, 3) - ) - - if stdin != nil && openStdin { - cStdin = streamConfig.StdinPipe() - wg.Add(1) - } - - if stdout != nil { - cStdout = streamConfig.StdoutPipe() - wg.Add(1) - } - - if stderr != nil { - cStderr = streamConfig.StderrPipe() - wg.Add(1) - } - - // Connect stdin of container to the http conn. - go func() { - if stdin == nil || !openStdin { - return - } - logrus.Debugf("attach: stdin: begin") - defer func() { - if stdinOnce && !tty { - cStdin.Close() - } else { - // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr - if cStdout != nil { - cStdout.Close() - } - if cStderr != nil { - cStderr.Close() - } - } - wg.Done() - logrus.Debugf("attach: stdin: end") - }() - - var err error - if tty { - _, err = copyEscapable(cStdin, stdin) - } else { - _, err = io.Copy(cStdin, stdin) - - } - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - logrus.Errorf("attach: stdin: %s", err) - errors <- err - return - } - }() - - attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { - if stream == nil { - return - } - defer func() { - // Make sure stdin gets closed - if stdin != nil { - stdin.Close() - } - streamPipe.Close() - wg.Done() - logrus.Debugf("attach: %s: end", name) - }() - - logrus.Debugf("attach: %s: begin", name) - _, err := io.Copy(stream, streamPipe) - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - logrus.Errorf("attach: %s: %v", name, err) - errors <- err - } - } - - go attachStream("stdout", stdout, cStdout) - go attachStream("stderr", stderr, cStderr) - - return promise.Go(func() error { - wg.Wait() - close(errors) - for err := range errors { - if err != nil { - return err - } - } - return nil - }) +type ContainerWsAttachWithLogsConfig struct { + InStream io.ReadCloser + OutStream, ErrStream io.Writer + Logs, Stream bool } -// Code c/c from io.Copy() modified to handle escape sequence -func copyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) { - buf := make([]byte, 32*1024) - for { - nr, er := src.Read(buf) - if nr > 0 { - // ---- Docker addition - // char 16 is C-p - if nr == 1 && buf[0] == 16 { - nr, er = src.Read(buf) - // char 17 is C-q - if nr == 1 && buf[0] == 17 { - if err := src.Close(); err != nil { - return 0, err - } - return 0, nil - } - } - // ---- End of docker - nw, ew := dst.Write(buf[0:nr]) - if nw > 0 { - written += int64(nw) - } - if ew != nil { - err = ew - break - } - if nr != nw { - err = io.ErrShortWrite - break - } - } - if er == io.EOF { - break - } - if er != nil { - err = er - break - } +func (daemon *Daemon) ContainerWsAttachWithLogs(name string, c *ContainerWsAttachWithLogsConfig) error { + container, err := daemon.Get(name) + if err != nil { + return err } - return written, err + + return container.AttachWithLogs(c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream) } diff --git a/daemon/container.go b/daemon/container.go index 7d8133854b..c29c3246f3 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -11,6 +11,7 @@ import ( "path" "path/filepath" "strings" + "sync" "syscall" "time" @@ -34,6 +35,7 @@ import ( "github.com/docker/docker/pkg/directory" "github.com/docker/docker/pkg/etchosts" "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/resolvconf" "github.com/docker/docker/pkg/stringid" @@ -1654,3 +1656,219 @@ func (container *Container) monitorExec(execConfig *execConfig, callback execdri return err } + +func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { + return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr) +} + +func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error { + if logs { + cLog, err := c.ReadLog("json") + if err != nil && os.IsNotExist(err) { + // Legacy logs + logrus.Debugf("Old logs format") + if stdout != nil { + cLog, err := c.ReadLog("stdout") + if err != nil { + logrus.Errorf("Error reading logs (stdout): %s", err) + } else if _, err := io.Copy(stdout, cLog); err != nil { + logrus.Errorf("Error streaming logs (stdout): %s", err) + } + } + if stderr != nil { + cLog, err := c.ReadLog("stderr") + if err != nil { + logrus.Errorf("Error reading logs (stderr): %s", err) + } else if _, err := io.Copy(stderr, cLog); err != nil { + logrus.Errorf("Error streaming logs (stderr): %s", err) + } + } + } else if err != nil { + logrus.Errorf("Error reading logs (json): %s", err) + } else { + dec := json.NewDecoder(cLog) + for { + l := &jsonlog.JSONLog{} + + if err := dec.Decode(l); err == io.EOF { + break + } else if err != nil { + logrus.Errorf("Error streaming logs: %s", err) + break + } + if l.Stream == "stdout" && stdout != nil { + io.WriteString(stdout, l.Log) + } + if l.Stream == "stderr" && stderr != nil { + io.WriteString(stderr, l.Log) + } + } + } + } + + //stream + if stream { + var stdinPipe io.ReadCloser + if stdin != nil { + r, w := io.Pipe() + go func() { + defer w.Close() + defer logrus.Debugf("Closing buffered stdin pipe") + io.Copy(w, stdin) + }() + stdinPipe = r + } + <-c.Attach(stdinPipe, stdout, stderr) + // If we are in stdinonce mode, wait for the process to end + // otherwise, simply return + if c.Config.StdinOnce && !c.Config.Tty { + c.WaitStop(-1 * time.Second) + } + } + return nil +} + +func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { + var ( + cStdout, cStderr io.ReadCloser + cStdin io.WriteCloser + wg sync.WaitGroup + errors = make(chan error, 3) + ) + + if stdin != nil && openStdin { + cStdin = streamConfig.StdinPipe() + wg.Add(1) + } + + if stdout != nil { + cStdout = streamConfig.StdoutPipe() + wg.Add(1) + } + + if stderr != nil { + cStderr = streamConfig.StderrPipe() + wg.Add(1) + } + + // Connect stdin of container to the http conn. + go func() { + if stdin == nil || !openStdin { + return + } + logrus.Debugf("attach: stdin: begin") + defer func() { + if stdinOnce && !tty { + cStdin.Close() + } else { + // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr + if cStdout != nil { + cStdout.Close() + } + if cStderr != nil { + cStderr.Close() + } + } + wg.Done() + logrus.Debugf("attach: stdin: end") + }() + + var err error + if tty { + _, err = copyEscapable(cStdin, stdin) + } else { + _, err = io.Copy(cStdin, stdin) + + } + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + logrus.Errorf("attach: stdin: %s", err) + errors <- err + return + } + }() + + attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { + if stream == nil { + return + } + defer func() { + // Make sure stdin gets closed + if stdin != nil { + stdin.Close() + } + streamPipe.Close() + wg.Done() + logrus.Debugf("attach: %s: end", name) + }() + + logrus.Debugf("attach: %s: begin", name) + _, err := io.Copy(stream, streamPipe) + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + logrus.Errorf("attach: %s: %v", name, err) + errors <- err + } + } + + go attachStream("stdout", stdout, cStdout) + go attachStream("stderr", stderr, cStderr) + + return promise.Go(func() error { + wg.Wait() + close(errors) + for err := range errors { + if err != nil { + return err + } + } + return nil + }) +} + +// Code c/c from io.Copy() modified to handle escape sequence +func copyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) { + buf := make([]byte, 32*1024) + for { + nr, er := src.Read(buf) + if nr > 0 { + // ---- Docker addition + // char 16 is C-p + if nr == 1 && buf[0] == 16 { + nr, er = src.Read(buf) + // char 17 is C-q + if nr == 1 && buf[0] == 17 { + if err := src.Close(); err != nil { + return 0, err + } + return 0, nil + } + } + // ---- End of docker + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return written, err +} diff --git a/daemon/wait.go b/daemon/wait.go new file mode 100644 index 0000000000..1101b2f085 --- /dev/null +++ b/daemon/wait.go @@ -0,0 +1,12 @@ +package daemon + +import "time" + +func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, error) { + container, err := daemon.Get(name) + if err != nil { + return -1, err + } + + return container.WaitStop(timeout) +}