go-events/broadcast.go

179 строки
4.1 KiB
Go
Исходник Обычный вид История

package events
import (
Don't include serialization of sink in log messages Some log messages add an events.sink field whose value is the actual sink object. Trying to log this can cause a data race, because some sinks contain a sync.Once field: WARNING: DATA RACE Read at 0x00c42004beb0 by goroutine 75: reflect.Value.Int() /usr/local/go/src/reflect/value.go:886 +0x192 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:703 +0x39f6 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:764 +0x2d45 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:764 +0x2d45 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:764 +0x2d45 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:835 +0x2448 fmt.(*pp).printArg() /usr/local/go/src/fmt/print.go:668 +0x1a3 fmt.(*pp).doPrint() /usr/local/go/src/fmt/print.go:1113 +0xe3 fmt.Fprint() /usr/local/go/src/fmt/print.go:215 +0x69 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*TextFormatter).appendKeyValue() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/text_formatter.go:157 +0x134 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*TextFormatter).Format() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/text_formatter.go:91 +0x6e6 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*Entry).Reader() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/entry.go:44 +0x73 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.Entry.log() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/entry.go:94 +0x1f6 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*Entry).Debug() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/entry.go:119 +0x106 github.com/docker/swarmkit/vendor/github.com/docker/go-events.(*Queue).run() 
/home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/docker/go-events/queue.go:85 +0x2eb Previous write at 0x00c42004beb0 by goroutine 67: sync/atomic.AddInt32() /usr/local/go/src/runtime/race_amd64.s:269 +0xb sync.(*Mutex).Unlock() /usr/local/go/src/sync/mutex.go:109 +0x54 sync.(*Once).Do() /usr/local/go/src/sync/once.go:46 +0xa6 github.com/docker/swarmkit/vendor/github.com/docker/go-events.(*Channel).Close() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/docker/go-events/channel.go:45 +0x79 github.com/docker/swarmkit/manager/state/watch.(*Queue).CallbackWatch.func1() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/manager/state/watch/watch.go:48 +0x9d github.com/docker/swarmkit/manager/state/watch.(*Queue).CallbackWatch.func2() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/manager/state/watch/watch.go:62 +0x126 github.com/docker/swarmkit/ca.(*Server).Run() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/ca/server.go:380 +0xb25 To avoid the data race, add String methods to objects that aren't safe to serialize directly. These serialize a copy that excludes the sync.Once. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2016-08-26 00:05:09 +03:00
"fmt"
"sync"
"github.com/Sirupsen/logrus"
)
// Broadcaster fans every event it receives out to a set of Sinks. It performs
// no buffering or retrying of its own; reliability can be provided by wrapping
// the sinks that are handed to it.
//
// The sink list is read and modified only by the run goroutine; the public
// methods talk to that goroutine over the channels below.
type Broadcaster struct {
	// sinks holds the destinations each event is written to.
	sinks []Sink

	// events carries events from Write to the run loop.
	events chan Event

	// adds and removes carry Add and Remove requests to the run loop.
	adds, removes chan configureRequest

	// shutdown is closed by Close to ask the run loop to stop; closed is
	// closed by the run loop when it exits.
	shutdown, closed chan struct{}

	// once makes sure shutdown is closed a single time even if Close is
	// called repeatedly.
	once sync.Once
}
// NewBroadcaster returns a Broadcaster that dispatches to the given sinks and
// starts its run loop in a new goroutine. The loop keeps running until Close
// is called.
//
// The broadcaster writes to each sink synchronously, so its behavior is
// shaped by the sinks it is given. Generally a sink should accept every
// message and deal with reliability on its own, for example by being wrapped
// in an EventQueue or RetryingSink.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
	b := new(Broadcaster)
	b.sinks = sinks
	b.events = make(chan Event)
	b.adds = make(chan configureRequest)
	b.removes = make(chan configureRequest)
	b.shutdown = make(chan struct{})
	b.closed = make(chan struct{})

	// From this point on the run goroutine is the only one touching b.sinks.
	go b.run()

	return b
}
// Write hands event to the run loop for dispatch to all sinks. The events
// channel is unbuffered, so the call blocks until the run loop picks the
// event up or the broadcaster has shut down; in the latter case ErrSinkClosed
// is returned. The caller cedes the event to the broadcaster and should not
// modify it after calling Write.
func (b *Broadcaster) Write(event Event) error {
	select {
	case <-b.closed:
		return ErrSinkClosed
	case b.events <- event:
		return nil
	}
}
// Add registers sink with the broadcaster. Adding a sink that is already
// registered leaves the sink list unchanged. ErrSinkClosed is returned if the
// broadcaster has been closed.
//
// Sinks are matched with ==, so the provided sink must be comparable with
// equality. Typically, this just works with a regular pointer type.
func (b *Broadcaster) Add(sink Sink) error {
	requests := b.adds
	return b.configure(requests, sink)
}
// Remove unregisters sink from the broadcaster. Removing a sink that is not
// registered is not an error. ErrSinkClosed is returned if the broadcaster has
// been closed.
func (b *Broadcaster) Remove(sink Sink) error {
	requests := b.removes
	return b.configure(requests, sink)
}
// configureRequest is the message Add and Remove send to the run loop to
// change the sink list.
type configureRequest struct {
	// sink is the sink to add or remove.
	sink Sink
	// response receives the outcome once the run loop has handled the
	// request.
	response chan error
}
// configure submits a request for sink on ch (either b.adds or b.removes) and
// waits for the run loop to answer it. It returns the run loop's answer, or
// ErrSinkClosed if the broadcaster shuts down before the request is sent or
// answered.
func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
	// Buffered so the run loop can reply without blocking on this goroutine.
	response := make(chan error, 1)
	request := configureRequest{sink: sink, response: response}

	// Step one: get the request to the run loop, unless it has already exited.
	select {
	case ch <- request:
	case <-b.closed:
		return ErrSinkClosed
	}

	// Step two: wait for the reply, again giving up if the loop exits first.
	select {
	case err := <-response:
		return err
	case <-b.closed:
		return ErrSinkClosed
	}
}
// Close tells the run loop to stop and waits until it has exited. While
// stopping, the run loop calls Close on every sink still registered. Close may
// be called more than once and always returns nil.
func (b *Broadcaster) Close() error {
	signalShutdown := func() {
		close(b.shutdown)
	}
	b.once.Do(signalShutdown)

	// closed is closed by the run loop on its way out.
	<-b.closed

	return nil
}
// run is the main broadcast loop, started when the broadcaster is created. It
// is the only goroutine that reads or modifies b.sinks. It serves four kinds
// of requests until shutdown:
//
//   - events from Write are written to every sink, in order; a sink whose
//     Write returns ErrSinkClosed is dropped from the list, and any other
//     error is logged and the event is dropped for that sink only;
//   - add requests append the sink unless it is already present;
//   - remove requests delete the first matching sink, if any;
//   - shutdown (triggered by Close) closes every remaining sink and returns.
//
// b.closed is closed when the loop exits, which unblocks Close and makes
// Write, Add and Remove return ErrSinkClosed.
func (b *Broadcaster) run() {
	defer close(b.closed)

	// remove deletes the first occurrence of target from b.sinks. It shifts
	// the backing array in place, so it must not be called while b.sinks is
	// being ranged over.
	remove := func(target Sink) {
		for i, sink := range b.sinks {
			if sink == target {
				b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
				break
			}
		}
	}

	for {
		select {
		case event := <-b.events:
			// Dispatch and prune closed sinks in a single pass. Removing
			// from b.sinks while ranging over it would shift later elements
			// under the iteration: the sink following a removed one would
			// miss this event and the last sink would receive it twice.
			// Filtering in place only ever writes to slots that have
			// already been visited, so every sink sees the event once.
			live := b.sinks[:0]
			for _, sink := range b.sinks {
				if err := sink.Write(event); err != nil {
					if err == ErrSinkClosed {
						// drop closed sinks
						continue
					}
					logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
						Errorf("broadcaster: dropping event")
				}
				live = append(live, sink)
			}
			b.sinks = live
		case request := <-b.adds:
			// while we have to iterate for add/remove, common iteration for
			// send is faster against slice.
			var found bool
			for _, sink := range b.sinks {
				if request.sink == sink {
					found = true
					break
				}
			}

			if !found {
				b.sinks = append(b.sinks, request.sink)
			}
			// response is buffered, so this never blocks.
			request.response <- nil
		case request := <-b.removes:
			remove(request.sink)
			request.response <- nil
		case <-b.shutdown:
			// close all the underlying sinks
			for _, sink := range b.sinks {
				if err := sink.Close(); err != nil && err != ErrSinkClosed {
					logrus.WithField("events.sink", sink).WithError(err).
						Errorf("broadcaster: closing sink failed")
				}
			}
			return
		}
	}
}
Don't include serialization of sink in log messages Some log messages add an events.sink field whose value is the actual sink object. Trying to log this can cause a data race, because some sinks contain a sync.Once field: WARNING: DATA RACE Read at 0x00c42004beb0 by goroutine 75: reflect.Value.Int() /usr/local/go/src/reflect/value.go:886 +0x192 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:703 +0x39f6 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:764 +0x2d45 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:764 +0x2d45 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:764 +0x2d45 fmt.(*pp).printValue() /usr/local/go/src/fmt/print.go:835 +0x2448 fmt.(*pp).printArg() /usr/local/go/src/fmt/print.go:668 +0x1a3 fmt.(*pp).doPrint() /usr/local/go/src/fmt/print.go:1113 +0xe3 fmt.Fprint() /usr/local/go/src/fmt/print.go:215 +0x69 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*TextFormatter).appendKeyValue() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/text_formatter.go:157 +0x134 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*TextFormatter).Format() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/text_formatter.go:91 +0x6e6 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*Entry).Reader() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/entry.go:44 +0x73 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.Entry.log() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/entry.go:94 +0x1f6 github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus.(*Entry).Debug() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/Sirupsen/logrus/entry.go:119 +0x106 github.com/docker/swarmkit/vendor/github.com/docker/go-events.(*Queue).run() 
/home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/docker/go-events/queue.go:85 +0x2eb Previous write at 0x00c42004beb0 by goroutine 67: sync/atomic.AddInt32() /usr/local/go/src/runtime/race_amd64.s:269 +0xb sync.(*Mutex).Unlock() /usr/local/go/src/sync/mutex.go:109 +0x54 sync.(*Once).Do() /usr/local/go/src/sync/once.go:46 +0xa6 github.com/docker/swarmkit/vendor/github.com/docker/go-events.(*Channel).Close() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/vendor/github.com/docker/go-events/channel.go:45 +0x79 github.com/docker/swarmkit/manager/state/watch.(*Queue).CallbackWatch.func1() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/manager/state/watch/watch.go:48 +0x9d github.com/docker/swarmkit/manager/state/watch.(*Queue).CallbackWatch.func2() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/manager/state/watch/watch.go:62 +0x126 github.com/docker/swarmkit/ca.(*Server).Run() /home/ubuntu/.go_workspace/src/github.com/docker/swarmkit/ca/server.go:380 +0xb25 To avoid the data race, add String methods to objects that aren't safe to serialize directly. These serialize a copy that excludes the sync.Once. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2016-08-26 00:05:09 +03:00
// String returns a printable description of the broadcaster for use in log
// fields. It lists the sinks and the internal channels and deliberately
// leaves out the sync.Once, which is not safe to read while Close may be
// running.
//
// The receiver is a pointer so that calling String never copies the
// Broadcaster (and with it the sync.Once). The fields are formatted through a
// map rather than a Broadcaster value for two reasons: formatting a
// Broadcaster from inside its own String method would call String again and
// recurse without bound, and map values are passed to fmt as interfaces, so
// each sink's own String method is honored instead of fmt reflecting into the
// sink's internals.
//
// NOTE(review): b.sinks is owned by the run goroutine; reading it here from
// another goroutine is presumably still unsynchronized — confirm whether
// callers only log from the run loop.
func (b *Broadcaster) String() string {
	snapshot := map[string]interface{}{
		"sinks":    b.sinks,
		"events":   b.events,
		"adds":     b.adds,
		"removes":  b.removes,
		"shutdown": b.shutdown,
		"closed":   b.closed,
	}

	return fmt.Sprint(snapshot)
}