diff --git a/api/types/plugins/logdriver/entry.pb.go b/api/types/plugins/logdriver/entry.pb.go new file mode 100644 index 0000000000..5d7d8b4c41 --- /dev/null +++ b/api/types/plugins/logdriver/entry.pb.go @@ -0,0 +1,449 @@ +// Code generated by protoc-gen-gogo. +// source: entry.proto +// DO NOT EDIT! + +/* + Package logdriver is a generated protocol buffer package. + + It is generated from these files: + entry.proto + + It has these top-level messages: + LogEntry +*/ +package logdriver + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type LogEntry struct { + Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` + TimeNano int64 `protobuf:"varint,2,opt,name=time_nano,json=timeNano,proto3" json:"time_nano,omitempty"` + Line []byte `protobuf:"bytes,3,opt,name=line,proto3" json:"line,omitempty"` + Partial bool `protobuf:"varint,4,opt,name=partial,proto3" json:"partial,omitempty"` +} + +func (m *LogEntry) Reset() { *m = LogEntry{} } +func (m *LogEntry) String() string { return proto.CompactTextString(m) } +func (*LogEntry) ProtoMessage() {} +func (*LogEntry) Descriptor() ([]byte, []int) { return fileDescriptorEntry, []int{0} } + +func (m *LogEntry) GetSource() string { + if m != nil { + return m.Source + } + return "" +} + +func (m *LogEntry) GetTimeNano() int64 { + if m != nil { + return m.TimeNano + } + return 0 +} + +func (m *LogEntry) GetLine() []byte { + if m != nil { + return m.Line + } + return nil +} + +func (m *LogEntry) GetPartial() bool { + if m != nil { + return m.Partial + } + return false +} + +func init() { + proto.RegisterType((*LogEntry)(nil), "LogEntry") +} +func (m *LogEntry) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LogEntry) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Source) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintEntry(dAtA, i, uint64(len(m.Source))) + i += copy(dAtA[i:], m.Source) + } + if m.TimeNano != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintEntry(dAtA, i, uint64(m.TimeNano)) + } + if len(m.Line) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintEntry(dAtA, i, uint64(len(m.Line))) + i += copy(dAtA[i:], m.Line) + } + if m.Partial { + dAtA[i] = 0x20 + i++ + if m.Partial { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + return i, nil +} + +func encodeFixed64Entry(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Entry(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintEntry(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *LogEntry) Size() (n int) { + var l int + _ = l + l = len(m.Source) + if l > 0 { + n += 1 + l + sovEntry(uint64(l)) + } + if m.TimeNano != 0 { + n += 1 + sovEntry(uint64(m.TimeNano)) + } + l = len(m.Line) + if l > 0 { + n += 1 + l + sovEntry(uint64(l)) + } + if m.Partial { + n += 2 + } + return n +} + +func sovEntry(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozEntry(x uint64) (n int) { + return sovEntry(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *LogEntry) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEntry + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LogEntry: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LogEntry: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Source", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEntry + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEntry + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Source = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimeNano", wireType) + } + m.TimeNano = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEntry + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimeNano |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEntry + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthEntry + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Line = append(m.Line[:0], dAtA[iNdEx:postIndex]...) + if m.Line == nil { + m.Line = []byte{} + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Partial", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEntry + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Partial = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipEntry(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEntry + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipEntry(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEntry + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEntry + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEntry + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthEntry + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEntry + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipEntry(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthEntry = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowEntry = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("entry.proto", fileDescriptorEntry) } + +var fileDescriptorEntry = []byte{ + // 149 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29, + 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0xca, 0xe5, 0xe2, 0xf0, 0xc9, 0x4f, 0x77, 0x05, + 0x89, 0x08, 0x89, 0x71, 0xb1, 0x15, 0xe7, 0x97, 0x16, 0x25, 0xa7, 0x4a, 0x30, 0x2a, 0x30, 0x6a, + 0x70, 0x06, 0x41, 0x79, 0x42, 0xd2, 0x5c, 0x9c, 0x25, 0x99, 0xb9, 0xa9, 0xf1, 0x79, 0x89, 0x79, + 0xf9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x1c, 0x20, 0x01, 0xbf, 0xc4, 0xbc, 0x7c, 0x21, + 0x21, 0x2e, 0x96, 0x9c, 0xcc, 0xbc, 0x54, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x30, 0x5b, + 0x48, 0x82, 0x8b, 0xbd, 0x20, 0xb1, 0xa8, 0x24, 0x33, 0x31, 0x47, 0x82, 0x45, 0x81, 0x51, 0x83, + 0x23, 0x08, 0xc6, 0x75, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, + 0xe4, 0x18, 0x93, 0xd8, 0xc0, 0x6e, 0x30, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x2d, 0x24, 0x5a, + 0xd4, 0x92, 0x00, 0x00, 0x00, +} diff --git a/api/types/plugins/logdriver/entry.proto b/api/types/plugins/logdriver/entry.proto new file mode 100644 index 0000000000..a4e96ea5f4 --- /dev/null +++ b/api/types/plugins/logdriver/entry.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +message LogEntry { + string source = 1; + int64 time_nano = 2; + bytes line = 3; + bool partial = 4; +} diff --git a/api/types/plugins/logdriver/gen.go b/api/types/plugins/logdriver/gen.go new file mode 100644 index 0000000000..068f987324 --- /dev/null +++ b/api/types/plugins/logdriver/gen.go @@ -0,0 +1,3 @@ +//go:generate protoc --gogofast_out=import_path=github.com/docker/docker/api/types/plugins/logdriver:. entry.proto + +package logdriver diff --git a/api/types/plugins/logdriver/io.go b/api/types/plugins/logdriver/io.go new file mode 100644 index 0000000000..8c1ed49dd6 --- /dev/null +++ b/api/types/plugins/logdriver/io.go @@ -0,0 +1,87 @@ +package logdriver + +import ( + "encoding/binary" + "io" +) + +const binaryEncodeLen = 4 + +// LogEntryEncoder encodes a LogEntry to a protobuf stream +// The stream should look like: +// +// [uint32 binary encoded message size][protobuf message] +// +// To decode an entry, read the first 4 bytes to get the size of the entry, +// then read `size` bytes from the stream. +type LogEntryEncoder interface { + Encode(*LogEntry) error +} + +// NewLogEntryEncoder creates a protobuf stream encoder for log entries. +// This is used to write out log entries to a stream. +func NewLogEntryEncoder(w io.Writer) LogEntryEncoder { + return &logEntryEncoder{ + w: w, + buf: make([]byte, 1024), + } +} + +type logEntryEncoder struct { + buf []byte + w io.Writer +} + +func (e *logEntryEncoder) Encode(l *LogEntry) error { + n := l.Size() + + total := n + binaryEncodeLen + if total > len(e.buf) { + e.buf = make([]byte, total) + } + binary.BigEndian.PutUint32(e.buf, uint32(n)) + + if _, err := l.MarshalTo(e.buf[binaryEncodeLen:]); err != nil { + return err + } + _, err := e.w.Write(e.buf[:total]) + return err +} + +// LogEntryDecoder decodes log entries from a stream +// It is expected that the wire format is as defined by LogEntryEncoder. +type LogEntryDecoder interface { + Decode(*LogEntry) error +} + +// NewLogEntryDecoder creates a new stream decoder for log entries +func NewLogEntryDecoder(r io.Reader) LogEntryDecoder { + return &logEntryDecoder{ + lenBuf: make([]byte, binaryEncodeLen), + buf: make([]byte, 1024), + r: r, + } +} + +type logEntryDecoder struct { + r io.Reader + lenBuf []byte + buf []byte +} + +func (d *logEntryDecoder) Decode(l *LogEntry) error { + _, err := io.ReadFull(d.r, d.lenBuf) + if err != nil { + return err + } + + size := int(binary.BigEndian.Uint32(d.lenBuf)) + if len(d.buf) < size { + d.buf = make([]byte, size) + } + + if _, err := io.ReadFull(d.r, d.buf[:size]); err != nil { + return err + } + return l.Unmarshal(d.buf[:size]) +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 3e2ed115c8..37946f4738 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -27,6 +27,7 @@ import ( "github.com/docker/docker/daemon/discovery" "github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/exec" + "github.com/docker/docker/daemon/logger" // register graph drivers _ "github.com/docker/docker/daemon/graphdriver/register" "github.com/docker/docker/daemon/initlayer" @@ -589,6 +590,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe d.RegistryService = registryService d.PluginStore = pluginStore + logger.RegisterPluginGetter(d.PluginStore) // Plugin system initialization should happen before restore. Do not change order. d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{ diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go new file mode 100644 index 0000000000..e6d7598b44 --- /dev/null +++ b/daemon/logger/adapter.go @@ -0,0 +1,135 @@ +package logger + +import ( + "io" + "os" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/api/types/plugins/logdriver" + "github.com/docker/docker/pkg/plugingetter" + "github.com/pkg/errors" +) + +// pluginAdapter takes a plugin and implements the Logger interface for logger +// instances +type pluginAdapter struct { + driverName string + id string + plugin logPlugin + fifoPath string + capabilities Capability + logInfo Info + + // synchronize access to the log stream and shared buffer + mu sync.Mutex + enc logdriver.LogEntryEncoder + stream io.WriteCloser + // buf is shared for each `Log()` call to reduce allocations. + // buf must be protected by mutex + buf logdriver.LogEntry +} + +func (a *pluginAdapter) Log(msg *Message) error { + a.mu.Lock() + + a.buf.Line = msg.Line + a.buf.TimeNano = msg.Timestamp.UnixNano() + a.buf.Partial = msg.Partial + a.buf.Source = msg.Source + + err := a.enc.Encode(&a.buf) + a.buf.Reset() + + a.mu.Unlock() + + PutMessage(msg) + return err +} + +func (a *pluginAdapter) Name() string { + return a.driverName +} + +func (a *pluginAdapter) Close() error { + a.mu.Lock() + defer a.mu.Unlock() + + if err := a.plugin.StopLogging(a.fifoPath); err != nil { + return err + } + + if err := a.stream.Close(); err != nil { + logrus.WithError(err).Error("error closing plugin fifo") + } + if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) { + logrus.WithError(err).Error("error cleaning up plugin fifo") + } + + // may be nil, especially for unit tests + if pluginGetter != nil { + pluginGetter.Get(a.Name(), extName, plugingetter.Release) + } + return nil +} + +type pluginAdapterWithRead struct { + *pluginAdapter +} + +func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { + watcher := NewLogWatcher() + + go func() { + defer close(watcher.Msg) + stream, err := a.plugin.ReadLogs(a.logInfo, config) + if err != nil { + watcher.Err <- errors.Wrap(err, "error getting log reader") + return + } + defer stream.Close() + + dec := logdriver.NewLogEntryDecoder(stream) + for { + select { + case <-watcher.WatchClose(): + return + default: + } + + var buf logdriver.LogEntry + if err := dec.Decode(&buf); err != nil { + if err == io.EOF { + return + } + select { + case watcher.Err <- errors.Wrap(err, "error decoding log message"): + case <-watcher.WatchClose(): + } + return + } + + msg := &Message{ + Timestamp: time.Unix(0, buf.TimeNano), + Line: buf.Line, + Source: buf.Source, + } + + // plugin should handle this, but check just in case + if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { + continue + } + + select { + case watcher.Msg <- msg: + case <-watcher.WatchClose(): + // make sure the message we consumed is sent + watcher.Msg <- msg + return + } + } + }() + + return watcher +} diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go new file mode 100644 index 0000000000..707550e7e3 --- /dev/null +++ b/daemon/logger/adapter_test.go @@ -0,0 +1,208 @@ +package logger + +import ( + "bytes" + "encoding/binary" + "io" + "io/ioutil" + "os" + "runtime" + "testing" + "time" + + "github.com/docker/docker/api/types/plugins/logdriver" + protoio "github.com/gogo/protobuf/io" +) + +// mockLoggingPlugin implements the loggingPlugin interface for testing purposes +// it only supports a single log stream +type mockLoggingPlugin struct { + inStream io.ReadCloser + f *os.File + closed chan struct{} + t *testing.T +} + +func (l *mockLoggingPlugin) StartLogging(file string, info Info) error { + go func() { + io.Copy(l.f, l.inStream) + close(l.closed) + }() + return nil +} + +func (l *mockLoggingPlugin) StopLogging(file string) error { + l.inStream.Close() + l.f.Close() + os.Remove(l.f.Name()) + return nil +} + +func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) { + return Capability{ReadLogs: true}, nil +} + +func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) { + r, w := io.Pipe() + f, err := os.Open(l.f.Name()) + if err != nil { + return nil, err + } + go func() { + defer f.Close() + dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) + enc := logdriver.NewLogEntryEncoder(w) + + for { + select { + case <-l.closed: + w.Close() + return + default: + } + + var msg logdriver.LogEntry + if err := dec.ReadMsg(&msg); err != nil { + if err == io.EOF { + if !config.Follow { + w.Close() + return + } + dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) + continue + } + + l.t.Fatal(err) + continue + } + + if err := enc.Encode(&msg); err != nil { + w.CloseWithError(err) + return + } + } + }() + + return r, nil +} + +func newMockPluginAdapter(t *testing.T) Logger { + r, w := io.Pipe() + f, err := ioutil.TempFile("", "mock-plugin-adapter") + if err != nil { + t.Fatal(err) + } + enc := logdriver.NewLogEntryEncoder(w) + a := &pluginAdapterWithRead{ + &pluginAdapter{ + plugin: &mockLoggingPlugin{ + inStream: r, + f: f, + closed: make(chan struct{}), + t: t, + }, + stream: w, + enc: enc, + }, + } + a.plugin.StartLogging("", Info{}) + return a +} + +func TestAdapterReadLogs(t *testing.T) { + l := newMockPluginAdapter(t) + + testMsg := []Message{ + {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()}, + {Line: []byte("Follow the white rabbit"), Timestamp: time.Now()}, + } + for _, msg := range testMsg { + m := msg.copy() + if err := l.Log(m); err != nil { + t.Fatal(err) + } + } + + lr, ok := l.(LogReader) + if !ok { + t.Fatal("expected log reader") + } + + lw := lr.ReadLogs(ReadConfig{}) + + for _, x := range testMsg { + select { + case msg := <-lw.Msg: + testMessageEqual(t, &x, msg) + case <-time.After(10 * time.Millisecond): + t.Fatal("timeout reading logs") + } + } + + select { + case _, ok := <-lw.Msg: + if ok { + t.Fatal("expected message channel to be closed") + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for message channel to close") + + } + lw.Close() + + lw = lr.ReadLogs(ReadConfig{Follow: true}) + for _, x := range testMsg { + select { + case msg := <-lw.Msg: + testMessageEqual(t, &x, msg) + case <-time.After(10 * time.Second): + t.Fatal("timeout reading logs") + } + } + + x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()} + + if err := l.Log(x.copy()); err != nil { + t.Fatal(err) + } + + select { + case msg, ok := <-lw.Msg: + if !ok { + t.Fatal("message channel unexpectedly closed") + } + testMessageEqual(t, &x, msg) + case <-time.After(10 * time.Second): + t.Fatal("timeout reading logs") + } + + l.Close() + select { + case msg, ok := <-lw.Msg: + if ok { + t.Fatal("expected message channel to be closed") + } + if msg != nil { + t.Fatal("expected nil message") + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for logger to close") + } +} + +func testMessageEqual(t *testing.T, a, b *Message) { + _, _, n, _ := runtime.Caller(1) + errFmt := "line %d: expected same messages:\nwant: %+v\nhave: %+v" + + if !bytes.Equal(a.Line, b.Line) { + t.Fatalf(errFmt, n, *a, *b) + } + + if a.Timestamp.UnixNano() != b.Timestamp.UnixNano() { + t.Fatalf(errFmt, n, *a, *b) + } + + if a.Source != b.Source { + t.Fatalf(errFmt, n, *a, *b) + } +} diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index 192d3e0df2..32d51effa8 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -5,6 +5,7 @@ import ( "sync" containertypes "github.com/docker/docker/api/types/container" + "github.com/docker/docker/pkg/plugingetter" units "github.com/docker/go-units" "github.com/pkg/errors" ) @@ -37,6 +38,13 @@ func (lf *logdriverFactory) driverRegistered(name string) bool { lf.m.Lock() _, ok := lf.registry[name] lf.m.Unlock() + if !ok { + if pluginGetter != nil { // this can be nil when the init functions are running + if l, _ := getPlugin(name, plugingetter.Lookup); l != nil { + return true + } + } + } return ok } @@ -56,10 +64,12 @@ func (lf *logdriverFactory) get(name string) (Creator, error) { defer lf.m.Unlock() c, ok := lf.registry[name] - if !ok { - return c, fmt.Errorf("logger: no log driver named '%s' is registered", name) + if ok { + return c, nil } - return c, nil + + c, err := getPlugin(name, plugingetter.Acquire) + return c, errors.Wrapf(err, "logger: no log driver named '%s' is registered", name) } func (lf *logdriverFactory) getLogOptValidator(name string) LogOptValidator { diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 1135195dc2..daa5403de2 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -126,3 +126,11 @@ func (w *LogWatcher) Close() { func (w *LogWatcher) WatchClose() <-chan struct{} { return w.closeNotifier } + +// Capability defines the list of capabilties that a driver can implement +// These capabilities are not required to be a logging driver, however do +// determine how a logging driver can be used +type Capability struct { + // Determines if a log driver can read back logs + ReadLogs bool +} diff --git a/daemon/logger/logger_test.go b/daemon/logger/logger_test.go new file mode 100644 index 0000000000..4d6e079308 --- /dev/null +++ b/daemon/logger/logger_test.go @@ -0,0 +1,19 @@ +package logger + +func (m *Message) copy() *Message { + msg := &Message{ + Source: m.Source, + Partial: m.Partial, + Timestamp: m.Timestamp, + } + + if m.Attrs != nil { + msg.Attrs = make(map[string]string, len(m.Attrs)) + for k, v := range m.Attrs { + msg.Attrs[k] = v + } + } + + msg.Line = append(make([]byte, 0, len(m.Line)), m.Line...) + return msg +} diff --git a/daemon/logger/plugin.go b/daemon/logger/plugin.go new file mode 100644 index 0000000000..de618c5aea --- /dev/null +++ b/daemon/logger/plugin.go @@ -0,0 +1,89 @@ +package logger + +import ( + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/docker/docker/api/types/plugins/logdriver" + getter "github.com/docker/docker/pkg/plugingetter" + "github.com/docker/docker/pkg/stringid" + "github.com/pkg/errors" +) + +var pluginGetter getter.PluginGetter + +const extName = "LogDriver" + +// logPlugin defines the available functions that logging plugins must implement. +type logPlugin interface { + StartLogging(streamPath string, info Info) (err error) + StopLogging(streamPath string) (err error) + Capabilities() (cap Capability, err error) + ReadLogs(info Info, config ReadConfig) (stream io.ReadCloser, err error) +} + +// RegisterPluginGetter sets the plugingetter +func RegisterPluginGetter(plugingetter getter.PluginGetter) { + pluginGetter = plugingetter +} + +// GetDriver returns a logging driver by its name. +// If the driver is empty, it looks for the local driver. +func getPlugin(name string, mode int) (Creator, error) { + p, err := pluginGetter.Get(name, extName, mode) + if err != nil { + return nil, fmt.Errorf("error looking up logging plugin %s: %v", name, err) + } + + d := &logPluginProxy{p.Client()} + return makePluginCreator(name, d, p.BasePath()), nil +} + +func makePluginCreator(name string, l *logPluginProxy, basePath string) Creator { + return func(logCtx Info) (logger Logger, err error) { + defer func() { + if err != nil { + pluginGetter.Get(name, extName, getter.Release) + } + }() + root := filepath.Join(basePath, "run", "docker", "logging") + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } + + id := stringid.GenerateNonCryptoID() + a := &pluginAdapter{ + driverName: name, + id: id, + plugin: l, + fifoPath: filepath.Join(root, id), + logInfo: logCtx, + } + + cap, err := a.plugin.Capabilities() + if err == nil { + a.capabilities = cap + } + + stream, err := openPluginStream(a) + if err != nil { + return nil, err + } + + a.stream = stream + a.enc = logdriver.NewLogEntryEncoder(a.stream) + + if err := l.StartLogging(strings.TrimPrefix(a.fifoPath, basePath), logCtx); err != nil { + return nil, errors.Wrapf(err, "error creating logger") + } + + if cap.ReadLogs { + return &pluginAdapterWithRead{a}, nil + } + + return a, nil + } +} diff --git a/daemon/logger/plugin_unix.go b/daemon/logger/plugin_unix.go new file mode 100644 index 0000000000..f254c9c57d --- /dev/null +++ b/daemon/logger/plugin_unix.go @@ -0,0 +1,20 @@ +// +build linux solaris freebsd + +package logger + +import ( + "context" + "io" + + "github.com/pkg/errors" + "github.com/tonistiigi/fifo" + "golang.org/x/sys/unix" +) + +func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) { + f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_WRONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700) + if err != nil { + return nil, errors.Wrapf(err, "error creating i/o pipe for log plugin: %s", a.Name()) + } + return f, nil +} diff --git a/daemon/logger/plugin_unsupported.go b/daemon/logger/plugin_unsupported.go new file mode 100644 index 0000000000..0a2036c838 --- /dev/null +++ b/daemon/logger/plugin_unsupported.go @@ -0,0 +1,12 @@ +// +build !linux,!solaris,!freebsd + +package logger + +import ( + "errors" + "io" +) + +func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) { + return nil, errors.New("log plugin not supported") +} diff --git a/daemon/logger/proxy.go b/daemon/logger/proxy.go new file mode 100644 index 0000000000..53860eba69 --- /dev/null +++ b/daemon/logger/proxy.go @@ -0,0 +1,107 @@ +package logger + +import ( + "errors" + "io" +) + +type client interface { + Call(string, interface{}, interface{}) error + Stream(string, interface{}) (io.ReadCloser, error) +} + +type logPluginProxy struct { + client +} + +type logPluginProxyStartLoggingRequest struct { + File string + Info Info +} + +type logPluginProxyStartLoggingResponse struct { + Err string +} + +func (pp *logPluginProxy) StartLogging(file string, info Info) (err error) { + var ( + req logPluginProxyStartLoggingRequest + ret logPluginProxyStartLoggingResponse + ) + + req.File = file + req.Info = info + if err = pp.Call("LogDriver.StartLogging", req, &ret); err != nil { + return + } + + if ret.Err != "" { + err = errors.New(ret.Err) + } + + return +} + +type logPluginProxyStopLoggingRequest struct { + File string +} + +type logPluginProxyStopLoggingResponse struct { + Err string +} + +func (pp *logPluginProxy) StopLogging(file string) (err error) { + var ( + req logPluginProxyStopLoggingRequest + ret logPluginProxyStopLoggingResponse + ) + + req.File = file + if err = pp.Call("LogDriver.StopLogging", req, &ret); err != nil { + return + } + + if ret.Err != "" { + err = errors.New(ret.Err) + } + + return +} + +type logPluginProxyCapabilitiesResponse struct { + Cap Capability + Err string +} + +func (pp *logPluginProxy) Capabilities() (cap Capability, err error) { + var ( + ret logPluginProxyCapabilitiesResponse + ) + + if err = pp.Call("LogDriver.Capabilities", nil, &ret); err != nil { + return + } + + cap = ret.Cap + + if ret.Err != "" { + err = errors.New(ret.Err) + } + + return +} + +type logPluginProxyReadLogsRequest struct { + Info Info + Config ReadConfig +} + +func (pp *logPluginProxy) ReadLogs(info Info, config ReadConfig) (stream io.ReadCloser, err error) { + var ( + req logPluginProxyReadLogsRequest + ) + + req.Info = info + req.Config = config + return pp.Stream("LogDriver.ReadLogs", req) +} diff --git a/docs/extend/config.md b/docs/extend/config.md index ad43e898c7..4feb6bf315 100644 --- a/docs/extend/config.md +++ b/docs/extend/config.md @@ -59,6 +59,8 @@ Config provides the base accessible fields for working with V0 plugin format - **docker.authz/1.0** + - **docker.logdriver/1.0** + - **`socket`** *string* socket is the name of the socket the engine should use to communicate with the plugins. diff --git a/docs/extend/plugins_logging.md b/docs/extend/plugins_logging.md new file mode 100644 index 0000000000..fd02a6a6f7 --- /dev/null +++ b/docs/extend/plugins_logging.md @@ -0,0 +1,220 @@ +--- +title: "Docker log driver plugins" +description: "Log driver plugins." +keywords: "Examples, Usage, plugins, docker, documentation, user guide, logging" +--- + + + +# Logging driver plugins + +This document describes logging driver plugins for Docker. + +Logging drivers enables users to forward container logs to another service for +processing. Docker includes several logging drivers as built-ins, however can +never hope to support all use-cases with built-in drivers. Plugins allow Docker +to support a wide range of logging services without requiring to embed client +libraries for these services in the main Docker codebase. See the +[plugin documentation](legacy_plugins.md) for more information. + +## Create a logging plugin + +The main interface for logging plugins uses the same JSON+HTTP RPC protocol used +by other plugin types. See the +[example](https://github.com/cpuguy83/docker-log-driver-test) plugin for a +reference implementation of a logging plugin. The example wraps the built-in +`jsonfilelog` log driver. + +## LogDriver protocol + +Logging plugins must register as a `LogDriver` during plugin activation. Once +activated users can specify the plugin as a log driver. + +There are two HTTP endpoints that logging plugins must implement: + +### `/LogDriver.StartLogging` + +Signals to the plugin that a container is starting that the plugin should start +receiving logs for. + +Logs will be streamed over the defined file in the request. On Linux this file +is a FIFO. Logging plugins are not currently supported on Windows. + +**Request**: +```json +{ + "File": "/path/to/file/stream", + "Info": { + "ContainerID": "123456" + } +} +``` + +`File` is the path to the log stream that needs to be consumed. Each call to +`StartLogging` should provide a different file path, even if it's a container +that the plugin has already received logs for prior. The file is created by +docker with a randomly generated name. + +`Info` is details about the container that's being logged. This is fairly +free-form, but is defined by the following struct definition: + +```go +type Info struct { + Config map[string]string + ContainerID string + ContainerName string + ContainerEntrypoint string + ContainerArgs []string + ContainerImageID string + ContainerImageName string + ContainerCreated time.Time + ContainerEnv []string + ContainerLabels map[string]string + LogPath string + DaemonName string +} +``` + + +`ContainerID` will always be supplied with this struct, but other fields may be +empty or missing. + +**Response** +```json +{ + "Err": "" +} +``` + +If an error occurred during this request, add an error message to the `Err` field +in the response. If no error then you can either send an empty response (`{}`) +or an empty value for the `Err` field. + +The driver should at this point be consuming log messages from the passed in file. +If messages are unconsumed, it may cause the contaier to block while trying to +write to its stdio streams. + +Log stream messages are encoded as protocol buffers. The protobuf definitions are +in the +[docker repository](https://github.com/docker/docker/blob/master/api/types/plugins/logdriver/entry.proto). + +Since protocol buffers are not self-delimited you must decode them from the stream +using the following stream format: + +``` +[size][message] +``` + +Where `size` is a 4-byte big endian binary encoded uint32. `size` in this case +defines the size of the next message. `message` is the actual log entry. + +A reference golang implementation of a stream encoder/decoder can be found +[here](https://github.com/docker/docker/blob/master/api/types/plugins/logdriver/io.go) + +### `/LogDriver.StopLogging` + +Signals to the plugin to stop collecting logs from the defined file. +Once a response is received, the file will be removed by Docker. You must make +sure to collect all logs on the stream before responding to this request or risk +losing log data. + +Requests on this endpoint does not mean that the container has been removed +only that it has stopped. + +**Request**: +```json +{ + "File": "/path/to/file/stream" +} +``` + +**Response**: +```json +{ + "Err": "" +} +``` + +If an error occurred during this request, add an error message to the `Err` field +in the response. If no error then you can either send an empty response (`{}`) +or an empty value for the `Err` field. + +## Optional endpoints + +Logging plugins can implement two extra logging endpoints: + +### `/LogDriver.Capabilities` + +Defines the capabilities of the log driver. You must implement this endpoint for +Docker to be able to take advantage of any of the defined capabilities. + +**Request**: +```json +{} +``` + +**Response**: +```json +{ + "ReadLogs": true +} +``` + +Supported capabilities: + +- `ReadLogs` - this tells Docker that the plugin is capable of reading back logs +to clients. Plugins that report that they support `ReadLogs` must implement the +`/LogDriver.ReadLogs` endpoint + +### `/LogDriver.ReadLogs` + +Reads back logs to the client. This is used when `docker logs ` is +called. + +In order for Docker to use this endpoint, the plugin must specify as much when +`/LogDriver.Capabilities` is called. + + +**Request**: +```json +{ + "ReadConfig": {}, + "Info": { + "ContainerID": "123456" + } +} +``` + +`ReadConfig` is the list of options for reading, it is defined with the following +golang struct: + +```go +type ReadConfig struct { + Since time.Time + Tail int + Follow bool +} +``` + +- `Since` defines the oldest log that should be sent. +- `Tail` defines the number of lines to read (e.g. like the command `tail -n 10`) +- `Follow` signals that the client wants to stay attached to receive new log messages +as they come in once the existing logs have been read. + +`Info` is the same type defined in `/LogDriver.StartLogging`. It should be used +to determine what set of logs to read. + +**Response**: +``` +{{ log stream }} +``` + +The response should be the encoded log message using the same format as the +messages that the plugin consumed from Docker. diff --git a/hack/validate/lint b/hack/validate/lint index b4799e796b..d362f46243 100755 --- a/hack/validate/lint +++ b/hack/validate/lint @@ -4,7 +4,7 @@ export SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" source "${SCRIPTDIR}/.validate" IFS=$'\n' -files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^cli/compose/schema/bindata.go' || true) ) +files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^cli/compose/schema/bindata.go' | grep -v '^api/types/plugins/logdriver/entry.pb.go' || true) ) unset IFS errors=() diff --git a/integration-cli/docker_cli_plugins_logdriver_test.go b/integration-cli/docker_cli_plugins_logdriver_test.go new file mode 100644 index 0000000000..c5029e2527 --- /dev/null +++ b/integration-cli/docker_cli_plugins_logdriver_test.go @@ -0,0 +1,27 @@ +package main + +import ( + "strings" + + "github.com/docker/docker/integration-cli/checker" + "github.com/go-check/check" +) + +func (s *DockerSuite) TestPluginLogDriver(c *check.C) { + testRequires(c, IsAmd64, DaemonIsLinux) + + pluginName := "cpuguy83/docker-logdriver-test:latest" + + dockerCmd(c, "plugin", "install", pluginName) + dockerCmd(c, "run", "--log-driver", pluginName, "--name=test", "busybox", "echo", "hello") + out, _ := dockerCmd(c, "logs", "test") + c.Assert(strings.TrimSpace(out), checker.Equals, "hello") + + dockerCmd(c, "start", "-a", "test") + out, _ = dockerCmd(c, "logs", "test") + c.Assert(strings.TrimSpace(out), checker.Equals, "hello\nhello") + + dockerCmd(c, "rm", "test") + dockerCmd(c, "plugin", "disable", pluginName) + dockerCmd(c, "plugin", "rm", pluginName) +} diff --git a/vendor/github.com/gogo/protobuf/io/full.go b/vendor/github.com/gogo/protobuf/io/full.go new file mode 100644 index 0000000000..550726a32f --- /dev/null +++ b/vendor/github.com/gogo/protobuf/io/full.go @@ -0,0 +1,102 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package io + +import ( + "github.com/gogo/protobuf/proto" + "io" +) + +func NewFullWriter(w io.Writer) WriteCloser { + return &fullWriter{w, nil} +} + +type fullWriter struct { + w io.Writer + buffer []byte +} + +func (this *fullWriter) WriteMsg(msg proto.Message) (err error) { + var data []byte + if m, ok := msg.(marshaler); ok { + n, ok := getSize(m) + if !ok { + data, err = proto.Marshal(msg) + if err != nil { + return err + } + } + if n >= len(this.buffer) { + this.buffer = make([]byte, n) + } + _, err = m.MarshalTo(this.buffer) + if err != nil { + return err + } + data = this.buffer[:n] + } else { + data, err = proto.Marshal(msg) + if err != nil { + return err + } + } + _, err = this.w.Write(data) + return err +} + +func (this *fullWriter) Close() error { + if closer, ok := this.w.(io.Closer); ok { + return closer.Close() + } + return nil +} + +type fullReader struct { + r io.Reader + buf []byte +} + +func NewFullReader(r io.Reader, maxSize int) ReadCloser { + return &fullReader{r, make([]byte, maxSize)} +} + +func (this *fullReader) ReadMsg(msg proto.Message) error { + length, err := this.r.Read(this.buf) + if err != nil { + return err + } + return proto.Unmarshal(this.buf[:length], msg) +} + +func (this *fullReader) Close() error { + if closer, ok := this.r.(io.Closer); ok { + return closer.Close() + } + return nil +} diff --git a/vendor/github.com/gogo/protobuf/io/io.go b/vendor/github.com/gogo/protobuf/io/io.go new file mode 100644 index 0000000000..6dca519a18 --- /dev/null +++ b/vendor/github.com/gogo/protobuf/io/io.go @@ -0,0 +1,70 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package io + +import ( + "github.com/gogo/protobuf/proto" + "io" +) + +type Writer interface { + WriteMsg(proto.Message) error +} + +type WriteCloser interface { + Writer + io.Closer +} + +type Reader interface { + ReadMsg(msg proto.Message) error +} + +type ReadCloser interface { + Reader + io.Closer +} + +type marshaler interface { + MarshalTo(data []byte) (n int, err error) +} + +func getSize(v interface{}) (int, bool) { + if sz, ok := v.(interface { + Size() (n int) + }); ok { + return sz.Size(), true + } else if sz, ok := v.(interface { + ProtoSize() (n int) + }); ok { + return sz.ProtoSize(), true + } else { + return 0, false + } +} diff --git a/vendor/github.com/gogo/protobuf/io/uint32.go b/vendor/github.com/gogo/protobuf/io/uint32.go new file mode 100644 index 0000000000..c3dad1ae75 --- /dev/null +++ b/vendor/github.com/gogo/protobuf/io/uint32.go @@ -0,0 +1,126 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package io + +import ( + "encoding/binary" + "github.com/gogo/protobuf/proto" + "io" +) + +func NewUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder) WriteCloser { + return &uint32Writer{w, byteOrder, nil} +} + +func NewSizeUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder, size int) WriteCloser { + return &uint32Writer{w, byteOrder, make([]byte, size)} +} + +type uint32Writer struct { + w io.Writer + byteOrder binary.ByteOrder + buffer []byte +} + +func (this *uint32Writer) WriteMsg(msg proto.Message) (err error) { + var data []byte + if m, ok := msg.(marshaler); ok { + n, ok := getSize(m) + if !ok { + data, err = proto.Marshal(msg) + if err != nil { + return err + } + } + if n >= len(this.buffer) { + this.buffer = make([]byte, n) + } + _, err = m.MarshalTo(this.buffer) + if err != nil { + return err + } + data = this.buffer[:n] + } else { + data, err = proto.Marshal(msg) + if err != nil { + return err + } + } + length := uint32(len(data)) + if err = binary.Write(this.w, this.byteOrder, &length); err != nil { + return err + } + _, err = this.w.Write(data) + return err +} + +func (this *uint32Writer) Close() error { + if closer, ok := this.w.(io.Closer); ok { + return closer.Close() + } + return nil +} + +type uint32Reader struct { + r io.Reader + byteOrder binary.ByteOrder + lenBuf []byte + buf []byte + maxSize int +} + +func NewUint32DelimitedReader(r io.Reader, byteOrder binary.ByteOrder, maxSize int) ReadCloser { + return &uint32Reader{r, byteOrder, make([]byte, 4), nil, maxSize} +} + +func (this *uint32Reader) ReadMsg(msg proto.Message) error { + if _, err := io.ReadFull(this.r, this.lenBuf); err != nil { + return err + } + length32 := this.byteOrder.Uint32(this.lenBuf) + length := int(length32) + if length < 0 || length > this.maxSize { + return io.ErrShortBuffer + } + if length >= len(this.buf) { + this.buf = make([]byte, length) + } + _, err := io.ReadFull(this.r, this.buf[:length]) + if err != nil { + return err + } + return proto.Unmarshal(this.buf[:length], msg) +} + +func (this *uint32Reader) Close() error { + if closer, ok := this.r.(io.Closer); ok { + return closer.Close() + } + return nil +} diff --git a/vendor/github.com/gogo/protobuf/io/varint.go b/vendor/github.com/gogo/protobuf/io/varint.go new file mode 100644 index 0000000000..a72e14a583 --- /dev/null +++ b/vendor/github.com/gogo/protobuf/io/varint.go @@ -0,0 +1,134 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package io + +import ( + "bufio" + "encoding/binary" + "errors" + "github.com/gogo/protobuf/proto" + "io" +) + +var ( + errSmallBuffer = errors.New("Buffer Too Small") + errLargeValue = errors.New("Value is Larger than 64 bits") +) + +func NewDelimitedWriter(w io.Writer) WriteCloser { + return &varintWriter{w, make([]byte, 10), nil} +} + +type varintWriter struct { + w io.Writer + lenBuf []byte + buffer []byte +} + +func (this *varintWriter) WriteMsg(msg proto.Message) (err error) { + var data []byte + if m, ok := msg.(marshaler); ok { + n, ok := getSize(m) + if !ok { + data, err = proto.Marshal(msg) + if err != nil { + return err + } + } + if n >= len(this.buffer) { + this.buffer = make([]byte, n) + } + _, err = m.MarshalTo(this.buffer) + if err != nil { + return err + } + data = this.buffer[:n] + } else { + data, err = proto.Marshal(msg) + if err != nil { + return err + } + } + length := uint64(len(data)) + n := binary.PutUvarint(this.lenBuf, length) + _, err = this.w.Write(this.lenBuf[:n]) + if err != nil { + return err + } + _, err = this.w.Write(data) + return err +} + +func (this *varintWriter) Close() error { + if closer, ok := this.w.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser { + var closer io.Closer + if c, ok := r.(io.Closer); ok { + closer = c + } + return &varintReader{bufio.NewReader(r), nil, maxSize, closer} +} + +type varintReader struct { + r *bufio.Reader + buf []byte + maxSize int + closer io.Closer +} + +func (this *varintReader) ReadMsg(msg proto.Message) error { + length64, err := binary.ReadUvarint(this.r) + if err != nil { + return err + } + length := int(length64) + if length < 0 || length > this.maxSize { + return io.ErrShortBuffer + } + if len(this.buf) < length { + this.buf = make([]byte, length) + } + buf := this.buf[:length] + if _, err := io.ReadFull(this.r, buf); err != nil { + return err + } + return proto.Unmarshal(buf, msg) +} + +func (this *varintReader) Close() error { + if this.closer != nil { + return this.closer.Close() + } + return nil +}