From 21e44d7a21014f6f0d5e159221f9b9165874a2e1 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 4 Dec 2014 16:12:29 -0500 Subject: [PATCH] Refactor daemon.attach() Also makes streamConfig Pipe methods not return error, since there was no error for them to be able to return anyway. Signed-off-by: Brian Goff --- daemon/attach.go | 171 ++++++++++++++-------------------- daemon/container.go | 17 ++-- integration/commands_test.go | 2 +- integration/container_test.go | 40 ++------ integration/runtime_test.go | 2 +- integration/utils_test.go | 15 +-- 6 files changed, 88 insertions(+), 159 deletions(-) diff --git a/daemon/attach.go b/daemon/attach.go index 599b272472..144c5c777a 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -8,7 +8,6 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/engine" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/promise" "github.com/docker/docker/utils" @@ -114,131 +113,97 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status { func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { var ( cStdout, cStderr io.ReadCloser + cStdin io.WriteCloser nJobs int - errors = make(chan error, 3) ) + if stdin != nil && openStdin { + cStdin = streamConfig.StdinPipe() + nJobs++ + } + + if stdout != nil { + cStdout = streamConfig.StdoutPipe() + nJobs++ + } + + if stderr != nil { + cStderr = streamConfig.StderrPipe() + nJobs++ + } + + errors := make(chan error, nJobs) + // Connect stdin of container to the http conn. if stdin != nil && openStdin { - nJobs++ // Get the stdin pipe. - if cStdin, err := streamConfig.StdinPipe(); err != nil { - errors <- err - } else { - go func() { - log.Debugf("attach: stdin: begin") - defer log.Debugf("attach: stdin: end") + cStdin = streamConfig.StdinPipe() + go func() { + log.Debugf("attach: stdin: begin") + defer func() { if stdinOnce && !tty { defer cStdin.Close() } else { // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr - defer func() { - if cStdout != nil { - cStdout.Close() - } - if cStderr != nil { - cStderr.Close() - } - }() + if cStdout != nil { + cStdout.Close() + } + if cStderr != nil { + cStderr.Close() + } } - if tty { - _, err = utils.CopyEscapable(cStdin, stdin) - } else { - _, err = io.Copy(cStdin, stdin) + log.Debugf("attach: stdin: end") + }() + var err error + if tty { + _, err = utils.CopyEscapable(cStdin, stdin) + } else { + _, err = io.Copy(cStdin, stdin) - } - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - log.Errorf("attach: stdin: %s", err) - } - errors <- err - }() - } - } - if stdout != nil { - nJobs++ - // Get a reader end of a pipe that is attached as stdout to the container. - if p, err := streamConfig.StdoutPipe(); err != nil { - errors <- err - } else { - cStdout = p - go func() { - log.Debugf("attach: stdout: begin") - defer log.Debugf("attach: stdout: end") - // If we are in StdinOnce mode, then close stdin - if stdinOnce && stdin != nil { - defer stdin.Close() - } - _, err := io.Copy(stdout, cStdout) - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - log.Errorf("attach: stdout: %s", err) - } - errors <- err - }() - } - } else { - // Point stdout of container to a no-op writer. - go func() { - if cStdout, err := streamConfig.StdoutPipe(); err != nil { - log.Errorf("attach: stdout pipe: %s", err) - } else { - io.Copy(&ioutils.NopWriter{}, cStdout) } + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + log.Errorf("attach: stdin: %s", err) + } + errors <- err }() } - if stderr != nil { - nJobs++ - if p, err := streamConfig.StderrPipe(); err != nil { - errors <- err - } else { - cStderr = p - go func() { - log.Debugf("attach: stderr: begin") - defer log.Debugf("attach: stderr: end") - // If we are in StdinOnce mode, then close stdin - // Why are we closing stdin here and above while handling stdout? - if stdinOnce && stdin != nil { - defer stdin.Close() - } - _, err := io.Copy(stderr, cStderr) - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - log.Errorf("attach: stderr: %s", err) - } - errors <- err - }() + + attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { + if stream == nil { + return } - } else { - // Point stderr at a no-op writer. - go func() { - if cStderr, err := streamConfig.StderrPipe(); err != nil { - log.Errorf("attach: stdout pipe: %s", err) - } else { - io.Copy(&ioutils.NopWriter{}, cStderr) + defer func() { + // Make sure stdin gets closed + if stdinOnce && cStdin != nil { + stdin.Close() + cStdin.Close() } + streamPipe.Close() }() + + log.Debugf("attach: %s: begin", name) + defer log.Debugf("attach: %s: end", name) + _, err := io.Copy(stream, streamPipe) + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + log.Errorf("attach: %s: %v", name, err) + } + errors <- err } + go attachStream("stdout", stdout, cStdout) + go attachStream("stderr", stderr, cStderr) + return promise.Go(func() error { - defer func() { - if cStdout != nil { - cStdout.Close() - } - if cStderr != nil { - cStderr.Close() - } - }() - for i := 0; i < nJobs; i++ { log.Debugf("attach: waiting for job %d/%d", i+1, nJobs) - if err := <-errors; err != nil { + err := <-errors + if err != nil { log.Errorf("attach: job %d returned error %s, aborting all jobs", i+1, err) return err } diff --git a/daemon/container.go b/daemon/container.go index 75cd133fec..2b584abef5 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -370,10 +370,7 @@ func (container *Container) Run() error { } func (container *Container) Output() (output []byte, err error) { - pipe, err := container.StdoutPipe() - if err != nil { - return nil, err - } + pipe := container.StdoutPipe() defer pipe.Close() if err := container.Start(); err != nil { return nil, err @@ -391,20 +388,20 @@ func (container *Container) Output() (output []byte, err error) { // copied and delivered to all StdoutPipe and StderrPipe consumers, using // a kind of "broadcaster". -func (streamConfig *StreamConfig) StdinPipe() (io.WriteCloser, error) { - return streamConfig.stdinPipe, nil +func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser { + return streamConfig.stdinPipe } -func (streamConfig *StreamConfig) StdoutPipe() (io.ReadCloser, error) { +func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser { reader, writer := io.Pipe() streamConfig.stdout.AddWriter(writer, "") - return ioutils.NewBufReader(reader), nil + return ioutils.NewBufReader(reader) } -func (streamConfig *StreamConfig) StderrPipe() (io.ReadCloser, error) { +func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser { reader, writer := io.Pipe() streamConfig.stderr.AddWriter(writer, "") - return ioutils.NewBufReader(reader), nil + return ioutils.NewBufReader(reader) } func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser { diff --git a/integration/commands_test.go b/integration/commands_test.go index 79ed132114..bd91716d82 100644 --- a/integration/commands_test.go +++ b/integration/commands_test.go @@ -412,7 +412,7 @@ func TestAttachDisconnect(t *testing.T) { } // Try to avoid the timeout in destroy. Best effort, don't check error - cStdin, _ := container.StdinPipe() + cStdin := container.StdinPipe() cStdin.Close() container.WaitStop(-1 * time.Second) } diff --git a/integration/container_test.go b/integration/container_test.go index ab94cbc679..5de4bcc6f4 100644 --- a/integration/container_test.go +++ b/integration/container_test.go @@ -26,14 +26,8 @@ func TestRestartStdin(t *testing.T) { } defer daemon.Destroy(container) - stdin, err := container.StdinPipe() - if err != nil { - t.Fatal(err) - } - stdout, err := container.StdoutPipe() - if err != nil { - t.Fatal(err) - } + stdin := container.StdinPipe() + stdout := container.StdoutPipe() if err := container.Start(); err != nil { t.Fatal(err) } @@ -56,14 +50,8 @@ func TestRestartStdin(t *testing.T) { } // Restart and try again - stdin, err = container.StdinPipe() - if err != nil { - t.Fatal(err) - } - stdout, err = container.StdoutPipe() - if err != nil { - t.Fatal(err) - } + stdin = container.StdinPipe() + stdout = container.StdoutPipe() if err := container.Start(); err != nil { t.Fatal(err) } @@ -103,14 +91,8 @@ func TestStdin(t *testing.T) { } defer daemon.Destroy(container) - stdin, err := container.StdinPipe() - if err != nil { - t.Fatal(err) - } - stdout, err := container.StdoutPipe() - if err != nil { - t.Fatal(err) - } + stdin := container.StdinPipe() + stdout := container.StdoutPipe() if err := container.Start(); err != nil { t.Fatal(err) } @@ -149,14 +131,8 @@ func TestTty(t *testing.T) { } defer daemon.Destroy(container) - stdin, err := container.StdinPipe() - if err != nil { - t.Fatal(err) - } - stdout, err := container.StdoutPipe() - if err != nil { - t.Fatal(err) - } + stdin := container.StdinPipe() + stdout := container.StdoutPipe() if err := container.Start(); err != nil { t.Fatal(err) } diff --git a/integration/runtime_test.go b/integration/runtime_test.go index 93a32e9f70..a436995fd3 100644 --- a/integration/runtime_test.go +++ b/integration/runtime_test.go @@ -610,7 +610,7 @@ func TestRestore(t *testing.T) { } // Simulate a crash/manual quit of dockerd: process dies, states stays 'Running' - cStdin, _ := container2.StdinPipe() + cStdin := container2.StdinPipe() cStdin.Close() if _, err := container2.WaitStop(2 * time.Second); err != nil { t.Fatal(err) diff --git a/integration/utils_test.go b/integration/utils_test.go index 0c78a76170..0cb22ee1c2 100644 --- a/integration/utils_test.go +++ b/integration/utils_test.go @@ -85,14 +85,8 @@ func containerFileExists(eng *engine.Engine, id, dir string, t Fataler) bool { func containerAttach(eng *engine.Engine, id string, t Fataler) (io.WriteCloser, io.ReadCloser) { c := getContainer(eng, id, t) - i, err := c.StdinPipe() - if err != nil { - t.Fatal(err) - } - o, err := c.StdoutPipe() - if err != nil { - t.Fatal(err) - } + i := c.StdinPipe() + o := c.StdoutPipe() return i, o } @@ -292,10 +286,7 @@ func runContainer(eng *engine.Engine, r *daemon.Daemon, args []string, t *testin return "", err } defer r.Destroy(container) - stdout, err := container.StdoutPipe() - if err != nil { - return "", err - } + stdout := container.StdoutPipe() defer stdout.Close() job := eng.Job("start", container.ID)