diff --git a/daemon/container.go b/daemon/container.go index 3533628aa9..bea29e5256 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -19,8 +19,6 @@ import ( "github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/daemon/network" derr "github.com/docker/docker/errors" - "github.com/docker/docker/pkg/broadcaster" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/nat" "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/signal" @@ -36,17 +34,10 @@ var ( ErrRootFSReadOnly = errors.New("container rootfs is marked read-only") ) -type streamConfig struct { - stdout *broadcaster.Unbuffered - stderr *broadcaster.Unbuffered - stdin io.ReadCloser - stdinPipe io.WriteCloser -} - // CommonContainer holds the fields for a container which are // applicable across all platforms supported by the daemon. type CommonContainer struct { - streamConfig + *runconfig.StreamConfig // embed for Container to support states directly. *State `json:"State"` // Needed for remote api version <= 1.11 root string // Path to the "home" of the container, including metadata. @@ -87,6 +78,7 @@ func newBaseContainer(id, root string) *Container { execCommands: newExecStore(), root: root, MountPoints: make(map[string]*volume.MountPoint), + StreamConfig: runconfig.NewStreamConfig(), }, } } @@ -243,30 +235,6 @@ func (container *Container) getRootResourcePath(path string) (string, error) { return symlink.FollowSymlinkInScope(filepath.Join(container.root, cleanPath), container.root) } -// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data -// to the standard input of the container's active process. -// Container.StdoutPipe and Container.StderrPipe each return a ReadCloser -// which can be used to retrieve the standard output (and error) generated -// by the container's active process. The output (and error) are actually -// copied and delivered to all StdoutPipe and StderrPipe consumers, using -// a kind of "broadcaster". - -func (streamConfig *streamConfig) StdinPipe() io.WriteCloser { - return streamConfig.stdinPipe -} - -func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser { - bytesPipe := ioutils.NewBytesPipe(nil) - streamConfig.stdout.Add(bytesPipe) - return bytesPipe -} - -func (streamConfig *streamConfig) StderrPipe() io.ReadCloser { - bytesPipe := ioutils.NewBytesPipe(nil) - streamConfig.stderr.Add(bytesPipe) - return bytesPipe -} - // ExitOnNext signals to the monitor that it should not restart the container // after we send the kill signal. func (container *Container) ExitOnNext() { @@ -372,10 +340,10 @@ func (container *Container) getExecIDs() []string { // Attach connects to the container's TTY, delegating to standard // streams or websockets depending on the configuration. func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { - return attach(&container.streamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr) + return attach(container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr) } -func attach(streamConfig *streamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { +func attach(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { var ( cStdout, cStderr io.ReadCloser cStdin io.WriteCloser diff --git a/daemon/daemon.go b/daemon/daemon.go index b083a65871..fbdb851539 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -32,12 +32,10 @@ import ( "github.com/docker/docker/graph" "github.com/docker/docker/image" "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/graphdb" "github.com/docker/docker/pkg/idtools" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/mount" "github.com/docker/docker/pkg/namesgenerator" @@ -205,15 +203,11 @@ func (daemon *Daemon) Register(container *Container) error { } // Attach to stdout and stderr - container.stderr = new(broadcaster.Unbuffered) - container.stdout = new(broadcaster.Unbuffered) - // Attach to stdin if container.Config.OpenStdin { - container.stdin, container.stdinPipe = io.Pipe() + container.NewInputPipes() } else { - container.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin + container.NewNopInputPipe() } - // done daemon.containers.Add(container.ID, container) // don't update the Suffixarray if we're starting up diff --git a/daemon/exec.go b/daemon/exec.go index 73f76172de..324e3419c0 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -2,7 +2,6 @@ package daemon import ( "io" - "io/ioutil" "strings" "sync" "time" @@ -10,8 +9,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" derr "github.com/docker/docker/errors" - "github.com/docker/docker/pkg/broadcaster" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/stringid" @@ -28,12 +25,12 @@ type ExecConfig struct { Running bool ExitCode int ProcessConfig *execdriver.ProcessConfig - streamConfig - OpenStdin bool - OpenStderr bool - OpenStdout bool - Container *Container - canRemove bool + OpenStdin bool + OpenStderr bool + OpenStdout bool + streamConfig *runconfig.StreamConfig + Container *Container + canRemove bool // waitStart will be closed immediately after the exec is really started. waitStart chan struct{} @@ -170,7 +167,7 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro OpenStdin: config.AttachStdin, OpenStdout: config.AttachStdout, OpenStderr: config.AttachStderr, - streamConfig: streamConfig{}, + streamConfig: runconfig.NewStreamConfig(), ProcessConfig: processConfig, Container: container, Running: false, @@ -225,16 +222,13 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io. cStderr = stderr } - ec.streamConfig.stderr = new(broadcaster.Unbuffered) - ec.streamConfig.stdout = new(broadcaster.Unbuffered) - // Attach to stdin if ec.OpenStdin { - ec.streamConfig.stdin, ec.streamConfig.stdinPipe = io.Pipe() + ec.streamConfig.NewInputPipes() } else { - ec.streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin + ec.streamConfig.NewNopInputPipe() } - attachErr := attach(&ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr) + attachErr := attach(ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr) execErr := make(chan error) @@ -354,23 +348,17 @@ func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error { } func (d *Daemon) monitorExec(container *Container, ExecConfig *ExecConfig, callback execdriver.DriverCallback) error { - pipes := execdriver.NewPipes(ExecConfig.streamConfig.stdin, ExecConfig.streamConfig.stdout, ExecConfig.streamConfig.stderr, ExecConfig.OpenStdin) + pipes := execdriver.NewPipes(ExecConfig.streamConfig.Stdin(), ExecConfig.streamConfig.Stdout(), ExecConfig.streamConfig.Stderr(), ExecConfig.OpenStdin) exitCode, err := d.Exec(container, ExecConfig, pipes, callback) if err != nil { logrus.Errorf("Error running command in existing container %s: %s", container.ID, err) } logrus.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode) - if ExecConfig.OpenStdin { - if err := ExecConfig.streamConfig.stdin.Close(); err != nil { - logrus.Errorf("Error closing stdin while running in %s: %s", container.ID, err) - } - } - if err := ExecConfig.streamConfig.stdout.Clean(); err != nil { - logrus.Errorf("Error closing stdout while running in %s: %s", container.ID, err) - } - if err := ExecConfig.streamConfig.stderr.Clean(); err != nil { - logrus.Errorf("Error closing stderr while running in %s: %s", container.ID, err) + + if err := ExecConfig.streamConfig.CloseStreams(); err != nil { + logrus.Errorf("%s: %s", container.ID, err) } + if ExecConfig.ProcessConfig.Terminal != nil { if err := ExecConfig.ProcessConfig.Terminal.Close(); err != nil { logrus.Errorf("Error closing terminal while running in container %s: %s", container.ID, err) diff --git a/daemon/monitor.go b/daemon/monitor.go index c36d427a96..a4d67cfde8 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -158,7 +158,7 @@ func (m *containerMonitor) Start() error { return err } - pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin) + pipes := execdriver.NewPipes(m.container.Stdin(), m.container.Stdout(), m.container.Stderr(), m.container.Config.OpenStdin) m.logEvent("start") @@ -329,18 +329,8 @@ func (m *containerMonitor) resetContainer(lock bool) { defer container.Unlock() } - if container.Config.OpenStdin { - if err := container.stdin.Close(); err != nil { - logrus.Errorf("%s: Error close stdin: %s", container.ID, err) - } - } - - if err := container.stdout.Clean(); err != nil { - logrus.Errorf("%s: Error close stdout: %s", container.ID, err) - } - - if err := container.stderr.Clean(); err != nil { - logrus.Errorf("%s: Error close stderr: %s", container.ID, err) + if err := container.CloseStreams(); err != nil { + logrus.Errorf("%s: %s", container.ID, err) } if container.command != nil && container.command.ProcessConfig.Terminal != nil { @@ -351,7 +341,7 @@ func (m *containerMonitor) resetContainer(lock bool) { // Re-create a brand new stdin pipe once the container exited if container.Config.OpenStdin { - container.stdin, container.stdinPipe = io.Pipe() + container.NewInputPipes() } if container.logDriver != nil { diff --git a/runconfig/streams.go b/runconfig/streams.go new file mode 100644 index 0000000000..7a35dd7d31 --- /dev/null +++ b/runconfig/streams.go @@ -0,0 +1,107 @@ +package runconfig + +import ( + "fmt" + "io" + "io/ioutil" + "strings" + + "github.com/docker/docker/pkg/broadcaster" + "github.com/docker/docker/pkg/ioutils" +) + +// StreamConfig holds information about I/O streams managed together. +// +// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data +// to the standard input of the streamConfig's active process. +// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser +// which can be used to retrieve the standard output (and error) generated +// by the container's active process. The output (and error) are actually +// copied and delivered to all StdoutPipe and StderrPipe consumers, using +// a kind of "broadcaster". +type StreamConfig struct { + stdout *broadcaster.Unbuffered + stderr *broadcaster.Unbuffered + stdin io.ReadCloser + stdinPipe io.WriteCloser +} + +// NewStreamConfig creates a stream config and initializes +// the standard err and standard out to new unbuffered broadcasters. +func NewStreamConfig() *StreamConfig { + return &StreamConfig{ + stderr: new(broadcaster.Unbuffered), + stdout: new(broadcaster.Unbuffered), + } +} + +// Stdout returns the standard output in the configuration. +func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered { + return streamConfig.stdout +} + +// Stderr returns the standard error in the configuration. +func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered { + return streamConfig.stderr +} + +// Stdin returns the standard input in the configuration. +func (streamConfig *StreamConfig) Stdin() io.ReadCloser { + return streamConfig.stdin +} + +// StdinPipe returns an input writer pipe as an io.WriteCloser. +func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser { + return streamConfig.stdinPipe +} + +// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe. +// It adds this new out pipe to the Stdout broadcaster. +func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser { + bytesPipe := ioutils.NewBytesPipe(nil) + streamConfig.stdout.Add(bytesPipe) + return bytesPipe +} + +// StderrPipe creates a new io.ReadCloser with an empty bytes pipe. +// It adds this new err pipe to the Stderr broadcaster. +func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser { + bytesPipe := ioutils.NewBytesPipe(nil) + streamConfig.stderr.Add(bytesPipe) + return bytesPipe +} + +// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe. +func (streamConfig *StreamConfig) NewInputPipes() { + streamConfig.stdin, streamConfig.stdinPipe = io.Pipe() +} + +// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input. +func (streamConfig *StreamConfig) NewNopInputPipe() { + streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) +} + +// CloseStreams ensures that the configured streams are properly closed. +func (streamConfig *StreamConfig) CloseStreams() error { + var errors []string + + if streamConfig.stdin != nil { + if err := streamConfig.stdin.Close(); err != nil { + errors = append(errors, fmt.Sprintf("error close stdin: %s", err)) + } + } + + if err := streamConfig.stdout.Clean(); err != nil { + errors = append(errors, fmt.Sprintf("error close stdout: %s", err)) + } + + if err := streamConfig.stderr.Clean(); err != nil { + errors = append(errors, fmt.Sprintf("error close stderr: %s", err)) + } + + if len(errors) > 0 { + return fmt.Errorf(strings.Join(errors, "\n")) + } + + return nil +}