From 3fec7c0858a0a3dee5423e6bffc3a3a1b238c30f Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 7 Dec 2017 14:26:27 -0500 Subject: [PATCH] Remove libcontainerd.IOPipe replaced with cio.DirectIO Signed-off-by: Daniel Nephin --- container/container.go | 3 +- container/stream/streams.go | 4 +- daemon/exec/exec.go | 3 +- libcontainerd/client_daemon.go | 15 +- libcontainerd/client_daemon_linux.go | 14 +- libcontainerd/client_local_windows.go | 60 ++++---- libcontainerd/io.go | 36 ----- libcontainerd/io_unix.go | 60 -------- libcontainerd/io_windows.go | 138 ------------------- libcontainerd/remote_daemon_process.go | 56 -------- libcontainerd/remote_daemon_process_linux.go | 59 -------- libcontainerd/types.go | 19 +-- plugin/executor/containerd/containerd.go | 2 +- 13 files changed, 48 insertions(+), 421 deletions(-) delete mode 100644 libcontainerd/io.go delete mode 100644 libcontainerd/io_unix.go delete mode 100644 libcontainerd/io_windows.go delete mode 100644 libcontainerd/remote_daemon_process.go delete mode 100644 libcontainerd/remote_daemon_process_linux.go diff --git a/container/container.go b/container/container.go index 11814b7719..dd11a11543 100644 --- a/container/container.go +++ b/container/container.go @@ -27,7 +27,6 @@ import ( "github.com/docker/docker/daemon/network" "github.com/docker/docker/image" "github.com/docker/docker/layer" - "github.com/docker/docker/libcontainerd" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/containerfs" "github.com/docker/docker/pkg/idtools" @@ -1004,7 +1003,7 @@ func (container *Container) CloseStreams() error { } // InitializeStdio is called by libcontainerd to connect the stdio. -func (container *Container) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) { +func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) { if err := container.startLogging(); err != nil { container.Reset(false) return nil, err diff --git a/container/stream/streams.go b/container/stream/streams.go index 106e2b1814..0ec164a486 100644 --- a/container/stream/streams.go +++ b/container/stream/streams.go @@ -7,7 +7,7 @@ import ( "strings" "sync" - "github.com/docker/docker/libcontainerd" + "github.com/containerd/containerd/cio" "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pools" @@ -114,7 +114,7 @@ func (c *Config) CloseStreams() error { } // CopyToPipe connects streamconfig with a libcontainerd.IOPipe -func (c *Config) CopyToPipe(iop *libcontainerd.IOPipe) { +func (c *Config) CopyToPipe(iop *cio.DirectIO) { copyFunc := func(w io.Writer, r io.ReadCloser) { c.Add(1) go func() { diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go index 370b4032c7..08ec67dc3e 100644 --- a/daemon/exec/exec.go +++ b/daemon/exec/exec.go @@ -6,7 +6,6 @@ import ( "github.com/containerd/containerd/cio" "github.com/docker/docker/container/stream" - "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/stringid" "github.com/sirupsen/logrus" ) @@ -63,7 +62,7 @@ func (i *rio) Wait() { } // InitializeStdio is called by libcontainerd to connect the stdio. -func (c *Config) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) { +func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) { c.StreamConfig.CopyToPipe(iop) if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go index 79b5e478be..a698154785 100644 --- a/libcontainerd/client_daemon.go +++ b/libcontainerd/client_daemon.go @@ -121,7 +121,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba c.Lock() defer c.Unlock() - var rio cio.IO + var rio *cio.DirectIO defer func() { err = wrapError(err) }() @@ -139,13 +139,12 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba }() t, err := ctr.Task(ctx, func(fifos *cio.FIFOSet) (cio.IO, error) { - io, err := newIOPipe(fifos) + rio, err = cio.NewDirectIO(ctx, fifos) if err != nil { return nil, err } - rio, err = attachStdio(io) - return rio, err + return attachStdio(rio) }) if err != nil && !errdefs.IsNotFound(errors.Cause(err)) { return false, -1, err @@ -255,7 +254,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin uid, gid := getSpecUser(spec) t, err = ctr.ctr.NewTask(ctx, func(id string) (cio.IO, error) { - fifos := newFIFOSet(ctr.bundleDir, id, InitProcessName, withStdin, spec.Process.Terminal) + fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal) rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio) return rio, err }, @@ -315,7 +314,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * stdinCloseSync = make(chan struct{}) ) - fifos := newFIFOSet(ctr.bundleDir, containerID, processID, withStdin, spec.Terminal) + fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal) defer func() { if err != nil { @@ -612,7 +611,7 @@ func (c *client) getProcess(containerID, processID string) (containerd.Process, // createIO creates the io to be used by a process // This needs to get a pointer to interface as upon closure the process may not have yet been registered func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) { - io, err := newIOPipe(fifos) + io, err := cio.NewDirectIO(context.Background(), fifos) if err != nil { return nil, err } @@ -687,7 +686,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) { "container": ei.ContainerID, }).Error("failed to find container") } else { - rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ContainerID, ei.ProcessID, true, false)) + rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false)) } } }) diff --git a/libcontainerd/client_daemon_linux.go b/libcontainerd/client_daemon_linux.go index 9a98fbdf13..302ba90006 100644 --- a/libcontainerd/client_daemon_linux.go +++ b/libcontainerd/client_daemon_linux.go @@ -80,25 +80,27 @@ func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) { return p, nil } -func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet { +func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet { fifos := &cio.FIFOSet{ - Terminal: withTerminal, - Out: filepath.Join(bundleDir, processID+"-stdout"), + Config: cio.Config{ + Terminal: withTerminal, + Stdout: filepath.Join(bundleDir, processID+"-stdout"), + }, } if withStdin { - fifos.In = filepath.Join(bundleDir, processID+"-stdin") + fifos.Stdin = filepath.Join(bundleDir, processID+"-stdin") } if !fifos.Terminal { - fifos.Err = filepath.Join(bundleDir, processID+"-stderr") + fifos.Stderr = filepath.Join(bundleDir, processID+"-stderr") } return fifos } func rmFIFOSet(fset *cio.FIFOSet) { - for _, fn := range []string{fset.Out, fset.In, fset.Err} { + for _, fn := range []string{fset.Stdout, fset.Stdin, fset.Stderr} { if fn != "" { if err := os.RemoveAll(fn); err != nil { logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", fn, err) diff --git a/libcontainerd/client_local_windows.go b/libcontainerd/client_local_windows.go index 09872905c4..22329cc1fa 100644 --- a/libcontainerd/client_local_windows.go +++ b/libcontainerd/client_local_windows.go @@ -18,6 +18,7 @@ import ( "github.com/Microsoft/hcsshim" opengcs "github.com/Microsoft/opengcs/client" "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/docker/docker/pkg/sysinfo" "github.com/docker/docker/pkg/system" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -670,28 +671,12 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt return p.pid, nil } - var ( - stdout, stderr io.ReadCloser - stdin io.WriteCloser - ) - stdin, stdout, stderr, err = newProcess.Stdio() + dio, err := newIOFromProcess(newProcess) if err != nil { logger.WithError(err).Error("failed to get stdio pipes") return -1, err } - - iopipe := &IOPipe{Terminal: ctr.ociSpec.Process.Terminal} - iopipe.Stdin = createStdInCloser(stdin, newProcess) - - // Convert io.ReadClosers to io.Readers - if stdout != nil { - iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) - } - if stderr != nil { - iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) - } - - _, err = attachStdio(iopipe) + _, err = attachStdio(dio) if err != nil { logger.WithError(err).Error("failed to attache stdio") return -1, err @@ -727,6 +712,26 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt return p.pid, nil } +func newIOFromProcess(newProcess process) (*cio.DirectIO, error) { + stdin, stdout, stderr, err := newProcess.Stdio() + if err != nil { + return nil, err + } + + dio := cio.DirectIO{ + Terminal: ctr.ociSpec.Process.Terminal, + Stdin: createStdInCloser(stdin, newProcess), + } + // Convert io.ReadClosers to io.Readers + if stdout != nil { + dio.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) + } + if stderr != nil { + dio.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) + } + return dio, nil +} + // Exec adds a process in an running container func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) { ctr := c.getContainer(containerID) @@ -807,25 +812,14 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * } }() - stdin, stdout, stderr, err = newProcess.Stdio() + dio, err := newIOFromProcess(newProcess) if err != nil { - logger.WithError(err).Error("getting std pipes failed") + logger.WithError(err).Error("failed to get stdio pipes") return -1, err } - - iopipe := &IOPipe{Terminal: spec.Terminal} - iopipe.Stdin = createStdInCloser(stdin, newProcess) - - // Convert io.ReadClosers to io.Readers - if stdout != nil { - iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) - } - if stderr != nil { - iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) - } - + dio.Termainl = spec.Terminal // Tell the engine to attach streams back to the client - _, err = attachStdio(iopipe) + _, err = attachStdio(dio) if err != nil { return -1, err } diff --git a/libcontainerd/io.go b/libcontainerd/io.go deleted file mode 100644 index 25a894b078..0000000000 --- a/libcontainerd/io.go +++ /dev/null @@ -1,36 +0,0 @@ -package libcontainerd - -import "github.com/containerd/containerd/cio" - -// Config returns the containerd.IOConfig of this pipe set -func (p *IOPipe) Config() cio.Config { - return p.config -} - -// Cancel aborts ongoing operations if they have not completed yet -func (p *IOPipe) Cancel() { - p.cancel() -} - -// Wait waits for io operations to finish -func (p *IOPipe) Wait() { -} - -// Close closes the underlying pipes -func (p *IOPipe) Close() error { - p.cancel() - - if p.Stdin != nil { - p.Stdin.Close() - } - - if p.Stdout != nil { - p.Stdout.Close() - } - - if p.Stderr != nil { - p.Stderr.Close() - } - - return nil -} diff --git a/libcontainerd/io_unix.go b/libcontainerd/io_unix.go deleted file mode 100644 index 8e597914ee..0000000000 --- a/libcontainerd/io_unix.go +++ /dev/null @@ -1,60 +0,0 @@ -// +build !windows - -package libcontainerd - -import ( - "context" - "io" - "syscall" - - "github.com/containerd/containerd/cio" - "github.com/containerd/fifo" - "github.com/pkg/errors" -) - -func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) { - var ( - err error - ctx, cancel = context.WithCancel(context.Background()) - f io.ReadWriteCloser - iop = &IOPipe{ - Terminal: fifos.Terminal, - cancel: cancel, - config: cio.Config{ - Terminal: fifos.Terminal, - Stdin: fifos.In, - Stdout: fifos.Out, - Stderr: fifos.Err, - }, - } - ) - defer func() { - if err != nil { - cancel() - iop.Close() - } - }() - - if fifos.In != "" { - if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, errors.WithStack(err) - } - iop.Stdin = f - } - - if fifos.Out != "" { - if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, errors.WithStack(err) - } - iop.Stdout = f - } - - if fifos.Err != "" { - if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, errors.WithStack(err) - } - iop.Stderr = f - } - - return iop, nil -} diff --git a/libcontainerd/io_windows.go b/libcontainerd/io_windows.go deleted file mode 100644 index f2e5a93fe1..0000000000 --- a/libcontainerd/io_windows.go +++ /dev/null @@ -1,138 +0,0 @@ -package libcontainerd - -import ( - "context" - "io" - "net" - "sync" - - winio "github.com/Microsoft/go-winio" - "github.com/containerd/containerd/cio" - "github.com/pkg/errors" -) - -type winpipe struct { - sync.Mutex - - ctx context.Context - listener net.Listener - readyCh chan struct{} - readyErr error - - client net.Conn -} - -func newWinpipe(ctx context.Context, pipe string) (*winpipe, error) { - l, err := winio.ListenPipe(pipe, nil) - if err != nil { - return nil, errors.Wrapf(err, "%q pipe creation failed", pipe) - } - wp := &winpipe{ - ctx: ctx, - listener: l, - readyCh: make(chan struct{}), - } - go func() { - go func() { - defer close(wp.readyCh) - defer wp.listener.Close() - c, err := wp.listener.Accept() - if err != nil { - wp.Lock() - if wp.readyErr == nil { - wp.readyErr = err - } - wp.Unlock() - return - } - wp.client = c - }() - - select { - case <-wp.readyCh: - case <-ctx.Done(): - wp.Lock() - if wp.readyErr == nil { - wp.listener.Close() - wp.readyErr = ctx.Err() - } - wp.Unlock() - } - }() - - return wp, nil -} - -func (wp *winpipe) Read(b []byte) (int, error) { - select { - case <-wp.ctx.Done(): - return 0, wp.ctx.Err() - case <-wp.readyCh: - return wp.client.Read(b) - } -} - -func (wp *winpipe) Write(b []byte) (int, error) { - select { - case <-wp.ctx.Done(): - return 0, wp.ctx.Err() - case <-wp.readyCh: - return wp.client.Write(b) - } -} - -func (wp *winpipe) Close() error { - select { - case <-wp.readyCh: - return wp.client.Close() - default: - return nil - } -} - -func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) { - var ( - err error - ctx, cancel = context.WithCancel(context.Background()) - p io.ReadWriteCloser - iop = &IOPipe{ - Terminal: fifos.Terminal, - cancel: cancel, - config: cio.Config{ - Terminal: fifos.Terminal, - Stdin: fifos.In, - Stdout: fifos.Out, - Stderr: fifos.Err, - }, - } - ) - defer func() { - if err != nil { - cancel() - iop.Close() - } - }() - - if fifos.In != "" { - if p, err = newWinpipe(ctx, fifos.In); err != nil { - return nil, err - } - iop.Stdin = p - } - - if fifos.Out != "" { - if p, err = newWinpipe(ctx, fifos.Out); err != nil { - return nil, err - } - iop.Stdout = p - } - - if fifos.Err != "" { - if p, err = newWinpipe(ctx, fifos.Err); err != nil { - return nil, err - } - iop.Stderr = p - } - - return iop, nil -} diff --git a/libcontainerd/remote_daemon_process.go b/libcontainerd/remote_daemon_process.go deleted file mode 100644 index a00406e150..0000000000 --- a/libcontainerd/remote_daemon_process.go +++ /dev/null @@ -1,56 +0,0 @@ -// +build !windows - -package libcontainerd - -import "github.com/pkg/errors" - -// process represents the state for the main container process or an exec. -type process struct { - // id is the logical name of the process - id string - - // cid is the container id to which this process belongs - cid string - - // pid is the identifier of the process - pid uint32 - - // io holds the io reader/writer associated with the process - io *IOPipe - - // root is the state directory for the process - root string -} - -func (p *process) ID() string { - return p.id -} - -func (p *process) Pid() uint32 { - return p.pid -} - -func (p *process) SetPid(pid uint32) error { - if p.pid != 0 { - return errors.Errorf("pid is already set to %d", pid) - } - - p.pid = pid - return nil -} - -func (p *process) IOPipe() *IOPipe { - return p.io -} - -func (p *process) CloseIO() { - if p.io.Stdin != nil { - p.io.Stdin.Close() - } - if p.io.Stdout != nil { - p.io.Stdout.Close() - } - if p.io.Stderr != nil { - p.io.Stderr.Close() - } -} diff --git a/libcontainerd/remote_daemon_process_linux.go b/libcontainerd/remote_daemon_process_linux.go deleted file mode 100644 index fd54d01981..0000000000 --- a/libcontainerd/remote_daemon_process_linux.go +++ /dev/null @@ -1,59 +0,0 @@ -package libcontainerd - -import ( - "os" - "path/filepath" - - "github.com/pkg/errors" - "golang.org/x/sys/unix" -) - -var fdNames = map[int]string{ - unix.Stdin: "stdin", - unix.Stdout: "stdout", - unix.Stderr: "stderr", -} - -func (p *process) pipeName(index int) string { - return filepath.Join(p.root, p.id+"-"+fdNames[index]) -} - -func (p *process) IOPaths() (string, string, string) { - var ( - stdin = p.pipeName(unix.Stdin) - stdout = p.pipeName(unix.Stdout) - stderr = p.pipeName(unix.Stderr) - ) - // TODO: debug why we're having zombies when I don't unset those - if p.io.Stdin == nil { - stdin = "" - } - if p.io.Stderr == nil { - stderr = "" - } - return stdin, stdout, stderr -} - -func (p *process) Cleanup() error { - var retErr error - - // Ensure everything was closed - p.CloseIO() - - for _, i := range [3]string{ - p.pipeName(unix.Stdin), - p.pipeName(unix.Stdout), - p.pipeName(unix.Stderr), - } { - err := os.Remove(i) - if err != nil { - if retErr == nil { - retErr = errors.Wrapf(err, "failed to remove %s", i) - } else { - retErr = errors.Wrapf(retErr, "failed to remove %s", i) - } - } - } - - return retErr -} diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 346fd241f1..4286415eae 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -2,7 +2,6 @@ package libcontainerd import ( "context" - "io" "time" "github.com/containerd/containerd" @@ -107,20 +106,4 @@ type Client interface { } // StdioCallback is called to connect a container or process stdio. -type StdioCallback func(*IOPipe) (cio.IO, error) - -// IOPipe contains the stdio streams. -type IOPipe struct { - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser - Terminal bool // Whether stderr is connected on Windows - - cancel context.CancelFunc - config cio.Config -} - -// ServerVersion contains version information as retrieved from the -// server -type ServerVersion struct { -} +type StdioCallback func(io *cio.DirectIO) (cio.IO, error) diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index 5343b858e0..38dcfcd58c 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -122,7 +122,7 @@ func (c *rio) Wait() { } func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerd.StdioCallback { - return func(iop *libcontainerd.IOPipe) (cio.IO, error) { + return func(iop *cio.DirectIO) (cio.IO, error) { if iop.Stdin != nil { iop.Stdin.Close() // closing stdin shouldn't be needed here, it should never be open