Implement plugins for logging drivers

Logging plugins use the same HTTP interface as other plugins for basic
command operations meanwhile actual logging operations are handled (on
Unix) via a fifo.

The plugin interface looks like so:

```go
type loggingPlugin interface {
  StartLogging(fifoPath string, loggingContext Context) error
  StopLogging(fifoPath)
```

This means a plugin must implement `LoggingDriver.StartLogging` and
`LoggingDriver.StopLogging` endpoints and be able to consume the passed
in fifo.

Logs are sent via stream encoder to the fifo encoded with protobuf.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2016-11-14 13:53:53 -05:00
Родитель fc6d8fb94d
Коммит 27bd6842f8
22 изменённых файлов: 1842 добавлений и 4 удалений

Просмотреть файл

@ -0,0 +1,449 @@
// Code generated by protoc-gen-gogo.
// source: entry.proto
// DO NOT EDIT!
/*
Package logdriver is a generated protocol buffer package.
It is generated from these files:
entry.proto
It has these top-level messages:
LogEntry
*/
package logdriver
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import io "io"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type LogEntry struct {
Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"`
TimeNano int64 `protobuf:"varint,2,opt,name=time_nano,json=timeNano,proto3" json:"time_nano,omitempty"`
Line []byte `protobuf:"bytes,3,opt,name=line,proto3" json:"line,omitempty"`
Partial bool `protobuf:"varint,4,opt,name=partial,proto3" json:"partial,omitempty"`
}
func (m *LogEntry) Reset() { *m = LogEntry{} }
func (m *LogEntry) String() string { return proto.CompactTextString(m) }
func (*LogEntry) ProtoMessage() {}
func (*LogEntry) Descriptor() ([]byte, []int) { return fileDescriptorEntry, []int{0} }
func (m *LogEntry) GetSource() string {
if m != nil {
return m.Source
}
return ""
}
func (m *LogEntry) GetTimeNano() int64 {
if m != nil {
return m.TimeNano
}
return 0
}
func (m *LogEntry) GetLine() []byte {
if m != nil {
return m.Line
}
return nil
}
func (m *LogEntry) GetPartial() bool {
if m != nil {
return m.Partial
}
return false
}
func init() {
proto.RegisterType((*LogEntry)(nil), "LogEntry")
}
func (m *LogEntry) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LogEntry) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Source) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintEntry(dAtA, i, uint64(len(m.Source)))
i += copy(dAtA[i:], m.Source)
}
if m.TimeNano != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintEntry(dAtA, i, uint64(m.TimeNano))
}
if len(m.Line) > 0 {
dAtA[i] = 0x1a
i++
i = encodeVarintEntry(dAtA, i, uint64(len(m.Line)))
i += copy(dAtA[i:], m.Line)
}
if m.Partial {
dAtA[i] = 0x20
i++
if m.Partial {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
return i, nil
}
func encodeFixed64Entry(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Entry(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintEntry(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *LogEntry) Size() (n int) {
var l int
_ = l
l = len(m.Source)
if l > 0 {
n += 1 + l + sovEntry(uint64(l))
}
if m.TimeNano != 0 {
n += 1 + sovEntry(uint64(m.TimeNano))
}
l = len(m.Line)
if l > 0 {
n += 1 + l + sovEntry(uint64(l))
}
if m.Partial {
n += 2
}
return n
}
func sovEntry(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozEntry(x uint64) (n int) {
return sovEntry(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *LogEntry) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LogEntry: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LogEntry: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Source", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEntry
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Source = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field TimeNano", wireType)
}
m.TimeNano = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.TimeNano |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthEntry
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Line = append(m.Line[:0], dAtA[iNdEx:postIndex]...)
if m.Line == nil {
m.Line = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Partial", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEntry
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Partial = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipEntry(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEntry
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipEntry(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEntry
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEntry
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEntry
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthEntry
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEntry
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipEntry(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthEntry = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowEntry = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("entry.proto", fileDescriptorEntry) }
var fileDescriptorEntry = []byte{
// 149 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0xca, 0xe5, 0xe2, 0xf0, 0xc9, 0x4f, 0x77, 0x05,
0x89, 0x08, 0x89, 0x71, 0xb1, 0x15, 0xe7, 0x97, 0x16, 0x25, 0xa7, 0x4a, 0x30, 0x2a, 0x30, 0x6a,
0x70, 0x06, 0x41, 0x79, 0x42, 0xd2, 0x5c, 0x9c, 0x25, 0x99, 0xb9, 0xa9, 0xf1, 0x79, 0x89, 0x79,
0xf9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x1c, 0x20, 0x01, 0xbf, 0xc4, 0xbc, 0x7c, 0x21,
0x21, 0x2e, 0x96, 0x9c, 0xcc, 0xbc, 0x54, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x30, 0x5b,
0x48, 0x82, 0x8b, 0xbd, 0x20, 0xb1, 0xa8, 0x24, 0x33, 0x31, 0x47, 0x82, 0x45, 0x81, 0x51, 0x83,
0x23, 0x08, 0xc6, 0x75, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f,
0xe4, 0x18, 0x93, 0xd8, 0xc0, 0x6e, 0x30, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x2d, 0x24, 0x5a,
0xd4, 0x92, 0x00, 0x00, 0x00,
}

Просмотреть файл

@ -0,0 +1,8 @@
syntax = "proto3";
message LogEntry {
string source = 1;
int64 time_nano = 2;
bytes line = 3;
bool partial = 4;
}

Просмотреть файл

@ -0,0 +1,3 @@
//go:generate protoc --gogofast_out=import_path=github.com/docker/docker/api/types/plugins/logdriver:. entry.proto
package logdriver

Просмотреть файл

@ -0,0 +1,87 @@
package logdriver
import (
"encoding/binary"
"io"
)
const binaryEncodeLen = 4
// LogEntryEncoder encodes a LogEntry to a protobuf stream
// The stream should look like:
//
// [uint32 binary encoded message size][protobuf message]
//
// To decode an entry, read the first 4 bytes to get the size of the entry,
// then read `size` bytes from the stream.
type LogEntryEncoder interface {
Encode(*LogEntry) error
}
// NewLogEntryEncoder creates a protobuf stream encoder for log entries.
// This is used to write out log entries to a stream.
func NewLogEntryEncoder(w io.Writer) LogEntryEncoder {
return &logEntryEncoder{
w: w,
buf: make([]byte, 1024),
}
}
type logEntryEncoder struct {
buf []byte
w io.Writer
}
func (e *logEntryEncoder) Encode(l *LogEntry) error {
n := l.Size()
total := n + binaryEncodeLen
if total > len(e.buf) {
e.buf = make([]byte, total)
}
binary.BigEndian.PutUint32(e.buf, uint32(n))
if _, err := l.MarshalTo(e.buf[binaryEncodeLen:]); err != nil {
return err
}
_, err := e.w.Write(e.buf[:total])
return err
}
// LogEntryDecoder decodes log entries from a stream
// It is expected that the wire format is as defined by LogEntryEncoder.
type LogEntryDecoder interface {
Decode(*LogEntry) error
}
// NewLogEntryDecoder creates a new stream decoder for log entries
func NewLogEntryDecoder(r io.Reader) LogEntryDecoder {
return &logEntryDecoder{
lenBuf: make([]byte, binaryEncodeLen),
buf: make([]byte, 1024),
r: r,
}
}
type logEntryDecoder struct {
r io.Reader
lenBuf []byte
buf []byte
}
func (d *logEntryDecoder) Decode(l *LogEntry) error {
_, err := io.ReadFull(d.r, d.lenBuf)
if err != nil {
return err
}
size := int(binary.BigEndian.Uint32(d.lenBuf))
if len(d.buf) < size {
d.buf = make([]byte, size)
}
if _, err := io.ReadFull(d.r, d.buf[:size]); err != nil {
return err
}
return l.Unmarshal(d.buf[:size])
}

Просмотреть файл

@ -27,6 +27,7 @@ import (
"github.com/docker/docker/daemon/discovery"
"github.com/docker/docker/daemon/events"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger"
// register graph drivers
_ "github.com/docker/docker/daemon/graphdriver/register"
"github.com/docker/docker/daemon/initlayer"
@ -589,6 +590,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
d.RegistryService = registryService
d.PluginStore = pluginStore
logger.RegisterPluginGetter(d.PluginStore)
// Plugin system initialization should happen before restore. Do not change order.
d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{

135
daemon/logger/adapter.go Normal file
Просмотреть файл

@ -0,0 +1,135 @@
package logger
import (
"io"
"os"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/pkg/plugingetter"
"github.com/pkg/errors"
)
// pluginAdapter takes a plugin and implements the Logger interface for logger
// instances
type pluginAdapter struct {
driverName string
id string
plugin logPlugin
fifoPath string
capabilities Capability
logInfo Info
// synchronize access to the log stream and shared buffer
mu sync.Mutex
enc logdriver.LogEntryEncoder
stream io.WriteCloser
// buf is shared for each `Log()` call to reduce allocations.
// buf must be protected by mutex
buf logdriver.LogEntry
}
func (a *pluginAdapter) Log(msg *Message) error {
a.mu.Lock()
a.buf.Line = msg.Line
a.buf.TimeNano = msg.Timestamp.UnixNano()
a.buf.Partial = msg.Partial
a.buf.Source = msg.Source
err := a.enc.Encode(&a.buf)
a.buf.Reset()
a.mu.Unlock()
PutMessage(msg)
return err
}
func (a *pluginAdapter) Name() string {
return a.driverName
}
func (a *pluginAdapter) Close() error {
a.mu.Lock()
defer a.mu.Unlock()
if err := a.plugin.StopLogging(a.fifoPath); err != nil {
return err
}
if err := a.stream.Close(); err != nil {
logrus.WithError(err).Error("error closing plugin fifo")
}
if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
logrus.WithError(err).Error("error cleaning up plugin fifo")
}
// may be nil, especially for unit tests
if pluginGetter != nil {
pluginGetter.Get(a.Name(), extName, plugingetter.Release)
}
return nil
}
type pluginAdapterWithRead struct {
*pluginAdapter
}
func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
watcher := NewLogWatcher()
go func() {
defer close(watcher.Msg)
stream, err := a.plugin.ReadLogs(a.logInfo, config)
if err != nil {
watcher.Err <- errors.Wrap(err, "error getting log reader")
return
}
defer stream.Close()
dec := logdriver.NewLogEntryDecoder(stream)
for {
select {
case <-watcher.WatchClose():
return
default:
}
var buf logdriver.LogEntry
if err := dec.Decode(&buf); err != nil {
if err == io.EOF {
return
}
select {
case watcher.Err <- errors.Wrap(err, "error decoding log message"):
case <-watcher.WatchClose():
}
return
}
msg := &Message{
Timestamp: time.Unix(0, buf.TimeNano),
Line: buf.Line,
Source: buf.Source,
}
// plugin should handle this, but check just in case
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
continue
}
select {
case watcher.Msg <- msg:
case <-watcher.WatchClose():
// make sure the message we consumed is sent
watcher.Msg <- msg
return
}
}
}()
return watcher
}

Просмотреть файл

@ -0,0 +1,208 @@
package logger
import (
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"os"
"runtime"
"testing"
"time"
"github.com/docker/docker/api/types/plugins/logdriver"
protoio "github.com/gogo/protobuf/io"
)
// mockLoggingPlugin implements the loggingPlugin interface for testing purposes
// it only supports a single log stream
type mockLoggingPlugin struct {
inStream io.ReadCloser
f *os.File
closed chan struct{}
t *testing.T
}
func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
go func() {
io.Copy(l.f, l.inStream)
close(l.closed)
}()
return nil
}
func (l *mockLoggingPlugin) StopLogging(file string) error {
l.inStream.Close()
l.f.Close()
os.Remove(l.f.Name())
return nil
}
func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {
return Capability{ReadLogs: true}, nil
}
func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
r, w := io.Pipe()
f, err := os.Open(l.f.Name())
if err != nil {
return nil, err
}
go func() {
defer f.Close()
dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
enc := logdriver.NewLogEntryEncoder(w)
for {
select {
case <-l.closed:
w.Close()
return
default:
}
var msg logdriver.LogEntry
if err := dec.ReadMsg(&msg); err != nil {
if err == io.EOF {
if !config.Follow {
w.Close()
return
}
dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
continue
}
l.t.Fatal(err)
continue
}
if err := enc.Encode(&msg); err != nil {
w.CloseWithError(err)
return
}
}
}()
return r, nil
}
func newMockPluginAdapter(t *testing.T) Logger {
r, w := io.Pipe()
f, err := ioutil.TempFile("", "mock-plugin-adapter")
if err != nil {
t.Fatal(err)
}
enc := logdriver.NewLogEntryEncoder(w)
a := &pluginAdapterWithRead{
&pluginAdapter{
plugin: &mockLoggingPlugin{
inStream: r,
f: f,
closed: make(chan struct{}),
t: t,
},
stream: w,
enc: enc,
},
}
a.plugin.StartLogging("", Info{})
return a
}
func TestAdapterReadLogs(t *testing.T) {
l := newMockPluginAdapter(t)
testMsg := []Message{
{Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
{Line: []byte("Follow the white rabbit"), Timestamp: time.Now()},
}
for _, msg := range testMsg {
m := msg.copy()
if err := l.Log(m); err != nil {
t.Fatal(err)
}
}
lr, ok := l.(LogReader)
if !ok {
t.Fatal("expected log reader")
}
lw := lr.ReadLogs(ReadConfig{})
for _, x := range testMsg {
select {
case msg := <-lw.Msg:
testMessageEqual(t, &x, msg)
case <-time.After(10 * time.Millisecond):
t.Fatal("timeout reading logs")
}
}
select {
case _, ok := <-lw.Msg:
if ok {
t.Fatal("expected message channel to be closed")
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for message channel to close")
}
lw.Close()
lw = lr.ReadLogs(ReadConfig{Follow: true})
for _, x := range testMsg {
select {
case msg := <-lw.Msg:
testMessageEqual(t, &x, msg)
case <-time.After(10 * time.Second):
t.Fatal("timeout reading logs")
}
}
x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()}
if err := l.Log(x.copy()); err != nil {
t.Fatal(err)
}
select {
case msg, ok := <-lw.Msg:
if !ok {
t.Fatal("message channel unexpectedly closed")
}
testMessageEqual(t, &x, msg)
case <-time.After(10 * time.Second):
t.Fatal("timeout reading logs")
}
l.Close()
select {
case msg, ok := <-lw.Msg:
if ok {
t.Fatal("expected message channel to be closed")
}
if msg != nil {
t.Fatal("expected nil message")
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for logger to close")
}
}
func testMessageEqual(t *testing.T, a, b *Message) {
_, _, n, _ := runtime.Caller(1)
errFmt := "line %d: expected same messages:\nwant: %+v\nhave: %+v"
if !bytes.Equal(a.Line, b.Line) {
t.Fatalf(errFmt, n, *a, *b)
}
if a.Timestamp.UnixNano() != b.Timestamp.UnixNano() {
t.Fatalf(errFmt, n, *a, *b)
}
if a.Source != b.Source {
t.Fatalf(errFmt, n, *a, *b)
}
}

Просмотреть файл

@ -5,6 +5,7 @@ import (
"sync"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/plugingetter"
units "github.com/docker/go-units"
"github.com/pkg/errors"
)
@ -37,6 +38,13 @@ func (lf *logdriverFactory) driverRegistered(name string) bool {
lf.m.Lock()
_, ok := lf.registry[name]
lf.m.Unlock()
if !ok {
if pluginGetter != nil { // this can be nil when the init functions are running
if l, _ := getPlugin(name, plugingetter.Lookup); l != nil {
return true
}
}
}
return ok
}
@ -56,10 +64,12 @@ func (lf *logdriverFactory) get(name string) (Creator, error) {
defer lf.m.Unlock()
c, ok := lf.registry[name]
if !ok {
return c, fmt.Errorf("logger: no log driver named '%s' is registered", name)
if ok {
return c, nil
}
return c, nil
c, err := getPlugin(name, plugingetter.Acquire)
return c, errors.Wrapf(err, "logger: no log driver named '%s' is registered", name)
}
func (lf *logdriverFactory) getLogOptValidator(name string) LogOptValidator {

Просмотреть файл

@ -126,3 +126,11 @@ func (w *LogWatcher) Close() {
func (w *LogWatcher) WatchClose() <-chan struct{} {
return w.closeNotifier
}
// Capability defines the list of capabilties that a driver can implement
// These capabilities are not required to be a logging driver, however do
// determine how a logging driver can be used
type Capability struct {
// Determines if a log driver can read back logs
ReadLogs bool
}

Просмотреть файл

@ -0,0 +1,19 @@
package logger
func (m *Message) copy() *Message {
msg := &Message{
Source: m.Source,
Partial: m.Partial,
Timestamp: m.Timestamp,
}
if m.Attrs != nil {
msg.Attrs = make(map[string]string, len(m.Attrs))
for k, v := range m.Attrs {
msg.Attrs[k] = v
}
}
msg.Line = append(make([]byte, 0, len(m.Line)), m.Line...)
return msg
}

89
daemon/logger/plugin.go Normal file
Просмотреть файл

@ -0,0 +1,89 @@
package logger
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/docker/docker/api/types/plugins/logdriver"
getter "github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/pkg/stringid"
"github.com/pkg/errors"
)
var pluginGetter getter.PluginGetter
const extName = "LogDriver"
// logPlugin defines the available functions that logging plugins must implement.
type logPlugin interface {
StartLogging(streamPath string, info Info) (err error)
StopLogging(streamPath string) (err error)
Capabilities() (cap Capability, err error)
ReadLogs(info Info, config ReadConfig) (stream io.ReadCloser, err error)
}
// RegisterPluginGetter sets the plugingetter
func RegisterPluginGetter(plugingetter getter.PluginGetter) {
pluginGetter = plugingetter
}
// GetDriver returns a logging driver by its name.
// If the driver is empty, it looks for the local driver.
func getPlugin(name string, mode int) (Creator, error) {
p, err := pluginGetter.Get(name, extName, mode)
if err != nil {
return nil, fmt.Errorf("error looking up logging plugin %s: %v", name, err)
}
d := &logPluginProxy{p.Client()}
return makePluginCreator(name, d, p.BasePath()), nil
}
func makePluginCreator(name string, l *logPluginProxy, basePath string) Creator {
return func(logCtx Info) (logger Logger, err error) {
defer func() {
if err != nil {
pluginGetter.Get(name, extName, getter.Release)
}
}()
root := filepath.Join(basePath, "run", "docker", "logging")
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
id := stringid.GenerateNonCryptoID()
a := &pluginAdapter{
driverName: name,
id: id,
plugin: l,
fifoPath: filepath.Join(root, id),
logInfo: logCtx,
}
cap, err := a.plugin.Capabilities()
if err == nil {
a.capabilities = cap
}
stream, err := openPluginStream(a)
if err != nil {
return nil, err
}
a.stream = stream
a.enc = logdriver.NewLogEntryEncoder(a.stream)
if err := l.StartLogging(strings.TrimPrefix(a.fifoPath, basePath), logCtx); err != nil {
return nil, errors.Wrapf(err, "error creating logger")
}
if cap.ReadLogs {
return &pluginAdapterWithRead{a}, nil
}
return a, nil
}
}

Просмотреть файл

@ -0,0 +1,20 @@
// +build linux solaris freebsd
package logger
import (
"context"
"io"
"github.com/pkg/errors"
"github.com/tonistiigi/fifo"
"golang.org/x/sys/unix"
)
func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) {
f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_WRONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700)
if err != nil {
return nil, errors.Wrapf(err, "error creating i/o pipe for log plugin: %s", a.Name())
}
return f, nil
}

Просмотреть файл

@ -0,0 +1,12 @@
// +build !linux,!solaris,!freebsd
package logger
import (
"errors"
"io"
)
func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) {
return nil, errors.New("log plugin not supported")
}

107
daemon/logger/proxy.go Normal file
Просмотреть файл

@ -0,0 +1,107 @@
package logger
import (
"errors"
"io"
)
type client interface {
Call(string, interface{}, interface{}) error
Stream(string, interface{}) (io.ReadCloser, error)
}
type logPluginProxy struct {
client
}
type logPluginProxyStartLoggingRequest struct {
File string
Info Info
}
type logPluginProxyStartLoggingResponse struct {
Err string
}
func (pp *logPluginProxy) StartLogging(file string, info Info) (err error) {
var (
req logPluginProxyStartLoggingRequest
ret logPluginProxyStartLoggingResponse
)
req.File = file
req.Info = info
if err = pp.Call("LogDriver.StartLogging", req, &ret); err != nil {
return
}
if ret.Err != "" {
err = errors.New(ret.Err)
}
return
}
type logPluginProxyStopLoggingRequest struct {
File string
}
type logPluginProxyStopLoggingResponse struct {
Err string
}
func (pp *logPluginProxy) StopLogging(file string) (err error) {
var (
req logPluginProxyStopLoggingRequest
ret logPluginProxyStopLoggingResponse
)
req.File = file
if err = pp.Call("LogDriver.StopLogging", req, &ret); err != nil {
return
}
if ret.Err != "" {
err = errors.New(ret.Err)
}
return
}
type logPluginProxyCapabilitiesResponse struct {
Cap Capability
Err string
}
func (pp *logPluginProxy) Capabilities() (cap Capability, err error) {
var (
ret logPluginProxyCapabilitiesResponse
)
if err = pp.Call("LogDriver.Capabilities", nil, &ret); err != nil {
return
}
cap = ret.Cap
if ret.Err != "" {
err = errors.New(ret.Err)
}
return
}
type logPluginProxyReadLogsRequest struct {
Info Info
Config ReadConfig
}
func (pp *logPluginProxy) ReadLogs(info Info, config ReadConfig) (stream io.ReadCloser, err error) {
var (
req logPluginProxyReadLogsRequest
)
req.Info = info
req.Config = config
return pp.Stream("LogDriver.ReadLogs", req)
}

Просмотреть файл

@ -59,6 +59,8 @@ Config provides the base accessible fields for working with V0 plugin format
- **docker.authz/1.0**
- **docker.logdriver/1.0**
- **`socket`** *string*
socket is the name of the socket the engine should use to communicate with the plugins.

Просмотреть файл

@ -0,0 +1,220 @@
---
title: "Docker log driver plugins"
description: "Log driver plugins."
keywords: "Examples, Usage, plugins, docker, documentation, user guide, logging"
---
<!-- This file is maintained within the docker/docker Github
repository at https://github.com/docker/docker/. Make all
pull requests against that repo. If you see this file in
another repository, consider it read-only there, as it will
periodically be overwritten by the definitive file. Pull
requests which include edits to this file in other repositories
will be rejected.
-->
# Logging driver plugins
This document describes logging driver plugins for Docker.
Logging drivers enables users to forward container logs to another service for
processing. Docker includes several logging drivers as built-ins, however can
never hope to support all use-cases with built-in drivers. Plugins allow Docker
to support a wide range of logging services without requiring to embed client
libraries for these services in the main Docker codebase. See the
[plugin documentation](legacy_plugins.md) for more information.
## Create a logging plugin
The main interface for logging plugins uses the same JSON+HTTP RPC protocol used
by other plugin types. See the
[example](https://github.com/cpuguy83/docker-log-driver-test) plugin for a
reference implementation of a logging plugin. The example wraps the built-in
`jsonfilelog` log driver.
## LogDriver protocol
Logging plugins must register as a `LogDriver` during plugin activation. Once
activated users can specify the plugin as a log driver.
There are two HTTP endpoints that logging plugins must implement:
### `/LogDriver.StartLogging`
Signals to the plugin that a container is starting that the plugin should start
receiving logs for.
Logs will be streamed over the defined file in the request. On Linux this file
is a FIFO. Logging plugins are not currently supported on Windows.
**Request**:
```json
{
"File": "/path/to/file/stream",
"Info": {
"ContainerID": "123456"
}
}
```
`File` is the path to the log stream that needs to be consumed. Each call to
`StartLogging` should provide a different file path, even if it's a container
that the plugin has already received logs for prior. The file is created by
docker with a randomly generated name.
`Info` is details about the container that's being logged. This is fairly
free-form, but is defined by the following struct definition:
```go
type Info struct {
Config map[string]string
ContainerID string
ContainerName string
ContainerEntrypoint string
ContainerArgs []string
ContainerImageID string
ContainerImageName string
ContainerCreated time.Time
ContainerEnv []string
ContainerLabels map[string]string
LogPath string
DaemonName string
}
```
`ContainerID` will always be supplied with this struct, but other fields may be
empty or missing.
**Response**
```json
{
"Err": ""
}
```
If an error occurred during this request, add an error message to the `Err` field
in the response. If no error then you can either send an empty response (`{}`)
or an empty value for the `Err` field.
The driver should at this point be consuming log messages from the passed in file.
If messages are unconsumed, it may cause the contaier to block while trying to
write to its stdio streams.
Log stream messages are encoded as protocol buffers. The protobuf definitions are
in the
[docker repository](https://github.com/docker/docker/blob/master/api/types/plugins/logdriver/entry.proto).
Since protocol buffers are not self-delimited you must decode them from the stream
using the following stream format:
```
[size][message]
```
Where `size` is a 4-byte big endian binary encoded uint32. `size` in this case
defines the size of the next message. `message` is the actual log entry.
A reference golang implementation of a stream encoder/decoder can be found
[here](https://github.com/docker/docker/blob/master/api/types/plugins/logdriver/io.go)
### `/LogDriver.StopLogging`
Signals to the plugin to stop collecting logs from the defined file.
Once a response is received, the file will be removed by Docker. You must make
sure to collect all logs on the stream before responding to this request or risk
losing log data.
Requests on this endpoint does not mean that the container has been removed
only that it has stopped.
**Request**:
```json
{
"File": "/path/to/file/stream"
}
```
**Response**:
```json
{
"Err": ""
}
```
If an error occurred during this request, add an error message to the `Err` field
in the response. If no error then you can either send an empty response (`{}`)
or an empty value for the `Err` field.
## Optional endpoints
Logging plugins can implement two extra logging endpoints:
### `/LogDriver.Capabilities`
Defines the capabilities of the log driver. You must implement this endpoint for
Docker to be able to take advantage of any of the defined capabilities.
**Request**:
```json
{}
```
**Response**:
```json
{
"ReadLogs": true
}
```
Supported capabilities:
- `ReadLogs` - this tells Docker that the plugin is capable of reading back logs
to clients. Plugins that report that they support `ReadLogs` must implement the
`/LogDriver.ReadLogs` endpoint
### `/LogDriver.ReadLogs`
Reads back logs to the client. This is used when `docker logs <container>` is
called.
In order for Docker to use this endpoint, the plugin must specify as much when
`/LogDriver.Capabilities` is called.
**Request**:
```json
{
"ReadConfig": {},
"Info": {
"ContainerID": "123456"
}
}
```
`ReadConfig` is the list of options for reading, it is defined with the following
golang struct:
```go
type ReadConfig struct {
Since time.Time
Tail int
Follow bool
}
```
- `Since` defines the oldest log that should be sent.
- `Tail` defines the number of lines to read (e.g. like the command `tail -n 10`)
- `Follow` signals that the client wants to stay attached to receive new log messages
as they come in once the existing logs have been read.
`Info` is the same type defined in `/LogDriver.StartLogging`. It should be used
to determine what set of logs to read.
**Response**:
```
{{ log stream }}
```
The response should be the encoded log message using the same format as the
messages that the plugin consumed from Docker.

Просмотреть файл

@ -4,7 +4,7 @@ export SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
source "${SCRIPTDIR}/.validate"
IFS=$'\n'
files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^cli/compose/schema/bindata.go' || true) )
files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^cli/compose/schema/bindata.go' | grep -v '^api/types/plugins/logdriver/entry.pb.go' || true) )
unset IFS
errors=()

Просмотреть файл

@ -0,0 +1,27 @@
package main
import (
"strings"
"github.com/docker/docker/integration-cli/checker"
"github.com/go-check/check"
)
func (s *DockerSuite) TestPluginLogDriver(c *check.C) {
testRequires(c, IsAmd64, DaemonIsLinux)
pluginName := "cpuguy83/docker-logdriver-test:latest"
dockerCmd(c, "plugin", "install", pluginName)
dockerCmd(c, "run", "--log-driver", pluginName, "--name=test", "busybox", "echo", "hello")
out, _ := dockerCmd(c, "logs", "test")
c.Assert(strings.TrimSpace(out), checker.Equals, "hello")
dockerCmd(c, "start", "-a", "test")
out, _ = dockerCmd(c, "logs", "test")
c.Assert(strings.TrimSpace(out), checker.Equals, "hello\nhello")
dockerCmd(c, "rm", "test")
dockerCmd(c, "plugin", "disable", pluginName)
dockerCmd(c, "plugin", "rm", pluginName)
}

102
vendor/github.com/gogo/protobuf/io/full.go сгенерированный поставляемый Normal file
Просмотреть файл

@ -0,0 +1,102 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"github.com/gogo/protobuf/proto"
"io"
)
func NewFullWriter(w io.Writer) WriteCloser {
return &fullWriter{w, nil}
}
type fullWriter struct {
w io.Writer
buffer []byte
}
func (this *fullWriter) WriteMsg(msg proto.Message) (err error) {
var data []byte
if m, ok := msg.(marshaler); ok {
n, ok := getSize(m)
if !ok {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
if n >= len(this.buffer) {
this.buffer = make([]byte, n)
}
_, err = m.MarshalTo(this.buffer)
if err != nil {
return err
}
data = this.buffer[:n]
} else {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
_, err = this.w.Write(data)
return err
}
func (this *fullWriter) Close() error {
if closer, ok := this.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
type fullReader struct {
r io.Reader
buf []byte
}
func NewFullReader(r io.Reader, maxSize int) ReadCloser {
return &fullReader{r, make([]byte, maxSize)}
}
func (this *fullReader) ReadMsg(msg proto.Message) error {
length, err := this.r.Read(this.buf)
if err != nil {
return err
}
return proto.Unmarshal(this.buf[:length], msg)
}
func (this *fullReader) Close() error {
if closer, ok := this.r.(io.Closer); ok {
return closer.Close()
}
return nil
}

70
vendor/github.com/gogo/protobuf/io/io.go сгенерированный поставляемый Normal file
Просмотреть файл

@ -0,0 +1,70 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"github.com/gogo/protobuf/proto"
"io"
)
type Writer interface {
WriteMsg(proto.Message) error
}
type WriteCloser interface {
Writer
io.Closer
}
type Reader interface {
ReadMsg(msg proto.Message) error
}
type ReadCloser interface {
Reader
io.Closer
}
type marshaler interface {
MarshalTo(data []byte) (n int, err error)
}
func getSize(v interface{}) (int, bool) {
if sz, ok := v.(interface {
Size() (n int)
}); ok {
return sz.Size(), true
} else if sz, ok := v.(interface {
ProtoSize() (n int)
}); ok {
return sz.ProtoSize(), true
} else {
return 0, false
}
}

126
vendor/github.com/gogo/protobuf/io/uint32.go сгенерированный поставляемый Normal file
Просмотреть файл

@ -0,0 +1,126 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"encoding/binary"
"github.com/gogo/protobuf/proto"
"io"
)
func NewUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder) WriteCloser {
return &uint32Writer{w, byteOrder, nil}
}
func NewSizeUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder, size int) WriteCloser {
return &uint32Writer{w, byteOrder, make([]byte, size)}
}
type uint32Writer struct {
w io.Writer
byteOrder binary.ByteOrder
buffer []byte
}
func (this *uint32Writer) WriteMsg(msg proto.Message) (err error) {
var data []byte
if m, ok := msg.(marshaler); ok {
n, ok := getSize(m)
if !ok {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
if n >= len(this.buffer) {
this.buffer = make([]byte, n)
}
_, err = m.MarshalTo(this.buffer)
if err != nil {
return err
}
data = this.buffer[:n]
} else {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
length := uint32(len(data))
if err = binary.Write(this.w, this.byteOrder, &length); err != nil {
return err
}
_, err = this.w.Write(data)
return err
}
func (this *uint32Writer) Close() error {
if closer, ok := this.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
type uint32Reader struct {
r io.Reader
byteOrder binary.ByteOrder
lenBuf []byte
buf []byte
maxSize int
}
func NewUint32DelimitedReader(r io.Reader, byteOrder binary.ByteOrder, maxSize int) ReadCloser {
return &uint32Reader{r, byteOrder, make([]byte, 4), nil, maxSize}
}
func (this *uint32Reader) ReadMsg(msg proto.Message) error {
if _, err := io.ReadFull(this.r, this.lenBuf); err != nil {
return err
}
length32 := this.byteOrder.Uint32(this.lenBuf)
length := int(length32)
if length < 0 || length > this.maxSize {
return io.ErrShortBuffer
}
if length >= len(this.buf) {
this.buf = make([]byte, length)
}
_, err := io.ReadFull(this.r, this.buf[:length])
if err != nil {
return err
}
return proto.Unmarshal(this.buf[:length], msg)
}
func (this *uint32Reader) Close() error {
if closer, ok := this.r.(io.Closer); ok {
return closer.Close()
}
return nil
}

134
vendor/github.com/gogo/protobuf/io/varint.go сгенерированный поставляемый Normal file
Просмотреть файл

@ -0,0 +1,134 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"bufio"
"encoding/binary"
"errors"
"github.com/gogo/protobuf/proto"
"io"
)
var (
errSmallBuffer = errors.New("Buffer Too Small")
errLargeValue = errors.New("Value is Larger than 64 bits")
)
func NewDelimitedWriter(w io.Writer) WriteCloser {
return &varintWriter{w, make([]byte, 10), nil}
}
type varintWriter struct {
w io.Writer
lenBuf []byte
buffer []byte
}
func (this *varintWriter) WriteMsg(msg proto.Message) (err error) {
var data []byte
if m, ok := msg.(marshaler); ok {
n, ok := getSize(m)
if !ok {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
if n >= len(this.buffer) {
this.buffer = make([]byte, n)
}
_, err = m.MarshalTo(this.buffer)
if err != nil {
return err
}
data = this.buffer[:n]
} else {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
length := uint64(len(data))
n := binary.PutUvarint(this.lenBuf, length)
_, err = this.w.Write(this.lenBuf[:n])
if err != nil {
return err
}
_, err = this.w.Write(data)
return err
}
func (this *varintWriter) Close() error {
if closer, ok := this.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
var closer io.Closer
if c, ok := r.(io.Closer); ok {
closer = c
}
return &varintReader{bufio.NewReader(r), nil, maxSize, closer}
}
type varintReader struct {
r *bufio.Reader
buf []byte
maxSize int
closer io.Closer
}
func (this *varintReader) ReadMsg(msg proto.Message) error {
length64, err := binary.ReadUvarint(this.r)
if err != nil {
return err
}
length := int(length64)
if length < 0 || length > this.maxSize {
return io.ErrShortBuffer
}
if len(this.buf) < length {
this.buf = make([]byte, length)
}
buf := this.buf[:length]
if _, err := io.ReadFull(this.r, buf); err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func (this *varintReader) Close() error {
if this.closer != nil {
return this.closer.Close()
}
return nil
}