sinks: address close race condition

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2016-08-19 14:52:11 -07:00
Родитель afb2b9f2c2
Коммит eb6518f25a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: FB5F6B2905D7ECF3
7 изменённых файлов: 40 добавлений и 39 удалений

Просмотреть файл

@ -1,6 +1,10 @@
package events
import "github.com/Sirupsen/logrus"
import (
"sync"
"github.com/Sirupsen/logrus"
)
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
@ -10,7 +14,10 @@ type Broadcaster struct {
events chan Event
adds chan configureRequest
removes chan configureRequest
closed chan chan struct{}
shutdown chan struct{}
closed chan struct{}
once sync.Once
}
// NewBroadcaster appends one or more sinks to the list of sinks. The
@ -19,11 +26,12 @@ type Broadcaster struct {
// its own. Use of EventQueue and RetryingSink should be used here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
b := Broadcaster{
sinks: sinks,
events: make(chan Event),
adds: make(chan configureRequest),
removes: make(chan configureRequest),
closed: make(chan chan struct{}),
sinks: sinks,
events: make(chan Event),
adds: make(chan configureRequest),
removes: make(chan configureRequest),
shutdown: make(chan struct{}),
closed: make(chan struct{}),
}
// Start the broadcaster
@ -82,24 +90,19 @@ func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error {
select {
case <-b.closed:
// already closed
return ErrSinkClosed
default:
// do a little chan handoff dance to synchronize closing
closed := make(chan struct{})
b.closed <- closed
close(b.closed)
<-closed
return nil
}
b.once.Do(func() {
close(b.shutdown)
})
<-b.closed
return nil
}
// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
defer close(b.closed)
remove := func(target Sink) {
for i, sink := range b.sinks {
if sink == target {
@ -143,7 +146,7 @@ func (b *Broadcaster) run() {
case request := <-b.removes:
remove(request.sink)
request.response <- nil
case closing := <-b.closed:
case <-b.shutdown:
// close all the underlying sinks
for _, sink := range b.sinks {
if err := sink.Close(); err != nil && err != ErrSinkClosed {
@ -151,7 +154,6 @@ func (b *Broadcaster) run() {
Errorf("broadcaster: closing sink failed")
}
}
closing <- struct{}{}
return
}
}

Просмотреть файл

@ -1,5 +1,7 @@
package events
import "sync"
// Channel provides a sink that can be listened on. The writer and channel
// listener must operate in separate goroutines.
//
@ -8,6 +10,7 @@ type Channel struct {
C chan Event
closed chan struct{}
once sync.Once
}
// NewChannel returns a channel. If buffer is zero, the channel is
@ -37,11 +40,9 @@ func (ch *Channel) Write(event Event) error {
// Close the channel sink.
func (ch *Channel) Close() error {
select {
case <-ch.closed:
return ErrSinkClosed
default:
ch.once.Do(func() {
close(ch.closed)
return nil
}
})
return nil
}

Просмотреть файл

@ -46,7 +46,7 @@ loop:
}
sink.Close()
_, ok := <-sink.Done() // test will timeout if this hands
_, ok := <-sink.Done() // test will timeout if this hangs
if ok {
t.Fatalf("done should be a closed channel")
}
@ -54,5 +54,4 @@ loop:
if received != nevents {
t.Fatalf("events did not make it through sink: %v != %v", received, nevents)
}
}

Просмотреть файл

@ -96,8 +96,8 @@ func checkClose(t *testing.T, sink Sink) {
}
// second close should not crash but should return an error.
if err := sink.Close(); err == nil {
t.Fatalf("no error on double close")
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error on double close: %v", err)
}
// Write after closed should be an error

Просмотреть файл

@ -44,7 +44,7 @@ func (f *Filter) Write(event Event) error {
func (f *Filter) Close() error {
// TODO(stevvooe): Not all sinks should have Close.
if f.closed {
return ErrSinkClosed
return nil
}
f.closed = true

Просмотреть файл

@ -52,7 +52,7 @@ func (eq *Queue) Close() error {
defer eq.mu.Unlock()
if eq.closed {
return ErrSinkClosed
return nil
}
// set closed flag

Просмотреть файл

@ -18,6 +18,7 @@ type RetryingSink struct {
sink Sink
strategy RetryStrategy
closed chan struct{}
once sync.Once
}
// NewRetryingSink returns a sink that will retry writes to a sink, backing
@ -81,13 +82,11 @@ retry:
// Close closes the sink and the underlying sink.
func (rs *RetryingSink) Close() error {
select {
case <-rs.closed:
return ErrSinkClosed
default:
rs.once.Do(func() {
close(rs.closed)
return rs.sink.Close()
}
})
return nil
}
// RetryStrategy defines a strategy for retrying event sink writes.