зеркало из https://github.com/microsoft/docker.git
Fix issues with fifos blocking on open
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Родитель
eee025ec9a
Коммит
6d26464502
|
@ -144,6 +144,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
|
||||||
|
|
||||||
# containerd
|
# containerd
|
||||||
clone git github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7
|
clone git github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7
|
||||||
|
clone git github.com/tonistiigi/fifo 8c56881ce5e63e19e2dfc495c8af0fb90916467d
|
||||||
|
|
||||||
# cluster
|
# cluster
|
||||||
clone git github.com/docker/swarmkit 3b221eb0391d34ae0b9dac65df02b5b64de6dff2
|
clone git github.com/docker/swarmkit 3b221eb0391d34ae0b9dac65df02b5b64de6dff2
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -322,10 +323,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
|
||||||
|
|
||||||
// Convert io.ReadClosers to io.Readers
|
// Convert io.ReadClosers to io.Readers
|
||||||
if stdout != nil {
|
if stdout != nil {
|
||||||
iopipe.Stdout = openReaderFromPipe(stdout)
|
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
|
||||||
}
|
}
|
||||||
if stderr != nil {
|
if stderr != nil {
|
||||||
iopipe.Stderr = openReaderFromPipe(stderr)
|
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
|
||||||
}
|
}
|
||||||
|
|
||||||
proc := &process{
|
proc := &process{
|
||||||
|
|
|
@ -7,11 +7,13 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
containerd "github.com/docker/containerd/api/grpc/types"
|
containerd "github.com/docker/containerd/api/grpc/types"
|
||||||
"github.com/docker/docker/pkg/ioutils"
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
"github.com/opencontainers/runtime-spec/specs-go"
|
"github.com/opencontainers/runtime-spec/specs-go"
|
||||||
|
"github.com/tonistiigi/fifo"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -207,15 +209,15 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
|
||||||
// discardFifos attempts to fully read the container fifos to unblock processes
|
// discardFifos attempts to fully read the container fifos to unblock processes
|
||||||
// that may be blocked on the writer side.
|
// that may be blocked on the writer side.
|
||||||
func (ctr *container) discardFifos() {
|
func (ctr *container) discardFifos() {
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
for _, i := range []int{syscall.Stdout, syscall.Stderr} {
|
for _, i := range []int{syscall.Stdout, syscall.Stderr} {
|
||||||
f := ctr.fifo(i)
|
f, err := fifo.OpenFifo(ctx, ctr.fifo(i), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
|
||||||
c := make(chan struct{})
|
if err != nil {
|
||||||
|
logrus.Warnf("error opening fifo %v for discarding: %+v", f, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
r := openReaderFromFifo(f)
|
io.Copy(ioutil.Discard, f)
|
||||||
close(c) // this channel is used to not close the writer too early, before readonly open has been called.
|
|
||||||
io.Copy(ioutil.Discard, r)
|
|
||||||
}()
|
}()
|
||||||
<-c
|
|
||||||
closeReaderFifo(f) // avoid blocking permanently on open if there is no writer side
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package libcontainerd
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -131,10 +132,10 @@ func (ctr *container) start() error {
|
||||||
|
|
||||||
// Convert io.ReadClosers to io.Readers
|
// Convert io.ReadClosers to io.Readers
|
||||||
if stdout != nil {
|
if stdout != nil {
|
||||||
iopipe.Stdout = openReaderFromPipe(stdout)
|
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
|
||||||
}
|
}
|
||||||
if stderr != nil {
|
if stderr != nil {
|
||||||
iopipe.Stderr = openReaderFromPipe(stderr)
|
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the PID
|
// Save the PID
|
||||||
|
|
|
@ -1,14 +1,16 @@
|
||||||
package libcontainerd
|
package libcontainerd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
containerd "github.com/docker/containerd/api/grpc/types"
|
containerd "github.com/docker/containerd/api/grpc/types"
|
||||||
"github.com/docker/docker/pkg/ioutils"
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
|
"github.com/tonistiigi/fifo"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,34 +28,53 @@ type process struct {
|
||||||
dir string
|
dir string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *process) openFifos(terminal bool) (*IOPipe, error) {
|
func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) {
|
||||||
bundleDir := p.dir
|
if err := os.MkdirAll(p.dir, 0700); err != nil {
|
||||||
if err := os.MkdirAll(bundleDir, 0700); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
f := p.fifo(i)
|
|
||||||
if err := syscall.Mkfifo(f, 0700); err != nil && !os.IsExist(err) {
|
|
||||||
return nil, fmt.Errorf("mkfifo: %s %v", f, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
io := &IOPipe{}
|
io := &IOPipe{}
|
||||||
stdinf, err := os.OpenFile(p.fifo(syscall.Stdin), syscall.O_RDWR, 0)
|
|
||||||
|
stdin, err := fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
io.Stdout = openReaderFromFifo(p.fifo(syscall.Stdout))
|
defer func() {
|
||||||
if !terminal {
|
if err != nil {
|
||||||
io.Stderr = openReaderFromFifo(p.fifo(syscall.Stderr))
|
stdin.Close()
|
||||||
} else {
|
}
|
||||||
io.Stderr = emptyReader{}
|
}()
|
||||||
|
|
||||||
|
io.Stdout, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdout), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
io.Stdin = ioutils.NewWriteCloserWrapper(stdinf, func() error {
|
defer func() {
|
||||||
stdinf.Close()
|
if err != nil {
|
||||||
|
io.Stdout.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !terminal {
|
||||||
|
io.Stderr, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stderr), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
io.Stderr.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
io.Stderr = ioutil.NopCloser(emptyReader{})
|
||||||
|
}
|
||||||
|
|
||||||
|
io.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
|
||||||
|
stdin.Close()
|
||||||
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
|
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
|
||||||
Id: p.containerID,
|
Id: p.containerID,
|
||||||
Pid: p.friendlyName,
|
Pid: p.friendlyName,
|
||||||
|
@ -67,8 +88,8 @@ func (p *process) openFifos(terminal bool) (*IOPipe, error) {
|
||||||
|
|
||||||
func (p *process) closeFifos(io *IOPipe) {
|
func (p *process) closeFifos(io *IOPipe) {
|
||||||
io.Stdin.Close()
|
io.Stdin.Close()
|
||||||
closeReaderFifo(p.fifo(syscall.Stdout))
|
io.Stdout.Close()
|
||||||
closeReaderFifo(p.fifo(syscall.Stderr))
|
io.Stderr.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
type emptyReader struct{}
|
type emptyReader struct{}
|
||||||
|
@ -77,34 +98,6 @@ func (r emptyReader) Read(b []byte) (int, error) {
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
func openReaderFromFifo(fn string) io.Reader {
|
|
||||||
r, w := io.Pipe()
|
|
||||||
c := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
close(c)
|
|
||||||
stdoutf, err := os.OpenFile(fn, syscall.O_RDONLY, 0)
|
|
||||||
if err != nil {
|
|
||||||
r.CloseWithError(err)
|
|
||||||
}
|
|
||||||
if _, err := io.Copy(w, stdoutf); err != nil {
|
|
||||||
r.CloseWithError(err)
|
|
||||||
}
|
|
||||||
w.Close()
|
|
||||||
stdoutf.Close()
|
|
||||||
}()
|
|
||||||
<-c // wait for the goroutine to get scheduled and syscall to block
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
// closeReaderFifo closes fifo that may be blocked on open by opening the write side.
|
|
||||||
func closeReaderFifo(fn string) {
|
|
||||||
f, err := os.OpenFile(fn, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *process) fifo(index int) string {
|
func (p *process) fifo(index int) string {
|
||||||
return filepath.Join(p.dir, p.friendlyName+"-"+fdNames[index])
|
return filepath.Join(p.dir, p.friendlyName+"-"+fdNames[index])
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package libcontainerd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/Microsoft/hcsshim"
|
"github.com/Microsoft/hcsshim"
|
||||||
"github.com/docker/docker/pkg/ioutils"
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
|
@ -18,16 +19,17 @@ type process struct {
|
||||||
hcsProcess hcsshim.Process
|
hcsProcess hcsshim.Process
|
||||||
}
|
}
|
||||||
|
|
||||||
func openReaderFromPipe(p io.ReadCloser) io.Reader {
|
type autoClosingReader struct {
|
||||||
r, w := io.Pipe()
|
io.ReadCloser
|
||||||
go func() {
|
sync.Once
|
||||||
if _, err := io.Copy(w, p); err != nil {
|
}
|
||||||
r.CloseWithError(err)
|
|
||||||
}
|
func (r *autoClosingReader) Read(b []byte) (n int, err error) {
|
||||||
w.Close()
|
n, err = r.ReadCloser.Read(b)
|
||||||
p.Close()
|
if err == io.EOF {
|
||||||
}()
|
r.Once.Do(func() { r.ReadCloser.Close() })
|
||||||
return r
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser {
|
func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser {
|
||||||
|
|
|
@ -61,7 +61,7 @@ type CreateOption interface {
|
||||||
// IOPipe contains the stdio streams.
|
// IOPipe contains the stdio streams.
|
||||||
type IOPipe struct {
|
type IOPipe struct {
|
||||||
Stdin io.WriteCloser
|
Stdin io.WriteCloser
|
||||||
Stdout io.Reader
|
Stdout io.ReadCloser
|
||||||
Stderr io.Reader
|
Stderr io.ReadCloser
|
||||||
Terminal bool // Whether stderr is connected on Windows
|
Terminal bool // Whether stderr is connected on Windows
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
MIT
|
||||||
|
|
||||||
|
Copyright (C) 2016 Tõnis Tiigi <tonistiigi@gmail.com>
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in
|
||||||
|
all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
THE SOFTWARE.
|
|
@ -0,0 +1,13 @@
|
||||||
|
.PHONY: fmt vet test deps
|
||||||
|
|
||||||
|
test: deps
|
||||||
|
go test -v ./...
|
||||||
|
|
||||||
|
deps:
|
||||||
|
go get -d -t ./...
|
||||||
|
|
||||||
|
fmt:
|
||||||
|
gofmt -s -l .
|
||||||
|
|
||||||
|
vet:
|
||||||
|
go vet ./...
|
|
@ -0,0 +1,215 @@
|
||||||
|
package fifo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fifo struct {
|
||||||
|
flag int
|
||||||
|
opened chan struct{}
|
||||||
|
closed chan struct{}
|
||||||
|
closing chan struct{}
|
||||||
|
err error
|
||||||
|
file *os.File
|
||||||
|
closingOnce sync.Once // close has been called
|
||||||
|
closedOnce sync.Once // fifo is closed
|
||||||
|
handle *handle
|
||||||
|
}
|
||||||
|
|
||||||
|
var leakCheckWg *sync.WaitGroup
|
||||||
|
|
||||||
|
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
|
||||||
|
// Context can be used to cancel this function until open(2) has not returned.
|
||||||
|
// Accepted flags:
|
||||||
|
// - syscall.O_CREAT - create new fifo if one doesn't exist
|
||||||
|
// - syscall.O_RDONLY - open fifo only from reader side
|
||||||
|
// - syscall.O_WRONLY - open fifo only from writer side
|
||||||
|
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
|
||||||
|
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
|
||||||
|
// fifo isn't open. read/write will be connected after the actual fifo is
|
||||||
|
// open or after fifo is closed.
|
||||||
|
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
|
||||||
|
if _, err := os.Stat(fn); err != nil {
|
||||||
|
if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
|
||||||
|
if err := syscall.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
|
||||||
|
return nil, errors.Wrapf(err, "error creating fifo %v", fn)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0
|
||||||
|
|
||||||
|
flag &= ^syscall.O_CREAT
|
||||||
|
flag &= ^syscall.O_NONBLOCK
|
||||||
|
|
||||||
|
h, err := getHandle(fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
f := &fifo{
|
||||||
|
handle: h,
|
||||||
|
flag: flag,
|
||||||
|
opened: make(chan struct{}),
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
closing: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := leakCheckWg
|
||||||
|
if wg != nil {
|
||||||
|
wg.Add(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if wg != nil {
|
||||||
|
defer wg.Done()
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
f.Close()
|
||||||
|
case <-f.opened:
|
||||||
|
case <-f.closed:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
if wg != nil {
|
||||||
|
defer wg.Done()
|
||||||
|
}
|
||||||
|
var file *os.File
|
||||||
|
fn, err := h.Path()
|
||||||
|
if err == nil {
|
||||||
|
file, err = os.OpenFile(fn, flag, 0)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-f.closing:
|
||||||
|
if err == nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
err = ctx.Err()
|
||||||
|
default:
|
||||||
|
err = errors.Errorf("fifo %v was closed before opening", fn)
|
||||||
|
}
|
||||||
|
if file != nil {
|
||||||
|
file.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
f.closedOnce.Do(func() {
|
||||||
|
f.err = err
|
||||||
|
close(f.closed)
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.file = file
|
||||||
|
close(f.opened)
|
||||||
|
}()
|
||||||
|
if block {
|
||||||
|
select {
|
||||||
|
case <-f.opened:
|
||||||
|
case <-f.closed:
|
||||||
|
return nil, f.err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read from a fifo to a byte array.
|
||||||
|
func (f *fifo) Read(b []byte) (int, error) {
|
||||||
|
if f.flag&syscall.O_WRONLY > 0 {
|
||||||
|
return 0, errors.New("reading from write-only fifo")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-f.opened:
|
||||||
|
return f.file.Read(b)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-f.opened:
|
||||||
|
return f.file.Read(b)
|
||||||
|
case <-f.closed:
|
||||||
|
return 0, errors.New("reading from a closed fifo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write from byte array to a fifo.
|
||||||
|
func (f *fifo) Write(b []byte) (int, error) {
|
||||||
|
if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
|
||||||
|
return 0, errors.New("writing to read-only fifo")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-f.opened:
|
||||||
|
return f.file.Write(b)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-f.opened:
|
||||||
|
return f.file.Write(b)
|
||||||
|
case <-f.closed:
|
||||||
|
return 0, errors.New("writing to a closed fifo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the fifo. Next reads/writes will error. This method can also be used
|
||||||
|
// before open(2) has returned and fifo was never opened.
|
||||||
|
func (f *fifo) Close() error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-f.closed:
|
||||||
|
f.handle.Close()
|
||||||
|
return f.err
|
||||||
|
default:
|
||||||
|
select {
|
||||||
|
case <-f.opened:
|
||||||
|
f.closedOnce.Do(func() {
|
||||||
|
f.err = f.file.Close()
|
||||||
|
close(f.closed)
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
if f.flag&syscall.O_RDWR != 0 {
|
||||||
|
runtime.Gosched()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
f.closingOnce.Do(func() {
|
||||||
|
close(f.closing)
|
||||||
|
})
|
||||||
|
reverseMode := syscall.O_WRONLY
|
||||||
|
if f.flag&syscall.O_WRONLY > 0 {
|
||||||
|
reverseMode = syscall.O_RDONLY
|
||||||
|
}
|
||||||
|
fn, err := f.handle.Path()
|
||||||
|
// if Close() is called concurrently(shouldn't) it may cause error
|
||||||
|
// because handle is closed
|
||||||
|
select {
|
||||||
|
case <-f.closed:
|
||||||
|
default:
|
||||||
|
if err != nil {
|
||||||
|
// Path has become invalid. We will leak a goroutine.
|
||||||
|
// This case should not happen in linux.
|
||||||
|
f.closedOnce.Do(func() {
|
||||||
|
f.err = err
|
||||||
|
close(f.closed)
|
||||||
|
})
|
||||||
|
<-f.closed
|
||||||
|
break
|
||||||
|
}
|
||||||
|
f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0)
|
||||||
|
if err == nil {
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,70 @@
|
||||||
|
// +build linux
|
||||||
|
|
||||||
|
package fifo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const O_PATH = 010000000
|
||||||
|
|
||||||
|
type handle struct {
|
||||||
|
f *os.File
|
||||||
|
dev uint64
|
||||||
|
ino uint64
|
||||||
|
closeOnce sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
func getHandle(fn string) (*handle, error) {
|
||||||
|
f, err := os.OpenFile(fn, O_PATH, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to open %v with O_PATH", fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
var stat syscall.Stat_t
|
||||||
|
if err := syscall.Fstat(int(f.Fd()), &stat); err != nil {
|
||||||
|
f.Close()
|
||||||
|
return nil, errors.Wrapf(err, "failed to stat handle %v", f.Fd())
|
||||||
|
}
|
||||||
|
|
||||||
|
h := &handle{
|
||||||
|
f: f,
|
||||||
|
dev: stat.Dev,
|
||||||
|
ino: stat.Ino,
|
||||||
|
}
|
||||||
|
|
||||||
|
// check /proc just in case
|
||||||
|
if _, err := os.Stat(h.procPath()); err != nil {
|
||||||
|
f.Close()
|
||||||
|
return nil, errors.Wrapf(err, "couldn't stat %v", h.procPath())
|
||||||
|
}
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) procPath() string {
|
||||||
|
return fmt.Sprintf("/proc/self/fd/%d", h.f.Fd())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Path() (string, error) {
|
||||||
|
var stat syscall.Stat_t
|
||||||
|
if err := syscall.Stat(h.procPath(), &stat); err != nil {
|
||||||
|
return "", errors.Wrapf(err, "path %v could not be statted", h.procPath())
|
||||||
|
}
|
||||||
|
if stat.Dev != h.dev || stat.Ino != h.ino {
|
||||||
|
return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino)
|
||||||
|
}
|
||||||
|
return h.procPath(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Close() error {
|
||||||
|
h.closeOnce.Do(func() {
|
||||||
|
h.f.Close()
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
// +build !linux
|
||||||
|
|
||||||
|
package fifo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type handle struct {
|
||||||
|
fn string
|
||||||
|
dev uint64
|
||||||
|
ino uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func getHandle(fn string) (*handle, error) {
|
||||||
|
var stat syscall.Stat_t
|
||||||
|
if err := syscall.Stat(fn, &stat); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to stat %v", fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
h := &handle{
|
||||||
|
fn: fn,
|
||||||
|
dev: uint64(stat.Dev),
|
||||||
|
ino: stat.Ino,
|
||||||
|
}
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Path() (string, error) {
|
||||||
|
var stat syscall.Stat_t
|
||||||
|
if err := syscall.Stat(h.fn, &stat); err != nil {
|
||||||
|
return "", errors.Wrapf(err, "path %v could not be statted", h.fn)
|
||||||
|
}
|
||||||
|
if uint64(stat.Dev) != h.dev || stat.Ino != h.ino {
|
||||||
|
return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino)
|
||||||
|
}
|
||||||
|
return h.fn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
### fifo
|
||||||
|
|
||||||
|
Go package for handling fifos in a sane way.
|
||||||
|
|
||||||
|
```
|
||||||
|
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
|
||||||
|
// Context can be used to cancel this function until open(2) has not returned.
|
||||||
|
// Accepted flags:
|
||||||
|
// - syscall.O_CREAT - create new fifo if one doesn't exist
|
||||||
|
// - syscall.O_RDONLY - open fifo only from reader side
|
||||||
|
// - syscall.O_WRONLY - open fifo only from writer side
|
||||||
|
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
|
||||||
|
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
|
||||||
|
// fifo isn't open. read/write will be connected after the actual fifo is
|
||||||
|
// open or after fifo is closed.
|
||||||
|
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error)
|
||||||
|
|
||||||
|
|
||||||
|
// Read from a fifo to a byte array.
|
||||||
|
func (f *fifo) Read(b []byte) (int, error)
|
||||||
|
|
||||||
|
|
||||||
|
// Write from byte array to a fifo.
|
||||||
|
func (f *fifo) Write(b []byte) (int, error)
|
||||||
|
|
||||||
|
|
||||||
|
// Close the fifo. Next reads/writes will error. This method can also be used
|
||||||
|
// before open(2) has returned and fifo was never opened.
|
||||||
|
func (f *fifo) Close() error
|
||||||
|
```
|
Загрузка…
Ссылка в новой задаче