diff --git a/api/client/logs.go b/api/client/logs.go index a1375226b1..0f4be3dac6 100644 --- a/api/client/logs.go +++ b/api/client/logs.go @@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error { follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output") since := cmd.String([]string{"-since"}, "", "Show logs since timestamp") times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps") - tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs") + tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs") cmd.Require(flag.Exact, 1) cmd.ParseFlags(args, true) diff --git a/api/server/server.go b/api/server/server.go index 35ce4ebfd6..a97f94a0df 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite closeNotifier = notifier.CloseNotify() } + c, err := s.daemon.Get(vars["name"]) + if err != nil { + return err + } + + outStream := ioutils.NewWriteFlusher(w) + // write an empty chunk of data (this is to ensure that the + // HTTP Response is sent immediatly, even if the container has + // not yet produced any data) + outStream.Write(nil) + logsConfig := &daemon.ContainerLogsConfig{ Follow: boolValue(r, "follow"), Timestamps: boolValue(r, "timestamps"), @@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite Tail: r.Form.Get("tail"), UseStdout: stdout, UseStderr: stderr, - OutStream: ioutils.NewWriteFlusher(w), + OutStream: outStream, Stop: closeNotifier, } - if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil { + if err := s.daemon.ContainerLogs(c, logsConfig); err != nil { fmt.Fprintf(w, "Error running logs job: %s\n", err) } diff --git a/daemon/container.go b/daemon/container.go index 6060d0149f..0b19034b04 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -25,7 +25,6 @@ import ( "github.com/docker/docker/pkg/broadcastwriter" "github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/mount" "github.com/docker/docker/pkg/nat" "github.com/docker/docker/pkg/promise" @@ -325,25 +324,13 @@ func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser { func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser { reader, writer := io.Pipe() - streamConfig.stdout.AddWriter(writer, "") + streamConfig.stdout.AddWriter(writer) return ioutils.NewBufReader(reader) } func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser { reader, writer := io.Pipe() - streamConfig.stderr.AddWriter(writer, "") - return ioutils.NewBufReader(reader) -} - -func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser { - reader, writer := io.Pipe() - streamConfig.stdout.AddWriter(writer, "stdout") - return ioutils.NewBufReader(reader) -} - -func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser { - reader, writer := io.Pipe() - streamConfig.stderr.AddWriter(writer, "stderr") + streamConfig.stderr.AddWriter(writer) return ioutils.NewBufReader(reader) } @@ -715,6 +702,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig { } func (container *Container) getLogger() (logger.Logger, error) { + if container.logDriver != nil && container.IsRunning() { + return container.logDriver, nil + } cfg := container.getLogConfig() if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil { return nil, err @@ -888,36 +878,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ } func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error { - if logs { logDriver, err := c.getLogger() if err != nil { - logrus.Errorf("Error obtaining the logger %v", err) return err } - if _, ok := logDriver.(logger.Reader); !ok { - logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name()) - } else { - if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil { - logrus.Errorf("Error reading logs %v", err) - } else { - dec := json.NewDecoder(cLog) - for { - l := &jsonlog.JSONLog{} + cLog, ok := logDriver.(logger.LogReader) + if !ok { + return logger.ErrReadLogsNotSupported + } + logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) - if err := dec.Decode(l); err == io.EOF { - break - } else if err != nil { - logrus.Errorf("Error streaming logs: %s", err) - break - } - if l.Stream == "stdout" && stdout != nil { - io.WriteString(stdout, l.Log) - } - if l.Stream == "stderr" && stderr != nil { - io.WriteString(stderr, l.Log) - } + LogLoop: + for { + select { + case msg, ok := <-logs.Msg: + if !ok { + break LogLoop } + if msg.Source == "stdout" && stdout != nil { + stdout.Write(msg.Line) + } + if msg.Source == "stderr" && stderr != nil { + stderr.Write(msg.Line) + } + case err := <-logs.Err: + logrus.Errorf("Error streaming logs: %v", err) + break LogLoop } } } diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index a086c51ebc..14b09596a2 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -27,6 +27,7 @@ type Context struct { LogPath string } +// Hostname returns the hostname from the underlying OS func (ctx *Context) Hostname() (string, error) { hostname, err := os.Hostname() if err != nil { @@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) { return hostname, nil } +// Command returns the command that the container being logged was started with func (ctx *Context) Command() string { terms := []string{ctx.ContainerEntrypoint} for _, arg := range ctx.ContainerArgs { diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index be1572bc31..383aada822 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -2,32 +2,42 @@ package jsonfilelog import ( "bytes" + "encoding/json" "fmt" "io" "os" "strconv" "sync" + "time" + + "gopkg.in/fsnotify.v1" "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/pubsub" + "github.com/docker/docker/pkg/tailfile" "github.com/docker/docker/pkg/timeutils" "github.com/docker/docker/pkg/units" ) const ( - Name = "json-file" + Name = "json-file" + maxJSONDecodeRetry = 10 ) // JSONFileLogger is Logger implementation for default docker logging: // JSON objects to file type JSONFileLogger struct { - buf *bytes.Buffer - f *os.File // store for closing - mu sync.Mutex // protects buffer - capacity int64 //maximum size of each file - n int //maximum number of files - ctx logger.Context + buf *bytes.Buffer + f *os.File // store for closing + mu sync.Mutex // protects buffer + capacity int64 //maximum size of each file + n int //maximum number of files + ctx logger.Context + readers map[*logger.LogWatcher]struct{} // stores the active log followers + notifyRotate *pubsub.Publisher } func init() { @@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) { } } return &JSONFileLogger{ - f: log, - buf: bytes.NewBuffer(nil), - ctx: ctx, - capacity: capval, - n: maxFiles, + f: log, + buf: bytes.NewBuffer(nil), + ctx: ctx, + capacity: capval, + n: maxFiles, + readers: make(map[*logger.LogWatcher]struct{}), + notifyRotate: pubsub.NewPublisher(0, 1), }, nil } @@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) { return -1, err } l.f = file + l.notifyRotate.Publish(struct{}{}) } return writeToBuf(l) } @@ -148,11 +161,11 @@ func backup(old, curr string) error { } } if _, err := os.Stat(curr); os.IsNotExist(err) { - if f, err := os.Create(curr); err != nil { + f, err := os.Create(curr) + if err != nil { return err - } else { - f.Close() } + f.Close() } return os.Rename(curr, old) } @@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error { return nil } -func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) { - pth := l.ctx.LogPath - if len(args) > 0 { - //check if args[0] is an integer index - index, err := strconv.ParseInt(args[0], 0, 0) - if err != nil { - return nil, err - } - if index > 0 { - pth = pth + "." + args[0] - } - } - return os.Open(pth) -} - func (l *JSONFileLogger) LogPath() string { return l.ctx.LogPath } -// Close closes underlying file +// Close closes underlying file and signals all readers to stop func (l *JSONFileLogger) Close() error { - return l.f.Close() + l.mu.Lock() + err := l.f.Close() + for r := range l.readers { + r.Close() + delete(l.readers, r) + } + l.mu.Unlock() + return err } // Name returns name of this logger func (l *JSONFileLogger) Name() string { return Name } + +func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { + l.Reset() + if err := dec.Decode(l); err != nil { + return nil, err + } + msg := &logger.Message{ + Source: l.Stream, + Timestamp: l.Created, + Line: []byte(l.Log), + } + return msg, nil +} + +// Reads from the log file +func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + logWatcher := logger.NewLogWatcher() + + go l.readLogs(logWatcher, config) + return logWatcher +} + +func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { + defer close(logWatcher.Msg) + + pth := l.ctx.LogPath + var files []io.ReadSeeker + for i := l.n; i > 1; i-- { + f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1)) + if err != nil { + if !os.IsNotExist(err) { + logWatcher.Err <- err + break + } + continue + } + defer f.Close() + files = append(files, f) + } + + latestFile, err := os.Open(pth) + if err != nil { + logWatcher.Err <- err + return + } + defer latestFile.Close() + + files = append(files, latestFile) + tailer := ioutils.MultiReadSeeker(files...) + + if config.Tail != 0 { + tailFile(tailer, logWatcher, config.Tail, config.Since) + } + + if !config.Follow { + return + } + if config.Tail == 0 { + latestFile.Seek(0, os.SEEK_END) + } + + l.mu.Lock() + l.readers[logWatcher] = struct{}{} + l.mu.Unlock() + + notifyRotate := l.notifyRotate.Subscribe() + followLogs(latestFile, logWatcher, notifyRotate, config.Since) + + l.mu.Lock() + delete(l.readers, logWatcher) + l.mu.Unlock() + + l.notifyRotate.Evict(notifyRotate) +} + +func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { + var rdr io.Reader = f + if tail > 0 { + ls, err := tailfile.TailFile(f, tail) + if err != nil { + logWatcher.Err <- err + return + } + rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) + } + dec := json.NewDecoder(rdr) + l := &jsonlog.JSONLog{} + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + logWatcher.Err <- err + } + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg + } +} + +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { + dec := json.NewDecoder(f) + l := &jsonlog.JSONLog{} + fileWatcher, err := fsnotify.NewWatcher() + if err != nil { + logWatcher.Err <- err + return + } + defer fileWatcher.Close() + if err := fileWatcher.Add(f.Name()); err != nil { + logWatcher.Err <- err + return + } + + var retries int + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + // try again because this shouldn't happen + if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { + dec = json.NewDecoder(f) + retries += 1 + continue + } + logWatcher.Err <- err + return + } + + select { + case <-fileWatcher.Events: + dec = json.NewDecoder(f) + continue + case <-fileWatcher.Errors: + logWatcher.Err <- err + return + case <-logWatcher.WatchClose(): + return + case <-notifyRotate: + fileWatcher.Remove(f.Name()) + + f, err = os.Open(f.Name()) + if err != nil { + logWatcher.Err <- err + return + } + if err := fileWatcher.Add(f.Name()); err != nil { + logWatcher.Err <- err + } + dec = json.NewDecoder(f) + continue + } + } + + retries = 0 // reset retries since we've succeeded + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + select { + case logWatcher.Msg <- msg: + case <-logWatcher.WatchClose(): + logWatcher.Msg <- msg + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg + } + } + } +} diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index e0d1b6a38a..96421f4b96 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -2,11 +2,19 @@ package logger import ( "errors" - "io" "time" + + "github.com/docker/docker/pkg/timeutils" ) -var ReadLogsNotSupported = errors.New("configured logging reader does not support reading") +// ErrReadLogsNotSupported is returned when the logger does not support reading logs +var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading") + +const ( + // TimeFormat is the time format used for timestamps sent to log readers + TimeFormat = timeutils.RFC3339NanoFixed + logWatcherBufferSize = 4096 +) // Message is datastructure that represents record from some container type Message struct { @@ -16,14 +24,51 @@ type Message struct { Timestamp time.Time } -// Logger is interface for docker logging drivers +// Logger is the interface for docker logging drivers type Logger interface { Log(*Message) error Name() string Close() error } -//Reader is an interface for docker logging drivers that support reading -type Reader interface { - ReadLog(args ...string) (io.Reader, error) +// ReadConfig is the configuration passed into ReadLogs +type ReadConfig struct { + Since time.Time + Tail int + Follow bool +} + +// LogReader is the interface for reading log messages for loggers that support reading +type LogReader interface { + // Read logs from underlying logging backend + ReadLogs(ReadConfig) *LogWatcher +} + +// LogWatcher is used when consuming logs read from the LogReader interface +type LogWatcher struct { + // For sending log messages to a reader + Msg chan *Message + // For sending error messages that occur while while reading logs + Err chan error + closeNotifier chan struct{} +} + +// NewLogWatcher returns a new LogWatcher. +func NewLogWatcher() *LogWatcher { + return &LogWatcher{ + Msg: make(chan *Message, logWatcherBufferSize), + Err: make(chan error, 1), + closeNotifier: make(chan struct{}), + } +} + +// Close notifies the underlying log reader to stop +func (w *LogWatcher) Close() { + close(w.closeNotifier) +} + +// WatchClose returns a channel receiver that receives notification when the watcher has been closed +// This should only be called from one goroutine +func (w *LogWatcher) WatchClose() <-chan struct{} { + return w.closeNotifier } diff --git a/daemon/logs.go b/daemon/logs.go index b9e0d964b7..e032c5716e 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -1,23 +1,14 @@ package daemon import ( - "bytes" - "encoding/json" "fmt" "io" - "net" - "os" "strconv" - "syscall" "time" "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/daemon/logger/jsonfilelog" - "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/stdcopy" - "github.com/docker/docker/pkg/tailfile" - "github.com/docker/docker/pkg/timeutils" ) type ContainerLogsConfig struct { @@ -29,209 +20,64 @@ type ContainerLogsConfig struct { Stop <-chan bool } -func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error { - var ( - lines = -1 - format string - ) +func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error { if !(config.UseStdout || config.UseStderr) { return fmt.Errorf("You must choose at least one stream") } - if config.Timestamps { - format = timeutils.RFC3339NanoFixed - } - if config.Tail == "" { - config.Tail = "latest" - } - container, err := daemon.Get(name) - if err != nil { - return err - } - - var ( - outStream = config.OutStream - errStream io.Writer - ) + outStream := config.OutStream + errStream := outStream if !container.Config.Tty { errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) - } else { - errStream = outStream } - if container.LogDriverType() != jsonfilelog.Name { - return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver") - } - - maxFile := 1 - container.readHostConfig() - cfg := container.getLogConfig() - conf := cfg.Config - if val, ok := conf["max-file"]; ok { - var err error - maxFile, err = strconv.Atoi(val) - if err != nil { - return fmt.Errorf("Error reading max-file value: %s", err) - } - } - - logDriver, err := container.getLogger() + cLog, err := container.getLogger() if err != nil { return err } - _, ok := logDriver.(logger.Reader) + logReader, ok := cLog.(logger.LogReader) if !ok { - logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name()) - } else { - // json-file driver - if config.Tail != "all" && config.Tail != "latest" { - var err error - lines, err = strconv.Atoi(config.Tail) - if err != nil { - logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err) - lines = -1 - } - } - - if lines != 0 { - n := maxFile - if config.Tail == "latest" && config.Since.IsZero() { - n = 1 - } - before := false - for i := n; i > 0; i-- { - if before { - break - } - cLog, err := getReader(logDriver, i, n, lines) - if err != nil { - logrus.Debugf("Error reading %d log file: %v", i-1, err) - continue - } - //if lines are specified, then iterate only once - if lines > 0 { - i = 1 - } else { // if lines are not specified, cLog is a file, It needs to be closed - defer cLog.(*os.File).Close() - } - dec := json.NewDecoder(cLog) - l := &jsonlog.JSONLog{} - for { - l.Reset() - if err := dec.Decode(l); err == io.EOF { - break - } else if err != nil { - logrus.Errorf("Error streaming logs: %s", err) - break - } - logLine := l.Log - if !config.Since.IsZero() && l.Created.Before(config.Since) { - continue - } - if config.Timestamps { - // format can be "" or time format, so here can't be error - logLine, _ = l.Format(format) - } - if l.Stream == "stdout" && config.UseStdout { - io.WriteString(outStream, logLine) - } - if l.Stream == "stderr" && config.UseStderr { - io.WriteString(errStream, logLine) - } - } - } - } + return logger.ErrReadLogsNotSupported } - if config.Follow && container.IsRunning() { - chErrStderr := make(chan error) - chErrStdout := make(chan error) - var stdoutPipe, stderrPipe io.ReadCloser + follow := config.Follow && container.IsRunning() + tailLines, err := strconv.Atoi(config.Tail) + if err != nil { + tailLines = -1 + } - // write an empty chunk of data (this is to ensure that the - // HTTP Response is sent immediatly, even if the container has - // not yet produced any data) - outStream.Write(nil) - - if config.UseStdout { - stdoutPipe = container.StdoutLogPipe() - go func() { - logrus.Debug("logs: stdout stream begin") - chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since) - logrus.Debug("logs: stdout stream end") - }() - } - if config.UseStderr { - stderrPipe = container.StderrLogPipe() - go func() { - logrus.Debug("logs: stderr stream begin") - chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since) - logrus.Debug("logs: stderr stream end") - }() - } + logrus.Debug("logs: begin stream") + readConfig := logger.ReadConfig{ + Since: config.Since, + Tail: tailLines, + Follow: follow, + } + logs := logReader.ReadLogs(readConfig) + for { select { - case err = <-chErrStderr: - if stdoutPipe != nil { - stdoutPipe.Close() - <-chErrStdout - } - case err = <-chErrStdout: - if stderrPipe != nil { - stderrPipe.Close() - <-chErrStderr - } - case <-config.Stop: - if stdoutPipe != nil { - stdoutPipe.Close() - <-chErrStdout - } - if stderrPipe != nil { - stderrPipe.Close() - <-chErrStderr - } + case err := <-logs.Err: + logrus.Errorf("Error streaming logs: %v", err) return nil - } - - if err != nil && err != io.EOF && err != io.ErrClosedPipe { - if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE { - logrus.Errorf("error streaming logs: %v", err) + case <-config.Stop: + logs.Close() + return nil + case msg, ok := <-logs.Msg: + if !ok { + logrus.Debugf("logs: end stream") + return nil + } + logLine := msg.Line + if config.Timestamps { + logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...) + } + if msg.Source == "stdout" && config.UseStdout { + outStream.Write(logLine) + } + if msg.Source == "stderr" && config.UseStderr { + errStream.Write(logLine) } } } - return nil -} - -func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) { - if lines <= 0 { - index := strconv.Itoa(fileIndex - 1) - cLog, err := logDriver.(logger.Reader).ReadLog(index) - return cLog, err - } - buf := bytes.NewBuffer([]byte{}) - remaining := lines - for i := 0; i < maxFiles; i++ { - index := strconv.Itoa(i) - cLog, err := logDriver.(logger.Reader).ReadLog(index) - if err != nil { - return buf, err - } - f := cLog.(*os.File) - ls, err := tailfile.TailFile(f, remaining) - if err != nil { - return buf, err - } - tmp := bytes.NewBuffer([]byte{}) - for _, l := range ls { - fmt.Fprintf(tmp, "%s\n", l) - } - tmp.ReadFrom(buf) - buf = tmp - if len(ls) == remaining { - return buf, nil - } - remaining = remaining - len(ls) - } - return buf, nil } diff --git a/daemon/monitor.go b/daemon/monitor.go index ff173c8f03..1f020574b0 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -12,7 +12,10 @@ import ( "github.com/docker/docker/runconfig" ) -const defaultTimeIncrement = 100 +const ( + defaultTimeIncrement = 100 + loggerCloseTimeout = 10 * time.Second +) // containerMonitor monitors the execution of a container's main process. // If a restart policy is specified for the container the monitor will ensure that the @@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) { close(exit) }() select { - case <-time.After(1 * time.Second): + case <-time.After(loggerCloseTimeout): logrus.Warnf("Logger didn't exit in time: logs may be truncated") case <-exit: } diff --git a/docs/reference/commandline/logs.md b/docs/reference/commandline/logs.md index db97143dec..a2e69e4d21 100644 --- a/docs/reference/commandline/logs.md +++ b/docs/reference/commandline/logs.md @@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from the container's `STDOUT` and `STDERR`. Passing a negative number or a non-integer to `--tail` is invalid and the -value is set to `latest` in that case. +value is set to `all` in that case. The `docker logs --timestamp` commands will add an RFC3339Nano timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each diff --git a/hack/vendor.sh b/hack/vendor.sh index 94756deb80..cafcf07a12 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -50,4 +50,7 @@ clone git github.com/fluent/fluent-logger-golang v1.0.0 clone git github.com/philhofer/fwd 899e4efba8eaa1fea74175308f3fae18ff3319fa clone git github.com/tinylib/msgp 75ee40d2601edf122ef667e2a07d600d4c44490c +# fsnotify +clone git gopkg.in/fsnotify.v1 v1.2.0 + clean diff --git a/integration-cli/docker_cli_logs_test.go b/integration-cli/docker_cli_logs_test.go index 5fa6747e23..6c9421771b 100644 --- a/integration-cli/docker_cli_logs_test.go +++ b/integration-cli/docker_cli_logs_test.go @@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) { }() logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID) - stdout, err := logCmd.StdoutPipe() c.Assert(err, check.IsNil) - - if err := logCmd.Start(); err != nil { - c.Fatal(err) - } + c.Assert(logCmd.Start(), check.IsNil) // First read slowly bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead) diff --git a/pkg/broadcastwriter/broadcastwriter.go b/pkg/broadcastwriter/broadcastwriter.go index bd9b675553..5d53658ec3 100644 --- a/pkg/broadcastwriter/broadcastwriter.go +++ b/pkg/broadcastwriter/broadcastwriter.go @@ -1,33 +1,20 @@ package broadcastwriter import ( - "bytes" "io" "sync" - "time" - - "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/jsonlog" - "github.com/docker/docker/pkg/timeutils" ) // BroadcastWriter accumulate multiple io.WriteCloser by stream. type BroadcastWriter struct { sync.Mutex - buf *bytes.Buffer - jsLogBuf *bytes.Buffer - streams map[string](map[io.WriteCloser]struct{}) + writers map[io.WriteCloser]struct{} } -// AddWriter adds new io.WriteCloser for stream. -// If stream is "", then all writes proceed as is. Otherwise every line from -// input will be packed to serialized jsonlog.JSONLog. -func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { +// AddWriter adds new io.WriteCloser. +func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) { w.Lock() - if _, ok := w.streams[stream]; !ok { - w.streams[stream] = make(map[io.WriteCloser]struct{}) - } - w.streams[stream][writer] = struct{}{} + w.writers[writer] = struct{}{} w.Unlock() } @@ -35,67 +22,12 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { // this call. func (w *BroadcastWriter) Write(p []byte) (n int, err error) { w.Lock() - if writers, ok := w.streams[""]; ok { - for sw := range writers { - if n, err := sw.Write(p); err != nil || n != len(p) { - // On error, evict the writer - delete(writers, sw) - } - } - if len(w.streams) == 1 { - if w.buf.Len() >= 4096 { - w.buf.Reset() - } else { - w.buf.Write(p) - } - w.Unlock() - return len(p), nil + for sw := range w.writers { + if n, err := sw.Write(p); err != nil || n != len(p) { + // On error, evict the writer + delete(w.writers, sw) } } - if w.jsLogBuf == nil { - w.jsLogBuf = new(bytes.Buffer) - w.jsLogBuf.Grow(1024) - } - var timestamp string - created := time.Now().UTC() - w.buf.Write(p) - for { - if n := w.buf.Len(); n == 0 { - break - } - i := bytes.IndexByte(w.buf.Bytes(), '\n') - if i < 0 { - break - } - lineBytes := w.buf.Next(i + 1) - if timestamp == "" { - timestamp, err = timeutils.FastMarshalJSON(created) - if err != nil { - continue - } - } - - for stream, writers := range w.streams { - if stream == "" { - continue - } - jsonLog := jsonlog.JSONLogBytes{Log: lineBytes, Stream: stream, Created: timestamp} - err = jsonLog.MarshalJSONBuf(w.jsLogBuf) - if err != nil { - logrus.Errorf("Error making JSON log line: %s", err) - continue - } - w.jsLogBuf.WriteByte('\n') - b := w.jsLogBuf.Bytes() - for sw := range writers { - if _, err := sw.Write(b); err != nil { - delete(writers, sw) - } - } - } - w.jsLogBuf.Reset() - } - w.jsLogBuf.Reset() w.Unlock() return len(p), nil } @@ -104,19 +36,16 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) { // will be saved. func (w *BroadcastWriter) Clean() error { w.Lock() - for _, writers := range w.streams { - for w := range writers { - w.Close() - } + for w := range w.writers { + w.Close() } - w.streams = make(map[string](map[io.WriteCloser]struct{})) + w.writers = make(map[io.WriteCloser]struct{}) w.Unlock() return nil } func New() *BroadcastWriter { return &BroadcastWriter{ - streams: make(map[string](map[io.WriteCloser]struct{})), - buf: bytes.NewBuffer(nil), + writers: make(map[io.WriteCloser]struct{}), } } diff --git a/pkg/broadcastwriter/broadcastwriter_test.go b/pkg/broadcastwriter/broadcastwriter_test.go index 71227821b2..bc243207d1 100644 --- a/pkg/broadcastwriter/broadcastwriter_test.go +++ b/pkg/broadcastwriter/broadcastwriter_test.go @@ -32,9 +32,9 @@ func TestBroadcastWriter(t *testing.T) { // Test 1: Both bufferA and bufferB should contain "foo" bufferA := &dummyWriter{} - writer.AddWriter(bufferA, "") + writer.AddWriter(bufferA) bufferB := &dummyWriter{} - writer.AddWriter(bufferB, "") + writer.AddWriter(bufferB) writer.Write([]byte("foo")) if bufferA.String() != "foo" { @@ -48,7 +48,7 @@ func TestBroadcastWriter(t *testing.T) { // Test2: bufferA and bufferB should contain "foobar", // while bufferC should only contain "bar" bufferC := &dummyWriter{} - writer.AddWriter(bufferC, "") + writer.AddWriter(bufferC) writer.Write([]byte("bar")) if bufferA.String() != "foobar" { @@ -100,7 +100,7 @@ func TestRaceBroadcastWriter(t *testing.T) { writer := New() c := make(chan bool) go func() { - writer.AddWriter(devNullCloser(0), "") + writer.AddWriter(devNullCloser(0)) c <- true }() writer.Write([]byte("hello")) @@ -111,9 +111,9 @@ func BenchmarkBroadcastWriter(b *testing.B) { writer := New() setUpWriter := func() { for i := 0; i < 100; i++ { - writer.AddWriter(devNullCloser(0), "stdout") - writer.AddWriter(devNullCloser(0), "stderr") - writer.AddWriter(devNullCloser(0), "") + writer.AddWriter(devNullCloser(0)) + writer.AddWriter(devNullCloser(0)) + writer.AddWriter(devNullCloser(0)) } } testLine := "Line that thinks that it is log line from docker" @@ -142,33 +142,3 @@ func BenchmarkBroadcastWriter(b *testing.B) { b.StartTimer() } } - -func BenchmarkBroadcastWriterWithoutStdoutStderr(b *testing.B) { - writer := New() - setUpWriter := func() { - for i := 0; i < 100; i++ { - writer.AddWriter(devNullCloser(0), "") - } - } - testLine := "Line that thinks that it is log line from docker" - var buf bytes.Buffer - for i := 0; i < 100; i++ { - buf.Write([]byte(testLine + "\n")) - } - // line without eol - buf.Write([]byte(testLine)) - testText := buf.Bytes() - b.SetBytes(int64(5 * len(testText))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - setUpWriter() - - for j := 0; j < 5; j++ { - if _, err := writer.Write(testText); err != nil { - b.Fatal(err) - } - } - - writer.Clean() - } -} diff --git a/pkg/ioutils/multireader.go b/pkg/ioutils/multireader.go new file mode 100644 index 0000000000..f231aa9daf --- /dev/null +++ b/pkg/ioutils/multireader.go @@ -0,0 +1,226 @@ +package ioutils + +import ( + "bytes" + "fmt" + "io" + "os" +) + +type pos struct { + idx int + offset int64 +} + +type multiReadSeeker struct { + readers []io.ReadSeeker + pos *pos + posIdx map[io.ReadSeeker]int +} + +func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) { + var tmpOffset int64 + switch whence { + case os.SEEK_SET: + for i, rdr := range r.readers { + // get size of the current reader + s, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + + if offset > tmpOffset+s { + if i == len(r.readers)-1 { + rdrOffset := s + (offset - tmpOffset) + if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil { + return -1, err + } + r.pos = &pos{i, rdrOffset} + return offset, nil + } + + tmpOffset += s + continue + } + + rdrOffset := offset - tmpOffset + idx := i + + rdr.Seek(rdrOffset, os.SEEK_SET) + // make sure all following readers are at 0 + for _, rdr := range r.readers[i+1:] { + rdr.Seek(0, os.SEEK_SET) + } + + if rdrOffset == s && i != len(r.readers)-1 { + idx += 1 + rdrOffset = 0 + } + r.pos = &pos{idx, rdrOffset} + return offset, nil + } + case os.SEEK_END: + for _, rdr := range r.readers { + s, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + tmpOffset += s + } + r.Seek(tmpOffset+offset, os.SEEK_SET) + return tmpOffset + offset, nil + case os.SEEK_CUR: + if r.pos == nil { + return r.Seek(offset, os.SEEK_SET) + } + // Just return the current offset + if offset == 0 { + return r.getCurOffset() + } + + curOffset, err := r.getCurOffset() + if err != nil { + return -1, err + } + rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset) + if err != nil { + return -1, err + } + + r.pos = &pos{r.posIdx[rdr], rdrOffset} + return curOffset + offset, nil + default: + return -1, fmt.Errorf("Invalid whence: %d", whence) + } + + return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset) +} + +func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) { + var rdr io.ReadSeeker + var rdrOffset int64 + + for i, rdr := range r.readers { + offsetTo, err := r.getOffsetToReader(rdr) + if err != nil { + return nil, -1, err + } + if offsetTo > offset { + rdr = r.readers[i-1] + rdrOffset = offsetTo - offset + break + } + + if rdr == r.readers[len(r.readers)-1] { + rdrOffset = offsetTo + offset + break + } + } + + return rdr, rdrOffset, nil +} + +func (r *multiReadSeeker) getCurOffset() (int64, error) { + var totalSize int64 + for _, rdr := range r.readers[:r.pos.idx+1] { + if r.posIdx[rdr] == r.pos.idx { + totalSize += r.pos.offset + break + } + + size, err := getReadSeekerSize(rdr) + if err != nil { + return -1, fmt.Errorf("error getting seeker size: %v", err) + } + totalSize += size + } + return totalSize, nil +} + +func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) { + var offset int64 + for _, r := range r.readers { + if r == rdr { + break + } + + size, err := getReadSeekerSize(rdr) + if err != nil { + return -1, err + } + offset += size + } + return offset, nil +} + +func (r *multiReadSeeker) Read(b []byte) (int, error) { + if r.pos == nil { + r.pos = &pos{0, 0} + } + + bCap := int64(cap(b)) + buf := bytes.NewBuffer(nil) + var rdr io.ReadSeeker + + for _, rdr = range r.readers[r.pos.idx:] { + readBytes, err := io.CopyN(buf, rdr, bCap) + if err != nil && err != io.EOF { + return -1, err + } + bCap -= readBytes + + if bCap == 0 { + break + } + } + + rdrPos, err := rdr.Seek(0, os.SEEK_CUR) + if err != nil { + return -1, err + } + r.pos = &pos{r.posIdx[rdr], rdrPos} + return buf.Read(b) +} + +func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) { + // save the current position + pos, err := rdr.Seek(0, os.SEEK_CUR) + if err != nil { + return -1, err + } + + // get the size + size, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + + // reset the position + if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil { + return -1, err + } + return size, nil +} + +// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided +// input readseekers. After calling this method the initial position is set to the +// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances +// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. +// Seek can be used over the sum of lengths of all readseekers. +// +// When a MultiReadSeeker is used, no Read and Seek operations should be made on +// its ReadSeeker components. Also, users should make no assumption on the state +// of individual readseekers while the MultiReadSeeker is used. +func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker { + if len(readers) == 1 { + return readers[0] + } + idx := make(map[io.ReadSeeker]int) + for i, rdr := range readers { + idx[rdr] = i + } + return &multiReadSeeker{ + readers: readers, + posIdx: idx, + } +} diff --git a/pkg/ioutils/multireader_test.go b/pkg/ioutils/multireader_test.go new file mode 100644 index 0000000000..de495b56da --- /dev/null +++ b/pkg/ioutils/multireader_test.go @@ -0,0 +1,149 @@ +package ioutils + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + "testing" +) + +func TestMultiReadSeekerReadAll(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + expectedSize := int64(s1.Len() + s2.Len() + s3.Len()) + + b, err := ioutil.ReadAll(mr) + if err != nil { + t.Fatal(err) + } + + expected := "hello world 1hello world 2hello world 3" + if string(b) != expected { + t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) + } + + size, err := mr.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if size != expectedSize { + t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize) + } + + // Reset the position and read again + pos, err := mr.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if pos != 0 { + t.Fatalf("expected position to be set to 0, got %d", pos) + } + + b, err = ioutil.ReadAll(mr) + if err != nil { + t.Fatal(err) + } + + if string(b) != expected { + t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) + } +} + +func TestMultiReadSeekerReadEach(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + var totalBytes int64 + for i, s := range []*strings.Reader{s1, s2, s3} { + sLen := int64(s.Len()) + buf := make([]byte, s.Len()) + expected := []byte(fmt.Sprintf("%s %d", str, i+1)) + + if _, err := mr.Read(buf); err != nil && err != io.EOF { + t.Fatal(err) + } + + if !bytes.Equal(buf, expected) { + t.Fatalf("expected %q to be %q", string(buf), string(expected)) + } + + pos, err := mr.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("iteration: %d, error: %v", i+1, err) + } + + // check that the total bytes read is the current position of the seeker + totalBytes += sLen + if pos != totalBytes { + t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1) + } + + // This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well + newPos, err := mr.Seek(pos, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if newPos != pos { + t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos) + } + } +} + +func TestMultiReadSeekerReadSpanningChunks(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + buf := make([]byte, s1.Len()+3) + _, err := mr.Read(buf) + if err != nil { + t.Fatal(err) + } + + // expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string + expected := "hello world 1hel" + if string(buf) != expected { + t.Fatalf("expected %s to be %s", string(buf), expected) + } +} + +func TestMultiReadSeekerNegativeSeek(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + s1Len := s1.Len() + s2Len := s2.Len() + s3Len := s3.Len() + + s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if s != int64(s1Len+s2Len) { + t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len()) + } + + buf := make([]byte, s3Len) + if _, err := mr.Read(buf); err != nil && err != io.EOF { + t.Fatal(err) + } + expected := fmt.Sprintf("%s %d", str, 3) + if string(buf) != fmt.Sprintf("%s %d", str, 3) { + t.Fatalf("expected %q to be %q", string(buf), expected) + } +} diff --git a/pkg/jsonlog/jsonlog.go b/pkg/jsonlog/jsonlog.go index 85afb3b503..edcf7643d6 100644 --- a/pkg/jsonlog/jsonlog.go +++ b/pkg/jsonlog/jsonlog.go @@ -3,7 +3,6 @@ package jsonlog import ( "encoding/json" "fmt" - "io" "time" ) @@ -29,28 +28,3 @@ func (jl *JSONLog) Reset() { jl.Stream = "" jl.Created = time.Time{} } - -func WriteLog(src io.Reader, dst io.Writer, format string, since time.Time) error { - dec := json.NewDecoder(src) - l := &JSONLog{} - for { - l.Reset() - if err := dec.Decode(l); err != nil { - if err == io.EOF { - return nil - } - return err - } - if !since.IsZero() && l.Created.Before(since) { - continue - } - - line, err := l.Format(format) - if err != nil { - return err - } - if _, err := io.WriteString(dst, line); err != nil { - return err - } - } -} diff --git a/pkg/jsonlog/jsonlog_test.go b/pkg/jsonlog/jsonlog_test.go deleted file mode 100644 index 2b787efc20..0000000000 --- a/pkg/jsonlog/jsonlog_test.go +++ /dev/null @@ -1,157 +0,0 @@ -package jsonlog - -import ( - "bytes" - "encoding/json" - "io/ioutil" - "regexp" - "strconv" - "strings" - "testing" - "time" - - "github.com/docker/docker/pkg/timeutils" -) - -// Invalid json should return an error -func TestWriteLogWithInvalidJSON(t *testing.T) { - json := strings.NewReader("Invalid json") - w := bytes.NewBuffer(nil) - if err := WriteLog(json, w, "json", time.Time{}); err == nil { - t.Fatalf("Expected an error, got [%v]", w.String()) - } -} - -// Any format is valid, it will just print it -func TestWriteLogWithInvalidFormat(t *testing.T) { - testLine := "Line that thinks that it is log line from docker\n" - var buf bytes.Buffer - e := json.NewEncoder(&buf) - for i := 0; i < 35; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, "invalid format", time.Time{}); err != nil { - t.Fatal(err) - } - res := w.String() - t.Logf("Result of WriteLog: %q", res) - lines := strings.Split(strings.TrimSpace(res), "\n") - expression := "^invalid format Line that thinks that it is log line from docker$" - logRe := regexp.MustCompile(expression) - expectedLines := 35 - if len(lines) != expectedLines { - t.Fatalf("Must be %v lines but got %d", expectedLines, len(lines)) - } - for _, l := range lines { - if !logRe.MatchString(l) { - t.Fatalf("Log line not in expected format [%v]: %q", expression, l) - } - } -} - -// Having multiple Log/Stream element -func TestWriteLogWithMultipleStreamLog(t *testing.T) { - testLine := "Line that thinks that it is log line from docker\n" - var buf bytes.Buffer - e := json.NewEncoder(&buf) - for i := 0; i < 35; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, "invalid format", time.Time{}); err != nil { - t.Fatal(err) - } - res := w.String() - t.Logf("Result of WriteLog: %q", res) - lines := strings.Split(strings.TrimSpace(res), "\n") - expression := "^invalid format Line that thinks that it is log line from docker$" - logRe := regexp.MustCompile(expression) - expectedLines := 35 - if len(lines) != expectedLines { - t.Fatalf("Must be %v lines but got %d", expectedLines, len(lines)) - } - for _, l := range lines { - if !logRe.MatchString(l) { - t.Fatalf("Log line not in expected format [%v]: %q", expression, l) - } - } -} - -// Write log with since after created, it won't print anything -func TestWriteLogWithDate(t *testing.T) { - created, _ := time.Parse("YYYY-MM-dd", "2015-01-01") - var buf bytes.Buffer - testLine := "Line that thinks that it is log line from docker\n" - jsonLog := JSONLog{Log: testLine, Stream: "stdout", Created: created} - if err := json.NewEncoder(&buf).Encode(jsonLog); err != nil { - t.Fatal(err) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, "json", time.Now()); err != nil { - t.Fatal(err) - } - res := w.String() - if res != "" { - t.Fatalf("Expected empty log, got [%v]", res) - } -} - -// Happy path :) -func TestWriteLog(t *testing.T) { - testLine := "Line that thinks that it is log line from docker\n" - format := timeutils.RFC3339NanoFixed - logs := map[string][]string{ - "": {"35", "^Line that thinks that it is log line from docker$"}, - "json": {"1", `^{\"log\":\"Line that thinks that it is log line from docker\\n\",\"stream\":\"stdout\",\"time\":.{30,}\"}$`}, - // 30+ symbols, five more can come from system timezone - format: {"35", `.{30,} Line that thinks that it is log line from docker`}, - } - for givenFormat, expressionAndLines := range logs { - expectedLines, _ := strconv.Atoi(expressionAndLines[0]) - expression := expressionAndLines[1] - var buf bytes.Buffer - e := json.NewEncoder(&buf) - for i := 0; i < 35; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - w := bytes.NewBuffer(nil) - if err := WriteLog(&buf, w, givenFormat, time.Time{}); err != nil { - t.Fatal(err) - } - res := w.String() - t.Logf("Result of WriteLog: %q", res) - lines := strings.Split(strings.TrimSpace(res), "\n") - if len(lines) != expectedLines { - t.Fatalf("Must be %v lines but got %d", expectedLines, len(lines)) - } - logRe := regexp.MustCompile(expression) - for _, l := range lines { - if !logRe.MatchString(l) { - t.Fatalf("Log line not in expected format [%v]: %q", expression, l) - } - } - } -} - -func BenchmarkWriteLog(b *testing.B) { - var buf bytes.Buffer - e := json.NewEncoder(&buf) - testLine := "Line that thinks that it is log line from docker\n" - for i := 0; i < 30; i++ { - e.Encode(JSONLog{Log: testLine, Stream: "stdout", Created: time.Now()}) - } - r := bytes.NewReader(buf.Bytes()) - w := ioutil.Discard - format := timeutils.RFC3339NanoFixed - b.SetBytes(int64(r.Len())) - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := WriteLog(r, w, format, time.Time{}); err != nil { - b.Fatal(err) - } - b.StopTimer() - r.Seek(0, 0) - b.StartTimer() - } -} diff --git a/pkg/tailfile/tailfile.go b/pkg/tailfile/tailfile.go index 2ffd36d258..92aea4608e 100644 --- a/pkg/tailfile/tailfile.go +++ b/pkg/tailfile/tailfile.go @@ -3,6 +3,7 @@ package tailfile import ( "bytes" "errors" + "io" "os" ) @@ -12,7 +13,7 @@ var eol = []byte("\n") var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive") //TailFile returns last n lines of file f -func TailFile(f *os.File, n int) ([][]byte, error) { +func TailFile(f io.ReadSeeker, n int) ([][]byte, error) { if n <= 0 { return nil, ErrNonPositiveLinesNumber } diff --git a/vendor/src/gopkg.in/fsnotify.v1/.gitignore b/vendor/src/gopkg.in/fsnotify.v1/.gitignore new file mode 100644 index 0000000000..4cd0cbaf43 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/.gitignore @@ -0,0 +1,6 @@ +# Setup a Global .gitignore for OS and editor generated files: +# https://help.github.com/articles/ignoring-files +# git config --global core.excludesfile ~/.gitignore_global + +.vagrant +*.sublime-project diff --git a/vendor/src/gopkg.in/fsnotify.v1/.travis.yml b/vendor/src/gopkg.in/fsnotify.v1/.travis.yml new file mode 100644 index 0000000000..67467e1407 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/.travis.yml @@ -0,0 +1,15 @@ +sudo: false +language: go + +go: + - 1.4.1 + +before_script: + - FIXED=$(go fmt ./... | wc -l); if [ $FIXED -gt 0 ]; then echo "gofmt - $FIXED file(s) not formatted correctly, please run gofmt to fix this." && exit 1; fi + +os: + - linux + - osx + +notifications: + email: false diff --git a/vendor/src/gopkg.in/fsnotify.v1/AUTHORS b/vendor/src/gopkg.in/fsnotify.v1/AUTHORS new file mode 100644 index 0000000000..4e0e8284e9 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/AUTHORS @@ -0,0 +1,34 @@ +# Names should be added to this file as +# Name or Organization +# The email address is not required for organizations. + +# You can update this list using the following command: +# +# $ git shortlog -se | awk '{print $2 " " $3 " " $4}' + +# Please keep the list sorted. + +Adrien Bustany +Caleb Spare +Case Nelson +Chris Howey +Christoffer Buchholz +Dave Cheney +Francisco Souza +Hari haran +John C Barstow +Kelvin Fo +Matt Layher +Nathan Youngman +Paul Hammond +Pieter Droogendijk +Pursuit92 +Rob Figueiredo +Soge Zhang +Tilak Sharma +Travis Cline +Tudor Golubenco +Yukang +bronze1man +debrando +henrikedwards diff --git a/vendor/src/gopkg.in/fsnotify.v1/CHANGELOG.md b/vendor/src/gopkg.in/fsnotify.v1/CHANGELOG.md new file mode 100644 index 0000000000..ea9428a2a4 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/CHANGELOG.md @@ -0,0 +1,263 @@ +# Changelog + +## v1.2.0 / 2015-02-08 + +* inotify: use epoll to wake up readEvents [#66](https://github.com/go-fsnotify/fsnotify/pull/66) (thanks @PieterD) +* inotify: closing watcher should now always shut down goroutine [#63](https://github.com/go-fsnotify/fsnotify/pull/63) (thanks @PieterD) +* kqueue: close kqueue after removing watches, fixes [#59](https://github.com/go-fsnotify/fsnotify/issues/59) + +## v1.1.1 / 2015-02-05 + +* inotify: Retry read on EINTR [#61](https://github.com/go-fsnotify/fsnotify/issues/61) (thanks @PieterD) + +## v1.1.0 / 2014-12-12 + +* kqueue: rework internals [#43](https://github.com/go-fsnotify/fsnotify/pull/43) + * add low-level functions + * only need to store flags on directories + * less mutexes [#13](https://github.com/go-fsnotify/fsnotify/issues/13) + * done can be an unbuffered channel + * remove calls to os.NewSyscallError +* More efficient string concatenation for Event.String() [#52](https://github.com/go-fsnotify/fsnotify/pull/52) (thanks @mdlayher) +* kqueue: fix regression in rework causing subdirectories to be watched [#48](https://github.com/go-fsnotify/fsnotify/issues/48) +* kqueue: cleanup internal watch before sending remove event [#51](https://github.com/go-fsnotify/fsnotify/issues/51) + +## v1.0.4 / 2014-09-07 + +* kqueue: add dragonfly to the build tags. +* Rename source code files, rearrange code so exported APIs are at the top. +* Add done channel to example code. [#37](https://github.com/go-fsnotify/fsnotify/pull/37) (thanks @chenyukang) + +## v1.0.3 / 2014-08-19 + +* [Fix] Windows MOVED_TO now translates to Create like on BSD and Linux. [#36](https://github.com/go-fsnotify/fsnotify/issues/36) + +## v1.0.2 / 2014-08-17 + +* [Fix] Missing create events on OS X. [#14](https://github.com/go-fsnotify/fsnotify/issues/14) (thanks @zhsso) +* [Fix] Make ./path and path equivalent. (thanks @zhsso) + +## v1.0.0 / 2014-08-15 + +* [API] Remove AddWatch on Windows, use Add. +* Improve documentation for exported identifiers. [#30](https://github.com/go-fsnotify/fsnotify/issues/30) +* Minor updates based on feedback from golint. + +## dev / 2014-07-09 + +* Moved to [github.com/go-fsnotify/fsnotify](https://github.com/go-fsnotify/fsnotify). +* Use os.NewSyscallError instead of returning errno (thanks @hariharan-uno) + +## dev / 2014-07-04 + +* kqueue: fix incorrect mutex used in Close() +* Update example to demonstrate usage of Op. + +## dev / 2014-06-28 + +* [API] Don't set the Write Op for attribute notifications [#4](https://github.com/go-fsnotify/fsnotify/issues/4) +* Fix for String() method on Event (thanks Alex Brainman) +* Don't build on Plan 9 or Solaris (thanks @4ad) + +## dev / 2014-06-21 + +* Events channel of type Event rather than *Event. +* [internal] use syscall constants directly for inotify and kqueue. +* [internal] kqueue: rename events to kevents and fileEvent to event. + +## dev / 2014-06-19 + +* Go 1.3+ required on Windows (uses syscall.ERROR_MORE_DATA internally). +* [internal] remove cookie from Event struct (unused). +* [internal] Event struct has the same definition across every OS. +* [internal] remove internal watch and removeWatch methods. + +## dev / 2014-06-12 + +* [API] Renamed Watch() to Add() and RemoveWatch() to Remove(). +* [API] Pluralized channel names: Events and Errors. +* [API] Renamed FileEvent struct to Event. +* [API] Op constants replace methods like IsCreate(). + +## dev / 2014-06-12 + +* Fix data race on kevent buffer (thanks @tilaks) [#98](https://github.com/howeyc/fsnotify/pull/98) + +## dev / 2014-05-23 + +* [API] Remove current implementation of WatchFlags. + * current implementation doesn't take advantage of OS for efficiency + * provides little benefit over filtering events as they are received, but has extra bookkeeping and mutexes + * no tests for the current implementation + * not fully implemented on Windows [#93](https://github.com/howeyc/fsnotify/issues/93#issuecomment-39285195) + +## v0.9.3 / 2014-12-31 + +* kqueue: cleanup internal watch before sending remove event [#51](https://github.com/go-fsnotify/fsnotify/issues/51) + +## v0.9.2 / 2014-08-17 + +* [Backport] Fix missing create events on OS X. [#14](https://github.com/go-fsnotify/fsnotify/issues/14) (thanks @zhsso) + +## v0.9.1 / 2014-06-12 + +* Fix data race on kevent buffer (thanks @tilaks) [#98](https://github.com/howeyc/fsnotify/pull/98) + +## v0.9.0 / 2014-01-17 + +* IsAttrib() for events that only concern a file's metadata [#79][] (thanks @abustany) +* [Fix] kqueue: fix deadlock [#77][] (thanks @cespare) +* [NOTICE] Development has moved to `code.google.com/p/go.exp/fsnotify` in preparation for inclusion in the Go standard library. + +## v0.8.12 / 2013-11-13 + +* [API] Remove FD_SET and friends from Linux adapter + +## v0.8.11 / 2013-11-02 + +* [Doc] Add Changelog [#72][] (thanks @nathany) +* [Doc] Spotlight and double modify events on OS X [#62][] (reported by @paulhammond) + +## v0.8.10 / 2013-10-19 + +* [Fix] kqueue: remove file watches when parent directory is removed [#71][] (reported by @mdwhatcott) +* [Fix] kqueue: race between Close and readEvents [#70][] (reported by @bernerdschaefer) +* [Doc] specify OS-specific limits in README (thanks @debrando) + +## v0.8.9 / 2013-09-08 + +* [Doc] Contributing (thanks @nathany) +* [Doc] update package path in example code [#63][] (thanks @paulhammond) +* [Doc] GoCI badge in README (Linux only) [#60][] +* [Doc] Cross-platform testing with Vagrant [#59][] (thanks @nathany) + +## v0.8.8 / 2013-06-17 + +* [Fix] Windows: handle `ERROR_MORE_DATA` on Windows [#49][] (thanks @jbowtie) + +## v0.8.7 / 2013-06-03 + +* [API] Make syscall flags internal +* [Fix] inotify: ignore event changes +* [Fix] race in symlink test [#45][] (reported by @srid) +* [Fix] tests on Windows +* lower case error messages + +## v0.8.6 / 2013-05-23 + +* kqueue: Use EVT_ONLY flag on Darwin +* [Doc] Update README with full example + +## v0.8.5 / 2013-05-09 + +* [Fix] inotify: allow monitoring of "broken" symlinks (thanks @tsg) + +## v0.8.4 / 2013-04-07 + +* [Fix] kqueue: watch all file events [#40][] (thanks @ChrisBuchholz) + +## v0.8.3 / 2013-03-13 + +* [Fix] inoitfy/kqueue memory leak [#36][] (reported by @nbkolchin) +* [Fix] kqueue: use fsnFlags for watching a directory [#33][] (reported by @nbkolchin) + +## v0.8.2 / 2013-02-07 + +* [Doc] add Authors +* [Fix] fix data races for map access [#29][] (thanks @fsouza) + +## v0.8.1 / 2013-01-09 + +* [Fix] Windows path separators +* [Doc] BSD License + +## v0.8.0 / 2012-11-09 + +* kqueue: directory watching improvements (thanks @vmirage) +* inotify: add `IN_MOVED_TO` [#25][] (requested by @cpisto) +* [Fix] kqueue: deleting watched directory [#24][] (reported by @jakerr) + +## v0.7.4 / 2012-10-09 + +* [Fix] inotify: fixes from https://codereview.appspot.com/5418045/ (ugorji) +* [Fix] kqueue: preserve watch flags when watching for delete [#21][] (reported by @robfig) +* [Fix] kqueue: watch the directory even if it isn't a new watch (thanks @robfig) +* [Fix] kqueue: modify after recreation of file + +## v0.7.3 / 2012-09-27 + +* [Fix] kqueue: watch with an existing folder inside the watched folder (thanks @vmirage) +* [Fix] kqueue: no longer get duplicate CREATE events + +## v0.7.2 / 2012-09-01 + +* kqueue: events for created directories + +## v0.7.1 / 2012-07-14 + +* [Fix] for renaming files + +## v0.7.0 / 2012-07-02 + +* [Feature] FSNotify flags +* [Fix] inotify: Added file name back to event path + +## v0.6.0 / 2012-06-06 + +* kqueue: watch files after directory created (thanks @tmc) + +## v0.5.1 / 2012-05-22 + +* [Fix] inotify: remove all watches before Close() + +## v0.5.0 / 2012-05-03 + +* [API] kqueue: return errors during watch instead of sending over channel +* kqueue: match symlink behavior on Linux +* inotify: add `DELETE_SELF` (requested by @taralx) +* [Fix] kqueue: handle EINTR (reported by @robfig) +* [Doc] Godoc example [#1][] (thanks @davecheney) + +## v0.4.0 / 2012-03-30 + +* Go 1 released: build with go tool +* [Feature] Windows support using winfsnotify +* Windows does not have attribute change notifications +* Roll attribute notifications into IsModify + +## v0.3.0 / 2012-02-19 + +* kqueue: add files when watch directory + +## v0.2.0 / 2011-12-30 + +* update to latest Go weekly code + +## v0.1.0 / 2011-10-19 + +* kqueue: add watch on file creation to match inotify +* kqueue: create file event +* inotify: ignore `IN_IGNORED` events +* event String() +* linux: common FileEvent functions +* initial commit + +[#79]: https://github.com/howeyc/fsnotify/pull/79 +[#77]: https://github.com/howeyc/fsnotify/pull/77 +[#72]: https://github.com/howeyc/fsnotify/issues/72 +[#71]: https://github.com/howeyc/fsnotify/issues/71 +[#70]: https://github.com/howeyc/fsnotify/issues/70 +[#63]: https://github.com/howeyc/fsnotify/issues/63 +[#62]: https://github.com/howeyc/fsnotify/issues/62 +[#60]: https://github.com/howeyc/fsnotify/issues/60 +[#59]: https://github.com/howeyc/fsnotify/issues/59 +[#49]: https://github.com/howeyc/fsnotify/issues/49 +[#45]: https://github.com/howeyc/fsnotify/issues/45 +[#40]: https://github.com/howeyc/fsnotify/issues/40 +[#36]: https://github.com/howeyc/fsnotify/issues/36 +[#33]: https://github.com/howeyc/fsnotify/issues/33 +[#29]: https://github.com/howeyc/fsnotify/issues/29 +[#25]: https://github.com/howeyc/fsnotify/issues/25 +[#24]: https://github.com/howeyc/fsnotify/issues/24 +[#21]: https://github.com/howeyc/fsnotify/issues/21 + diff --git a/vendor/src/gopkg.in/fsnotify.v1/CONTRIBUTING.md b/vendor/src/gopkg.in/fsnotify.v1/CONTRIBUTING.md new file mode 100644 index 0000000000..0f377f341b --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/CONTRIBUTING.md @@ -0,0 +1,77 @@ +# Contributing + +## Issues + +* Request features and report bugs using the [GitHub Issue Tracker](https://github.com/go-fsnotify/fsnotify/issues). +* Please indicate the platform you are using fsnotify on. +* A code example to reproduce the problem is appreciated. + +## Pull Requests + +### Contributor License Agreement + +fsnotify is derived from code in the [golang.org/x/exp](https://godoc.org/golang.org/x/exp) package and it may be included [in the standard library](https://github.com/go-fsnotify/fsnotify/issues/1) in the future. Therefore fsnotify carries the same [LICENSE](https://github.com/go-fsnotify/fsnotify/blob/master/LICENSE) as Go. Contributors retain their copyright, so you need to fill out a short form before we can accept your contribution: [Google Individual Contributor License Agreement](https://developers.google.com/open-source/cla/individual). + +Please indicate that you have signed the CLA in your pull request. + +### How fsnotify is Developed + +* Development is done on feature branches. +* Tests are run on BSD, Linux, OS X and Windows. +* Pull requests are reviewed and [applied to master][am] using [hub][]. + * Maintainers may modify or squash commits rather than asking contributors to. +* To issue a new release, the maintainers will: + * Update the CHANGELOG + * Tag a version, which will become available through gopkg.in. + +### How to Fork + +For smooth sailing, always use the original import path. Installing with `go get` makes this easy. + +1. Install from GitHub (`go get -u github.com/go-fsnotify/fsnotify`) +2. Create your feature branch (`git checkout -b my-new-feature`) +3. Ensure everything works and the tests pass (see below) +4. Commit your changes (`git commit -am 'Add some feature'`) + +Contribute upstream: + +1. Fork fsnotify on GitHub +2. Add your remote (`git remote add fork git@github.com:mycompany/repo.git`) +3. Push to the branch (`git push fork my-new-feature`) +4. Create a new Pull Request on GitHub + +This workflow is [thoroughly explained by Katrina Owen](https://blog.splice.com/contributing-open-source-git-repositories-go/). + +### Testing + +fsnotify uses build tags to compile different code on Linux, BSD, OS X, and Windows. + +Before doing a pull request, please do your best to test your changes on multiple platforms, and list which platforms you were able/unable to test on. + +To aid in cross-platform testing there is a Vagrantfile for Linux and BSD. + +* Install [Vagrant](http://www.vagrantup.com/) and [VirtualBox](https://www.virtualbox.org/) +* Setup [Vagrant Gopher](https://github.com/nathany/vagrant-gopher) in your `src` folder. +* Run `vagrant up` from the project folder. You can also setup just one box with `vagrant up linux` or `vagrant up bsd` (note: the BSD box doesn't support Windows hosts at this time, and NFS may prompt for your host OS password) +* Once setup, you can run the test suite on a given OS with a single command `vagrant ssh linux -c 'cd go-fsnotify/fsnotify; go test'`. +* When you're done, you will want to halt or destroy the Vagrant boxes. + +Notice: fsnotify file system events won't trigger in shared folders. The tests get around this limitation by using the /tmp directory. + +Right now there is no equivalent solution for Windows and OS X, but there are Windows VMs [freely available from Microsoft](http://www.modern.ie/en-us/virtualization-tools#downloads). + +### Maintainers + +Help maintaining fsnotify is welcome. To be a maintainer: + +* Submit a pull request and sign the CLA as above. +* You must be able to run the test suite on Mac, Windows, Linux and BSD. + +To keep master clean, the fsnotify project uses the "apply mail" workflow outlined in Nathaniel Talbott's post ["Merge pull request" Considered Harmful][am]. This requires installing [hub][]. + +All code changes should be internal pull requests. + +Releases are tagged using [Semantic Versioning](http://semver.org/). + +[hub]: https://github.com/github/hub +[am]: http://blog.spreedly.com/2014/06/24/merge-pull-request-considered-harmful/#.VGa5yZPF_Zs diff --git a/vendor/src/gopkg.in/fsnotify.v1/LICENSE b/vendor/src/gopkg.in/fsnotify.v1/LICENSE new file mode 100644 index 0000000000..f21e540800 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/LICENSE @@ -0,0 +1,28 @@ +Copyright (c) 2012 The Go Authors. All rights reserved. +Copyright (c) 2012 fsnotify Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/src/gopkg.in/fsnotify.v1/NotUsed.xcworkspace b/vendor/src/gopkg.in/fsnotify.v1/NotUsed.xcworkspace new file mode 100644 index 0000000000..e69de29bb2 diff --git a/vendor/src/gopkg.in/fsnotify.v1/README.md b/vendor/src/gopkg.in/fsnotify.v1/README.md new file mode 100644 index 0000000000..7a0b247364 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/README.md @@ -0,0 +1,59 @@ +# File system notifications for Go + +[![Coverage](http://gocover.io/_badge/github.com/go-fsnotify/fsnotify)](http://gocover.io/github.com/go-fsnotify/fsnotify) [![GoDoc](https://godoc.org/gopkg.in/fsnotify.v1?status.svg)](https://godoc.org/gopkg.in/fsnotify.v1) + +Go 1.3+ required. + +Cross platform: Windows, Linux, BSD and OS X. + +|Adapter |OS |Status | +|----------|----------|----------| +|inotify |Linux, Android\*|Supported [![Build Status](https://travis-ci.org/go-fsnotify/fsnotify.svg?branch=master)](https://travis-ci.org/go-fsnotify/fsnotify)| +|kqueue |BSD, OS X, iOS\*|Supported [![Circle CI](https://circleci.com/gh/go-fsnotify/fsnotify.svg?style=svg)](https://circleci.com/gh/go-fsnotify/fsnotify)| +|ReadDirectoryChangesW|Windows|Supported [![Build status](https://ci.appveyor.com/api/projects/status/ivwjubaih4r0udeh/branch/master?svg=true)](https://ci.appveyor.com/project/NathanYoungman/fsnotify/branch/master)| +|FSEvents |OS X |[Planned](https://github.com/go-fsnotify/fsnotify/issues/11)| +|FEN |Solaris 11 |[Planned](https://github.com/go-fsnotify/fsnotify/issues/12)| +|fanotify |Linux 2.6.37+ | | +|USN Journals |Windows |[Maybe](https://github.com/go-fsnotify/fsnotify/issues/53)| +|Polling |*All* |[Maybe](https://github.com/go-fsnotify/fsnotify/issues/9)| + +\* Android and iOS are untested. + +Please see [the documentation](https://godoc.org/gopkg.in/fsnotify.v1) for usage. Consult the [Wiki](https://github.com/go-fsnotify/fsnotify/wiki) for the FAQ and further information. + +## API stability + +Two major versions of fsnotify exist. + +**[fsnotify.v0](https://gopkg.in/fsnotify.v0)** is API-compatible with [howeyc/fsnotify](https://godoc.org/github.com/howeyc/fsnotify). Bugfixes *may* be backported, but I recommend upgrading to v1. + +```go +import "gopkg.in/fsnotify.v0" +``` + +\* Refer to the package as fsnotify (without the .v0 suffix). + +**[fsnotify.v1](https://gopkg.in/fsnotify.v1)** provides [a new API](https://godoc.org/gopkg.in/fsnotify.v1) based on [this design document](http://goo.gl/MrYxyA). You can import v1 with: + +```go +import "gopkg.in/fsnotify.v1" +``` + +Further API changes are [planned](https://github.com/go-fsnotify/fsnotify/milestones), but a new major revision will be tagged, so you can depend on the v1 API. + +**Master** may have unreleased changes. Use it to test the very latest code or when [contributing][], but don't expect it to remain API-compatible: + +```go +import "github.com/go-fsnotify/fsnotify" +``` + +## Contributing + +Please refer to [CONTRIBUTING][] before opening an issue or pull request. + +## Example + +See [example_test.go](https://github.com/go-fsnotify/fsnotify/blob/master/example_test.go). + + +[contributing]: https://github.com/go-fsnotify/fsnotify/blob/master/CONTRIBUTING.md diff --git a/vendor/src/gopkg.in/fsnotify.v1/circle.yml b/vendor/src/gopkg.in/fsnotify.v1/circle.yml new file mode 100644 index 0000000000..204217fb0b --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/circle.yml @@ -0,0 +1,26 @@ +## OS X build (CircleCI iOS beta) + +# Pretend like it's an Xcode project, at least to get it running. +machine: + environment: + XCODE_WORKSPACE: NotUsed.xcworkspace + XCODE_SCHEME: NotUsed + # This is where the go project is actually checked out to: + CIRCLE_BUILD_DIR: $HOME/.go_project/src/github.com/go-fsnotify/fsnotify + +dependencies: + pre: + - brew upgrade go + +test: + override: + - go test ./... + +# Idealized future config, eventually with cross-platform build matrix :-) + +# machine: +# go: +# version: 1.4 +# os: +# - osx +# - linux diff --git a/vendor/src/gopkg.in/fsnotify.v1/fsnotify.go b/vendor/src/gopkg.in/fsnotify.v1/fsnotify.go new file mode 100644 index 0000000000..c899ee0083 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/fsnotify.go @@ -0,0 +1,62 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !plan9,!solaris + +// Package fsnotify provides a platform-independent interface for file system notifications. +package fsnotify + +import ( + "bytes" + "fmt" +) + +// Event represents a single file system notification. +type Event struct { + Name string // Relative path to the file or directory. + Op Op // File operation that triggered the event. +} + +// Op describes a set of file operations. +type Op uint32 + +// These are the generalized file operations that can trigger a notification. +const ( + Create Op = 1 << iota + Write + Remove + Rename + Chmod +) + +// String returns a string representation of the event in the form +// "file: REMOVE|WRITE|..." +func (e Event) String() string { + // Use a buffer for efficient string concatenation + var buffer bytes.Buffer + + if e.Op&Create == Create { + buffer.WriteString("|CREATE") + } + if e.Op&Remove == Remove { + buffer.WriteString("|REMOVE") + } + if e.Op&Write == Write { + buffer.WriteString("|WRITE") + } + if e.Op&Rename == Rename { + buffer.WriteString("|RENAME") + } + if e.Op&Chmod == Chmod { + buffer.WriteString("|CHMOD") + } + + // If buffer remains empty, return no event names + if buffer.Len() == 0 { + return fmt.Sprintf("%q: ", e.Name) + } + + // Return a list of event names, with leading pipe character stripped + return fmt.Sprintf("%q: %s", e.Name, buffer.String()[1:]) +} diff --git a/vendor/src/gopkg.in/fsnotify.v1/inotify.go b/vendor/src/gopkg.in/fsnotify.v1/inotify.go new file mode 100644 index 0000000000..d7759ec8c8 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/inotify.go @@ -0,0 +1,306 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package fsnotify + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "syscall" + "unsafe" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error + mu sync.Mutex // Map access + fd int + poller *fdPoller + watches map[string]*watch // Map of inotify watches (key: path) + paths map[int]string // Map of watched paths (key: watch descriptor) + done chan struct{} // Channel for sending a "quit message" to the reader goroutine + doneResp chan struct{} // Channel to respond to Close +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + // Create inotify fd + fd, errno := syscall.InotifyInit() + if fd == -1 { + return nil, errno + } + // Create epoll + poller, err := newFdPoller(fd) + if err != nil { + syscall.Close(fd) + return nil, err + } + w := &Watcher{ + fd: fd, + poller: poller, + watches: make(map[string]*watch), + paths: make(map[int]string), + Events: make(chan Event), + Errors: make(chan error), + done: make(chan struct{}), + doneResp: make(chan struct{}), + } + + go w.readEvents() + return w, nil +} + +func (w *Watcher) isClosed() bool { + select { + case <-w.done: + return true + default: + return false + } +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + if w.isClosed() { + return nil + } + + // Send 'close' signal to goroutine, and set the Watcher to closed. + close(w.done) + + // Wake up goroutine + w.poller.wake() + + // Wait for goroutine to close + <-w.doneResp + + return nil +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + name = filepath.Clean(name) + if w.isClosed() { + return errors.New("inotify instance already closed") + } + + const agnosticEvents = syscall.IN_MOVED_TO | syscall.IN_MOVED_FROM | + syscall.IN_CREATE | syscall.IN_ATTRIB | syscall.IN_MODIFY | + syscall.IN_MOVE_SELF | syscall.IN_DELETE | syscall.IN_DELETE_SELF + + var flags uint32 = agnosticEvents + + w.mu.Lock() + watchEntry, found := w.watches[name] + w.mu.Unlock() + if found { + watchEntry.flags |= flags + flags |= syscall.IN_MASK_ADD + } + wd, errno := syscall.InotifyAddWatch(w.fd, name, flags) + if wd == -1 { + return errno + } + + w.mu.Lock() + w.watches[name] = &watch{wd: uint32(wd), flags: flags} + w.paths[wd] = name + w.mu.Unlock() + + return nil +} + +// Remove stops watching the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + name = filepath.Clean(name) + + // Fetch the watch. + w.mu.Lock() + defer w.mu.Unlock() + watch, ok := w.watches[name] + + // Remove it from inotify. + if !ok { + return fmt.Errorf("can't remove non-existent inotify watch for: %s", name) + } + // inotify_rm_watch will return EINVAL if the file has been deleted; + // the inotify will already have been removed. + // That means we can safely delete it from our watches, whatever inotify_rm_watch does. + delete(w.watches, name) + success, errno := syscall.InotifyRmWatch(w.fd, watch.wd) + if success == -1 { + // TODO: Perhaps it's not helpful to return an error here in every case. + // the only two possible errors are: + // EBADF, which happens when w.fd is not a valid file descriptor of any kind. + // EINVAL, which is when fd is not an inotify descriptor or wd is not a valid watch descriptor. + // Watch descriptors are invalidated when they are removed explicitly or implicitly; + // explicitly by inotify_rm_watch, implicitly when the file they are watching is deleted. + return errno + } + return nil +} + +type watch struct { + wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) + flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) +} + +// readEvents reads from the inotify file descriptor, converts the +// received events into Event objects and sends them via the Events channel +func (w *Watcher) readEvents() { + var ( + buf [syscall.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events + n int // Number of bytes read with read() + errno error // Syscall errno + ok bool // For poller.wait + ) + + defer close(w.doneResp) + defer close(w.Errors) + defer close(w.Events) + defer syscall.Close(w.fd) + defer w.poller.close() + + for { + // See if we have been closed. + if w.isClosed() { + return + } + + ok, errno = w.poller.wait() + if errno != nil { + select { + case w.Errors <- errno: + case <-w.done: + return + } + continue + } + + if !ok { + continue + } + + n, errno = syscall.Read(w.fd, buf[:]) + // If a signal interrupted execution, see if we've been asked to close, and try again. + // http://man7.org/linux/man-pages/man7/signal.7.html : + // "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable" + if errno == syscall.EINTR { + continue + } + + // syscall.Read might have been woken up by Close. If so, we're done. + if w.isClosed() { + return + } + + if n < syscall.SizeofInotifyEvent { + var err error + if n == 0 { + // If EOF is received. This should really never happen. + err = io.EOF + } else if n < 0 { + // If an error occured while reading. + err = errno + } else { + // Read was too short. + err = errors.New("notify: short read in readEvents()") + } + select { + case w.Errors <- err: + case <-w.done: + return + } + continue + } + + var offset uint32 + // We don't know how many events we just read into the buffer + // While the offset points to at least one whole event... + for offset <= uint32(n-syscall.SizeofInotifyEvent) { + // Point "raw" to the event in the buffer + raw := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])) + + mask := uint32(raw.Mask) + nameLen := uint32(raw.Len) + // If the event happened to the watched directory or the watched file, the kernel + // doesn't append the filename to the event, but we would like to always fill the + // the "Name" field with a valid filename. We retrieve the path of the watch from + // the "paths" map. + w.mu.Lock() + name := w.paths[int(raw.Wd)] + w.mu.Unlock() + if nameLen > 0 { + // Point "bytes" at the first byte of the filename + bytes := (*[syscall.PathMax]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent])) + // The filename is padded with NULL bytes. TrimRight() gets rid of those. + name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") + } + + event := newEvent(name, mask) + + // Send the events that are not ignored on the events channel + if !event.ignoreLinux(mask) { + select { + case w.Events <- event: + case <-w.done: + return + } + } + + // Move to the next event in the buffer + offset += syscall.SizeofInotifyEvent + nameLen + } + } +} + +// Certain types of events can be "ignored" and not sent over the Events +// channel. Such as events marked ignore by the kernel, or MODIFY events +// against files that do not exist. +func (e *Event) ignoreLinux(mask uint32) bool { + // Ignore anything the inotify API says to ignore + if mask&syscall.IN_IGNORED == syscall.IN_IGNORED { + return true + } + + // If the event is not a DELETE or RENAME, the file must exist. + // Otherwise the event is ignored. + // *Note*: this was put in place because it was seen that a MODIFY + // event was sent after the DELETE. This ignores that MODIFY and + // assumes a DELETE will come or has come if the file doesn't exist. + if !(e.Op&Remove == Remove || e.Op&Rename == Rename) { + _, statErr := os.Lstat(e.Name) + return os.IsNotExist(statErr) + } + return false +} + +// newEvent returns an platform-independent Event based on an inotify mask. +func newEvent(name string, mask uint32) Event { + e := Event{Name: name} + if mask&syscall.IN_CREATE == syscall.IN_CREATE || mask&syscall.IN_MOVED_TO == syscall.IN_MOVED_TO { + e.Op |= Create + } + if mask&syscall.IN_DELETE_SELF == syscall.IN_DELETE_SELF || mask&syscall.IN_DELETE == syscall.IN_DELETE { + e.Op |= Remove + } + if mask&syscall.IN_MODIFY == syscall.IN_MODIFY { + e.Op |= Write + } + if mask&syscall.IN_MOVE_SELF == syscall.IN_MOVE_SELF || mask&syscall.IN_MOVED_FROM == syscall.IN_MOVED_FROM { + e.Op |= Rename + } + if mask&syscall.IN_ATTRIB == syscall.IN_ATTRIB { + e.Op |= Chmod + } + return e +} diff --git a/vendor/src/gopkg.in/fsnotify.v1/inotify_poller.go b/vendor/src/gopkg.in/fsnotify.v1/inotify_poller.go new file mode 100644 index 0000000000..3b41784041 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/inotify_poller.go @@ -0,0 +1,186 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package fsnotify + +import ( + "errors" + "syscall" +) + +type fdPoller struct { + fd int // File descriptor (as returned by the inotify_init() syscall) + epfd int // Epoll file descriptor + pipe [2]int // Pipe for waking up +} + +func emptyPoller(fd int) *fdPoller { + poller := new(fdPoller) + poller.fd = fd + poller.epfd = -1 + poller.pipe[0] = -1 + poller.pipe[1] = -1 + return poller +} + +// Create a new inotify poller. +// This creates an inotify handler, and an epoll handler. +func newFdPoller(fd int) (*fdPoller, error) { + var errno error + poller := emptyPoller(fd) + defer func() { + if errno != nil { + poller.close() + } + }() + poller.fd = fd + + // Create epoll fd + poller.epfd, errno = syscall.EpollCreate(1) + if poller.epfd == -1 { + return nil, errno + } + // Create pipe; pipe[0] is the read end, pipe[1] the write end. + errno = syscall.Pipe2(poller.pipe[:], syscall.O_NONBLOCK) + if errno != nil { + return nil, errno + } + + // Register inotify fd with epoll + event := syscall.EpollEvent{ + Fd: int32(poller.fd), + Events: syscall.EPOLLIN, + } + errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.fd, &event) + if errno != nil { + return nil, errno + } + + // Register pipe fd with epoll + event = syscall.EpollEvent{ + Fd: int32(poller.pipe[0]), + Events: syscall.EPOLLIN, + } + errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.pipe[0], &event) + if errno != nil { + return nil, errno + } + + return poller, nil +} + +// Wait using epoll. +// Returns true if something is ready to be read, +// false if there is not. +func (poller *fdPoller) wait() (bool, error) { + // 3 possible events per fd, and 2 fds, makes a maximum of 6 events. + // I don't know whether epoll_wait returns the number of events returned, + // or the total number of events ready. + // I decided to catch both by making the buffer one larger than the maximum. + events := make([]syscall.EpollEvent, 7) + for { + n, errno := syscall.EpollWait(poller.epfd, events, -1) + if n == -1 { + if errno == syscall.EINTR { + continue + } + return false, errno + } + if n == 0 { + // If there are no events, try again. + continue + } + if n > 6 { + // This should never happen. More events were returned than should be possible. + return false, errors.New("epoll_wait returned more events than I know what to do with") + } + ready := events[:n] + epollhup := false + epollerr := false + epollin := false + for _, event := range ready { + if event.Fd == int32(poller.fd) { + if event.Events&syscall.EPOLLHUP != 0 { + // This should not happen, but if it does, treat it as a wakeup. + epollhup = true + } + if event.Events&syscall.EPOLLERR != 0 { + // If an error is waiting on the file descriptor, we should pretend + // something is ready to read, and let syscall.Read pick up the error. + epollerr = true + } + if event.Events&syscall.EPOLLIN != 0 { + // There is data to read. + epollin = true + } + } + if event.Fd == int32(poller.pipe[0]) { + if event.Events&syscall.EPOLLHUP != 0 { + // Write pipe descriptor was closed, by us. This means we're closing down the + // watcher, and we should wake up. + } + if event.Events&syscall.EPOLLERR != 0 { + // If an error is waiting on the pipe file descriptor. + // This is an absolute mystery, and should never ever happen. + return false, errors.New("Error on the pipe descriptor.") + } + if event.Events&syscall.EPOLLIN != 0 { + // This is a regular wakeup, so we have to clear the buffer. + err := poller.clearWake() + if err != nil { + return false, err + } + } + } + } + + if epollhup || epollerr || epollin { + return true, nil + } + return false, nil + } +} + +// Close the write end of the poller. +func (poller *fdPoller) wake() error { + buf := make([]byte, 1) + n, errno := syscall.Write(poller.pipe[1], buf) + if n == -1 { + if errno == syscall.EAGAIN { + // Buffer is full, poller will wake. + return nil + } + return errno + } + return nil +} + +func (poller *fdPoller) clearWake() error { + // You have to be woken up a LOT in order to get to 100! + buf := make([]byte, 100) + n, errno := syscall.Read(poller.pipe[0], buf) + if n == -1 { + if errno == syscall.EAGAIN { + // Buffer is empty, someone else cleared our wake. + return nil + } + return errno + } + return nil +} + +// Close all poller file descriptors, but not the one passed to it. +func (poller *fdPoller) close() { + if poller.pipe[1] != -1 { + syscall.Close(poller.pipe[1]) + } + if poller.pipe[0] != -1 { + syscall.Close(poller.pipe[0]) + } + if poller.epfd != -1 { + syscall.Close(poller.epfd) + } +} diff --git a/vendor/src/gopkg.in/fsnotify.v1/kqueue.go b/vendor/src/gopkg.in/fsnotify.v1/kqueue.go new file mode 100644 index 0000000000..265622d201 --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/kqueue.go @@ -0,0 +1,463 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build freebsd openbsd netbsd dragonfly darwin + +package fsnotify + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "syscall" + "time" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error + done chan bool // Channel for sending a "quit message" to the reader goroutine + + kq int // File descriptor (as returned by the kqueue() syscall). + + mu sync.Mutex // Protects access to watcher data + watches map[string]int // Map of watched file descriptors (key: path). + externalWatches map[string]bool // Map of watches added by user of the library. + dirFlags map[string]uint32 // Map of watched directories to fflags used in kqueue. + paths map[int]pathInfo // Map file descriptors to path names for processing kqueue events. + fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events). + isClosed bool // Set to true when Close() is first called +} + +type pathInfo struct { + name string + isDir bool +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + kq, err := kqueue() + if err != nil { + return nil, err + } + + w := &Watcher{ + kq: kq, + watches: make(map[string]int), + dirFlags: make(map[string]uint32), + paths: make(map[int]pathInfo), + fileExists: make(map[string]bool), + externalWatches: make(map[string]bool), + Events: make(chan Event), + Errors: make(chan error), + done: make(chan bool), + } + + go w.readEvents() + return w, nil +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + w.mu.Lock() + if w.isClosed { + w.mu.Unlock() + return nil + } + w.isClosed = true + w.mu.Unlock() + + w.mu.Lock() + ws := w.watches + w.mu.Unlock() + + var err error + for name := range ws { + if e := w.Remove(name); e != nil && err == nil { + err = e + } + } + + // Send "quit" message to the reader goroutine: + w.done <- true + + return nil +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + w.mu.Lock() + w.externalWatches[name] = true + w.mu.Unlock() + return w.addWatch(name, noteAllEvents) +} + +// Remove stops watching the the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + name = filepath.Clean(name) + w.mu.Lock() + watchfd, ok := w.watches[name] + w.mu.Unlock() + if !ok { + return fmt.Errorf("can't remove non-existent kevent watch for: %s", name) + } + + const registerRemove = syscall.EV_DELETE + if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + return err + } + + syscall.Close(watchfd) + + w.mu.Lock() + isDir := w.paths[watchfd].isDir + delete(w.watches, name) + delete(w.paths, watchfd) + delete(w.dirFlags, name) + w.mu.Unlock() + + // Find all watched paths that are in this directory that are not external. + if isDir { + var pathsToRemove []string + w.mu.Lock() + for _, path := range w.paths { + wdir, _ := filepath.Split(path.name) + if filepath.Clean(wdir) == name { + if !w.externalWatches[path.name] { + pathsToRemove = append(pathsToRemove, path.name) + } + } + } + w.mu.Unlock() + for _, name := range pathsToRemove { + // Since these are internal, not much sense in propagating error + // to the user, as that will just confuse them with an error about + // a path they did not explicitly watch themselves. + w.Remove(name) + } + } + + return nil +} + +// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) +const noteAllEvents = syscall.NOTE_DELETE | syscall.NOTE_WRITE | syscall.NOTE_ATTRIB | syscall.NOTE_RENAME + +// keventWaitTime to block on each read from kevent +var keventWaitTime = durationToTimespec(100 * time.Millisecond) + +// addWatch adds name to the watched file set. +// The flags are interpreted as described in kevent(2). +func (w *Watcher) addWatch(name string, flags uint32) error { + var isDir bool + // Make ./name and name equivalent + name = filepath.Clean(name) + + w.mu.Lock() + if w.isClosed { + w.mu.Unlock() + return errors.New("kevent instance already closed") + } + watchfd, alreadyWatching := w.watches[name] + // We already have a watch, but we can still override flags. + if alreadyWatching { + isDir = w.paths[watchfd].isDir + } + w.mu.Unlock() + + if !alreadyWatching { + fi, err := os.Lstat(name) + if err != nil { + return err + } + + // Don't watch sockets. + if fi.Mode()&os.ModeSocket == os.ModeSocket { + return nil + } + + // Follow Symlinks + // Unfortunately, Linux can add bogus symlinks to watch list without + // issue, and Windows can't do symlinks period (AFAIK). To maintain + // consistency, we will act like everything is fine. There will simply + // be no file events for broken symlinks. + // Hence the returns of nil on errors. + if fi.Mode()&os.ModeSymlink == os.ModeSymlink { + name, err = filepath.EvalSymlinks(name) + if err != nil { + return nil + } + + fi, err = os.Lstat(name) + if err != nil { + return nil + } + } + + watchfd, err = syscall.Open(name, openMode, 0700) + if watchfd == -1 { + return err + } + + isDir = fi.IsDir() + } + + const registerAdd = syscall.EV_ADD | syscall.EV_CLEAR | syscall.EV_ENABLE + if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + syscall.Close(watchfd) + return err + } + + if !alreadyWatching { + w.mu.Lock() + w.watches[name] = watchfd + w.paths[watchfd] = pathInfo{name: name, isDir: isDir} + w.mu.Unlock() + } + + if isDir { + // Watch the directory if it has not been watched before, + // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles) + w.mu.Lock() + watchDir := (flags&syscall.NOTE_WRITE) == syscall.NOTE_WRITE && + (!alreadyWatching || (w.dirFlags[name]&syscall.NOTE_WRITE) != syscall.NOTE_WRITE) + // Store flags so this watch can be updated later + w.dirFlags[name] = flags + w.mu.Unlock() + + if watchDir { + if err := w.watchDirectoryFiles(name); err != nil { + return err + } + } + } + return nil +} + +// readEvents reads from kqueue and converts the received kevents into +// Event values that it sends down the Events channel. +func (w *Watcher) readEvents() { + eventBuffer := make([]syscall.Kevent_t, 10) + + for { + // See if there is a message on the "done" channel + select { + case <-w.done: + err := syscall.Close(w.kq) + if err != nil { + w.Errors <- err + } + close(w.Events) + close(w.Errors) + return + default: + } + + // Get new events + kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + // EINTR is okay, the syscall was interrupted before timeout expired. + if err != nil && err != syscall.EINTR { + w.Errors <- err + continue + } + + // Flush the events we received to the Events channel + for len(kevents) > 0 { + kevent := &kevents[0] + watchfd := int(kevent.Ident) + mask := uint32(kevent.Fflags) + w.mu.Lock() + path := w.paths[watchfd] + w.mu.Unlock() + event := newEvent(path.name, mask) + + if path.isDir && !(event.Op&Remove == Remove) { + // Double check to make sure the directory exists. This can happen when + // we do a rm -fr on a recursively watched folders and we receive a + // modification event first but the folder has been deleted and later + // receive the delete event + if _, err := os.Lstat(event.Name); os.IsNotExist(err) { + // mark is as delete event + event.Op |= Remove + } + } + + if event.Op&Rename == Rename || event.Op&Remove == Remove { + w.Remove(event.Name) + w.mu.Lock() + delete(w.fileExists, event.Name) + w.mu.Unlock() + } + + if path.isDir && event.Op&Write == Write && !(event.Op&Remove == Remove) { + w.sendDirectoryChangeEvents(event.Name) + } else { + // Send the event on the Events channel + w.Events <- event + } + + if event.Op&Remove == Remove { + // Look for a file that may have overwritten this. + // For example, mv f1 f2 will delete f2, then create f2. + fileDir, _ := filepath.Split(event.Name) + fileDir = filepath.Clean(fileDir) + w.mu.Lock() + _, found := w.watches[fileDir] + w.mu.Unlock() + if found { + // make sure the directory exists before we watch for changes. When we + // do a recursive watch and perform rm -fr, the parent directory might + // have gone missing, ignore the missing directory and let the + // upcoming delete event remove the watch from the parent directory. + if _, err := os.Lstat(fileDir); os.IsExist(err) { + w.sendDirectoryChangeEvents(fileDir) + // FIXME: should this be for events on files or just isDir? + } + } + } + + // Move to next event + kevents = kevents[1:] + } + } +} + +// newEvent returns an platform-independent Event based on kqueue Fflags. +func newEvent(name string, mask uint32) Event { + e := Event{Name: name} + if mask&syscall.NOTE_DELETE == syscall.NOTE_DELETE { + e.Op |= Remove + } + if mask&syscall.NOTE_WRITE == syscall.NOTE_WRITE { + e.Op |= Write + } + if mask&syscall.NOTE_RENAME == syscall.NOTE_RENAME { + e.Op |= Rename + } + if mask&syscall.NOTE_ATTRIB == syscall.NOTE_ATTRIB { + e.Op |= Chmod + } + return e +} + +func newCreateEvent(name string) Event { + return Event{Name: name, Op: Create} +} + +// watchDirectoryFiles to mimic inotify when adding a watch on a directory +func (w *Watcher) watchDirectoryFiles(dirPath string) error { + // Get all files + files, err := ioutil.ReadDir(dirPath) + if err != nil { + return err + } + + for _, fileInfo := range files { + filePath := filepath.Join(dirPath, fileInfo.Name()) + if err := w.internalWatch(filePath, fileInfo); err != nil { + return err + } + + w.mu.Lock() + w.fileExists[filePath] = true + w.mu.Unlock() + } + + return nil +} + +// sendDirectoryEvents searches the directory for newly created files +// and sends them over the event channel. This functionality is to have +// the BSD version of fsnotify match Linux inotify which provides a +// create event for files created in a watched directory. +func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { + // Get all files + files, err := ioutil.ReadDir(dirPath) + if err != nil { + w.Errors <- err + } + + // Search for new files + for _, fileInfo := range files { + filePath := filepath.Join(dirPath, fileInfo.Name()) + w.mu.Lock() + _, doesExist := w.fileExists[filePath] + w.mu.Unlock() + if !doesExist { + // Send create event + w.Events <- newCreateEvent(filePath) + } + + // like watchDirectoryFiles (but without doing another ReadDir) + if err := w.internalWatch(filePath, fileInfo); err != nil { + return + } + + w.mu.Lock() + w.fileExists[filePath] = true + w.mu.Unlock() + } +} + +func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) error { + if fileInfo.IsDir() { + // mimic Linux providing delete events for subdirectories + // but preserve the flags used if currently watching subdirectory + w.mu.Lock() + flags := w.dirFlags[name] + w.mu.Unlock() + + flags |= syscall.NOTE_DELETE + return w.addWatch(name, flags) + } + + // watch file to mimic Linux inotify + return w.addWatch(name, noteAllEvents) +} + +// kqueue creates a new kernel event queue and returns a descriptor. +func kqueue() (kq int, err error) { + kq, err = syscall.Kqueue() + if kq == -1 { + return kq, err + } + return kq, nil +} + +// register events with the queue +func register(kq int, fds []int, flags int, fflags uint32) error { + changes := make([]syscall.Kevent_t, len(fds)) + + for i, fd := range fds { + // SetKevent converts int to the platform-specific types: + syscall.SetKevent(&changes[i], fd, syscall.EVFILT_VNODE, flags) + changes[i].Fflags = fflags + } + + // register the events + success, err := syscall.Kevent(kq, changes, nil, nil) + if success == -1 { + return err + } + return nil +} + +// read retrieves pending events, or waits until an event occurs. +// A timeout of nil blocks indefinitely, while 0 polls the queue. +func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) { + n, err := syscall.Kevent(kq, nil, events, timeout) + if err != nil { + return nil, err + } + return events[0:n], nil +} + +// durationToTimespec prepares a timeout value +func durationToTimespec(d time.Duration) syscall.Timespec { + return syscall.NsecToTimespec(d.Nanoseconds()) +} diff --git a/vendor/src/gopkg.in/fsnotify.v1/open_mode_bsd.go b/vendor/src/gopkg.in/fsnotify.v1/open_mode_bsd.go new file mode 100644 index 0000000000..c57ccb427b --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/open_mode_bsd.go @@ -0,0 +1,11 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build freebsd openbsd netbsd dragonfly + +package fsnotify + +import "syscall" + +const openMode = syscall.O_NONBLOCK | syscall.O_RDONLY diff --git a/vendor/src/gopkg.in/fsnotify.v1/open_mode_darwin.go b/vendor/src/gopkg.in/fsnotify.v1/open_mode_darwin.go new file mode 100644 index 0000000000..174b2c331f --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/open_mode_darwin.go @@ -0,0 +1,12 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin + +package fsnotify + +import "syscall" + +// note: this constant is not defined on BSD +const openMode = syscall.O_EVTONLY diff --git a/vendor/src/gopkg.in/fsnotify.v1/windows.go b/vendor/src/gopkg.in/fsnotify.v1/windows.go new file mode 100644 index 0000000000..811585227d --- /dev/null +++ b/vendor/src/gopkg.in/fsnotify.v1/windows.go @@ -0,0 +1,561 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build windows + +package fsnotify + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "syscall" + "unsafe" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error + isClosed bool // Set to true when Close() is first called + mu sync.Mutex // Map access + port syscall.Handle // Handle to completion port + watches watchMap // Map of watches (key: i-number) + input chan *input // Inputs to the reader are sent on this channel + quit chan chan<- error +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + port, e := syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0) + if e != nil { + return nil, os.NewSyscallError("CreateIoCompletionPort", e) + } + w := &Watcher{ + port: port, + watches: make(watchMap), + input: make(chan *input, 1), + Events: make(chan Event, 50), + Errors: make(chan error), + quit: make(chan chan<- error, 1), + } + go w.readEvents() + return w, nil +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + if w.isClosed { + return nil + } + w.isClosed = true + + // Send "quit" message to the reader goroutine + ch := make(chan error) + w.quit <- ch + if err := w.wakeupReader(); err != nil { + return err + } + return <-ch +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + if w.isClosed { + return errors.New("watcher already closed") + } + in := &input{ + op: opAddWatch, + path: filepath.Clean(name), + flags: sys_FS_ALL_EVENTS, + reply: make(chan error), + } + w.input <- in + if err := w.wakeupReader(); err != nil { + return err + } + return <-in.reply +} + +// Remove stops watching the the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + in := &input{ + op: opRemoveWatch, + path: filepath.Clean(name), + reply: make(chan error), + } + w.input <- in + if err := w.wakeupReader(); err != nil { + return err + } + return <-in.reply +} + +const ( + // Options for AddWatch + sys_FS_ONESHOT = 0x80000000 + sys_FS_ONLYDIR = 0x1000000 + + // Events + sys_FS_ACCESS = 0x1 + sys_FS_ALL_EVENTS = 0xfff + sys_FS_ATTRIB = 0x4 + sys_FS_CLOSE = 0x18 + sys_FS_CREATE = 0x100 + sys_FS_DELETE = 0x200 + sys_FS_DELETE_SELF = 0x400 + sys_FS_MODIFY = 0x2 + sys_FS_MOVE = 0xc0 + sys_FS_MOVED_FROM = 0x40 + sys_FS_MOVED_TO = 0x80 + sys_FS_MOVE_SELF = 0x800 + + // Special events + sys_FS_IGNORED = 0x8000 + sys_FS_Q_OVERFLOW = 0x4000 +) + +func newEvent(name string, mask uint32) Event { + e := Event{Name: name} + if mask&sys_FS_CREATE == sys_FS_CREATE || mask&sys_FS_MOVED_TO == sys_FS_MOVED_TO { + e.Op |= Create + } + if mask&sys_FS_DELETE == sys_FS_DELETE || mask&sys_FS_DELETE_SELF == sys_FS_DELETE_SELF { + e.Op |= Remove + } + if mask&sys_FS_MODIFY == sys_FS_MODIFY { + e.Op |= Write + } + if mask&sys_FS_MOVE == sys_FS_MOVE || mask&sys_FS_MOVE_SELF == sys_FS_MOVE_SELF || mask&sys_FS_MOVED_FROM == sys_FS_MOVED_FROM { + e.Op |= Rename + } + if mask&sys_FS_ATTRIB == sys_FS_ATTRIB { + e.Op |= Chmod + } + return e +} + +const ( + opAddWatch = iota + opRemoveWatch +) + +const ( + provisional uint64 = 1 << (32 + iota) +) + +type input struct { + op int + path string + flags uint32 + reply chan error +} + +type inode struct { + handle syscall.Handle + volume uint32 + index uint64 +} + +type watch struct { + ov syscall.Overlapped + ino *inode // i-number + path string // Directory path + mask uint64 // Directory itself is being watched with these notify flags + names map[string]uint64 // Map of names being watched and their notify flags + rename string // Remembers the old name while renaming a file + buf [4096]byte +} + +type indexMap map[uint64]*watch +type watchMap map[uint32]indexMap + +func (w *Watcher) wakeupReader() error { + e := syscall.PostQueuedCompletionStatus(w.port, 0, 0, nil) + if e != nil { + return os.NewSyscallError("PostQueuedCompletionStatus", e) + } + return nil +} + +func getDir(pathname string) (dir string, err error) { + attr, e := syscall.GetFileAttributes(syscall.StringToUTF16Ptr(pathname)) + if e != nil { + return "", os.NewSyscallError("GetFileAttributes", e) + } + if attr&syscall.FILE_ATTRIBUTE_DIRECTORY != 0 { + dir = pathname + } else { + dir, _ = filepath.Split(pathname) + dir = filepath.Clean(dir) + } + return +} + +func getIno(path string) (ino *inode, err error) { + h, e := syscall.CreateFile(syscall.StringToUTF16Ptr(path), + syscall.FILE_LIST_DIRECTORY, + syscall.FILE_SHARE_READ|syscall.FILE_SHARE_WRITE|syscall.FILE_SHARE_DELETE, + nil, syscall.OPEN_EXISTING, + syscall.FILE_FLAG_BACKUP_SEMANTICS|syscall.FILE_FLAG_OVERLAPPED, 0) + if e != nil { + return nil, os.NewSyscallError("CreateFile", e) + } + var fi syscall.ByHandleFileInformation + if e = syscall.GetFileInformationByHandle(h, &fi); e != nil { + syscall.CloseHandle(h) + return nil, os.NewSyscallError("GetFileInformationByHandle", e) + } + ino = &inode{ + handle: h, + volume: fi.VolumeSerialNumber, + index: uint64(fi.FileIndexHigh)<<32 | uint64(fi.FileIndexLow), + } + return ino, nil +} + +// Must run within the I/O thread. +func (m watchMap) get(ino *inode) *watch { + if i := m[ino.volume]; i != nil { + return i[ino.index] + } + return nil +} + +// Must run within the I/O thread. +func (m watchMap) set(ino *inode, watch *watch) { + i := m[ino.volume] + if i == nil { + i = make(indexMap) + m[ino.volume] = i + } + i[ino.index] = watch +} + +// Must run within the I/O thread. +func (w *Watcher) addWatch(pathname string, flags uint64) error { + dir, err := getDir(pathname) + if err != nil { + return err + } + if flags&sys_FS_ONLYDIR != 0 && pathname != dir { + return nil + } + ino, err := getIno(dir) + if err != nil { + return err + } + w.mu.Lock() + watchEntry := w.watches.get(ino) + w.mu.Unlock() + if watchEntry == nil { + if _, e := syscall.CreateIoCompletionPort(ino.handle, w.port, 0, 0); e != nil { + syscall.CloseHandle(ino.handle) + return os.NewSyscallError("CreateIoCompletionPort", e) + } + watchEntry = &watch{ + ino: ino, + path: dir, + names: make(map[string]uint64), + } + w.mu.Lock() + w.watches.set(ino, watchEntry) + w.mu.Unlock() + flags |= provisional + } else { + syscall.CloseHandle(ino.handle) + } + if pathname == dir { + watchEntry.mask |= flags + } else { + watchEntry.names[filepath.Base(pathname)] |= flags + } + if err = w.startRead(watchEntry); err != nil { + return err + } + if pathname == dir { + watchEntry.mask &= ^provisional + } else { + watchEntry.names[filepath.Base(pathname)] &= ^provisional + } + return nil +} + +// Must run within the I/O thread. +func (w *Watcher) remWatch(pathname string) error { + dir, err := getDir(pathname) + if err != nil { + return err + } + ino, err := getIno(dir) + if err != nil { + return err + } + w.mu.Lock() + watch := w.watches.get(ino) + w.mu.Unlock() + if watch == nil { + return fmt.Errorf("can't remove non-existent watch for: %s", pathname) + } + if pathname == dir { + w.sendEvent(watch.path, watch.mask&sys_FS_IGNORED) + watch.mask = 0 + } else { + name := filepath.Base(pathname) + w.sendEvent(watch.path+"\\"+name, watch.names[name]&sys_FS_IGNORED) + delete(watch.names, name) + } + return w.startRead(watch) +} + +// Must run within the I/O thread. +func (w *Watcher) deleteWatch(watch *watch) { + for name, mask := range watch.names { + if mask&provisional == 0 { + w.sendEvent(watch.path+"\\"+name, mask&sys_FS_IGNORED) + } + delete(watch.names, name) + } + if watch.mask != 0 { + if watch.mask&provisional == 0 { + w.sendEvent(watch.path, watch.mask&sys_FS_IGNORED) + } + watch.mask = 0 + } +} + +// Must run within the I/O thread. +func (w *Watcher) startRead(watch *watch) error { + if e := syscall.CancelIo(watch.ino.handle); e != nil { + w.Errors <- os.NewSyscallError("CancelIo", e) + w.deleteWatch(watch) + } + mask := toWindowsFlags(watch.mask) + for _, m := range watch.names { + mask |= toWindowsFlags(m) + } + if mask == 0 { + if e := syscall.CloseHandle(watch.ino.handle); e != nil { + w.Errors <- os.NewSyscallError("CloseHandle", e) + } + w.mu.Lock() + delete(w.watches[watch.ino.volume], watch.ino.index) + w.mu.Unlock() + return nil + } + e := syscall.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0], + uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0) + if e != nil { + err := os.NewSyscallError("ReadDirectoryChanges", e) + if e == syscall.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 { + // Watched directory was probably removed + if w.sendEvent(watch.path, watch.mask&sys_FS_DELETE_SELF) { + if watch.mask&sys_FS_ONESHOT != 0 { + watch.mask = 0 + } + } + err = nil + } + w.deleteWatch(watch) + w.startRead(watch) + return err + } + return nil +} + +// readEvents reads from the I/O completion port, converts the +// received events into Event objects and sends them via the Events channel. +// Entry point to the I/O thread. +func (w *Watcher) readEvents() { + var ( + n, key uint32 + ov *syscall.Overlapped + ) + runtime.LockOSThread() + + for { + e := syscall.GetQueuedCompletionStatus(w.port, &n, &key, &ov, syscall.INFINITE) + watch := (*watch)(unsafe.Pointer(ov)) + + if watch == nil { + select { + case ch := <-w.quit: + w.mu.Lock() + var indexes []indexMap + for _, index := range w.watches { + indexes = append(indexes, index) + } + w.mu.Unlock() + for _, index := range indexes { + for _, watch := range index { + w.deleteWatch(watch) + w.startRead(watch) + } + } + var err error + if e := syscall.CloseHandle(w.port); e != nil { + err = os.NewSyscallError("CloseHandle", e) + } + close(w.Events) + close(w.Errors) + ch <- err + return + case in := <-w.input: + switch in.op { + case opAddWatch: + in.reply <- w.addWatch(in.path, uint64(in.flags)) + case opRemoveWatch: + in.reply <- w.remWatch(in.path) + } + default: + } + continue + } + + switch e { + case syscall.ERROR_MORE_DATA: + if watch == nil { + w.Errors <- errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer") + } else { + // The i/o succeeded but the buffer is full. + // In theory we should be building up a full packet. + // In practice we can get away with just carrying on. + n = uint32(unsafe.Sizeof(watch.buf)) + } + case syscall.ERROR_ACCESS_DENIED: + // Watched directory was probably removed + w.sendEvent(watch.path, watch.mask&sys_FS_DELETE_SELF) + w.deleteWatch(watch) + w.startRead(watch) + continue + case syscall.ERROR_OPERATION_ABORTED: + // CancelIo was called on this handle + continue + default: + w.Errors <- os.NewSyscallError("GetQueuedCompletionPort", e) + continue + case nil: + } + + var offset uint32 + for { + if n == 0 { + w.Events <- newEvent("", sys_FS_Q_OVERFLOW) + w.Errors <- errors.New("short read in readEvents()") + break + } + + // Point "raw" to the event in the buffer + raw := (*syscall.FileNotifyInformation)(unsafe.Pointer(&watch.buf[offset])) + buf := (*[syscall.MAX_PATH]uint16)(unsafe.Pointer(&raw.FileName)) + name := syscall.UTF16ToString(buf[:raw.FileNameLength/2]) + fullname := watch.path + "\\" + name + + var mask uint64 + switch raw.Action { + case syscall.FILE_ACTION_REMOVED: + mask = sys_FS_DELETE_SELF + case syscall.FILE_ACTION_MODIFIED: + mask = sys_FS_MODIFY + case syscall.FILE_ACTION_RENAMED_OLD_NAME: + watch.rename = name + case syscall.FILE_ACTION_RENAMED_NEW_NAME: + if watch.names[watch.rename] != 0 { + watch.names[name] |= watch.names[watch.rename] + delete(watch.names, watch.rename) + mask = sys_FS_MOVE_SELF + } + } + + sendNameEvent := func() { + if w.sendEvent(fullname, watch.names[name]&mask) { + if watch.names[name]&sys_FS_ONESHOT != 0 { + delete(watch.names, name) + } + } + } + if raw.Action != syscall.FILE_ACTION_RENAMED_NEW_NAME { + sendNameEvent() + } + if raw.Action == syscall.FILE_ACTION_REMOVED { + w.sendEvent(fullname, watch.names[name]&sys_FS_IGNORED) + delete(watch.names, name) + } + if w.sendEvent(fullname, watch.mask&toFSnotifyFlags(raw.Action)) { + if watch.mask&sys_FS_ONESHOT != 0 { + watch.mask = 0 + } + } + if raw.Action == syscall.FILE_ACTION_RENAMED_NEW_NAME { + fullname = watch.path + "\\" + watch.rename + sendNameEvent() + } + + // Move to the next event in the buffer + if raw.NextEntryOffset == 0 { + break + } + offset += raw.NextEntryOffset + + // Error! + if offset >= n { + w.Errors <- errors.New("Windows system assumed buffer larger than it is, events have likely been missed.") + break + } + } + + if err := w.startRead(watch); err != nil { + w.Errors <- err + } + } +} + +func (w *Watcher) sendEvent(name string, mask uint64) bool { + if mask == 0 { + return false + } + event := newEvent(name, uint32(mask)) + select { + case ch := <-w.quit: + w.quit <- ch + case w.Events <- event: + } + return true +} + +func toWindowsFlags(mask uint64) uint32 { + var m uint32 + if mask&sys_FS_ACCESS != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_LAST_ACCESS + } + if mask&sys_FS_MODIFY != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_LAST_WRITE + } + if mask&sys_FS_ATTRIB != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_ATTRIBUTES + } + if mask&(sys_FS_MOVE|sys_FS_CREATE|sys_FS_DELETE) != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_FILE_NAME | syscall.FILE_NOTIFY_CHANGE_DIR_NAME + } + return m +} + +func toFSnotifyFlags(action uint32) uint64 { + switch action { + case syscall.FILE_ACTION_ADDED: + return sys_FS_CREATE + case syscall.FILE_ACTION_REMOVED: + return sys_FS_DELETE + case syscall.FILE_ACTION_MODIFIED: + return sys_FS_MODIFY + case syscall.FILE_ACTION_RENAMED_OLD_NAME: + return sys_FS_MOVED_FROM + case syscall.FILE_ACTION_RENAMED_NEW_NAME: + return sys_FS_MOVED_TO + } + return 0 +}