зеркало из https://github.com/docker/go-events.git
events: initial commit of the events package
The docker/distribution/notifications package has been groomed for general use. It now provides a tool for dispatching and routing of events. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Родитель
a713aa633d
Коммит
bb7a0c6805
|
@ -0,0 +1,110 @@
|
|||
# Docker Events Package
|
||||
|
||||
[![GoDoc](https://godoc.org/github.com/docker/go-events?status.svg)](https://godoc.org/github.com/docker/go-events)
|
||||
|
||||
The Docker `events` package implements a composable event distribition package.
|
||||
It provides helpers for commonly used patterns for use in Go code.
|
||||
|
||||
Originally created to implement the [notifications in Docker Registry
|
||||
2](https://github.com/docker/distribution/blob/master/docs/notifications.md),
|
||||
we've found the pattern to be useful in other applications. This package is
|
||||
most of the same code with slightly updated interfaces.
|
||||
|
||||
## Usage
|
||||
|
||||
The `events` package centers around a `Sink` type. Events are written with
|
||||
calls to `Sink.Write(event Event)`. Sinks can be wired up in various
|
||||
configurations to achieve interesting behavior.
|
||||
|
||||
The canonical example is that employed by the
|
||||
[docker/distribution/notifications](https://godoc.org/github.com/docker/distribution/notifications)
|
||||
package. Let's say we have a type `httpSink` where we'd like to queue
|
||||
notifications. As a rule, it should send a single http request and return an
|
||||
error if it fails:
|
||||
|
||||
```go
|
||||
func (h *httpSink) Write(event Event) error {
|
||||
p, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body := bytes.NewReader(p)
|
||||
resp, err := h.client.Post(h.url, "application/json", body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.Status != 200 {
|
||||
return errors.New("unexpected status")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// implement (*httpSink).Close()
|
||||
```
|
||||
|
||||
With just that, we can start using components from this package. One can call
|
||||
`(*httpSink).Write` to send events as the body of a post request to a
|
||||
configured URL.
|
||||
|
||||
### Retries
|
||||
|
||||
HTTP can be unreliable. The first feature we'd like is to have some retry:
|
||||
|
||||
```go
|
||||
hs := newHTTPSink(/*...*/)
|
||||
retry := NewRetryingSink(hs, NewBreaker(5, time.Second))
|
||||
```
|
||||
|
||||
We now have a sink that will retry events against the `httpSink` until they
|
||||
succeed. The retry will backoff for one second after 5 consecutive failures
|
||||
using the breaker strategy.
|
||||
|
||||
### Queues
|
||||
|
||||
This isn't quite enough. We we want a sink that doesn't block while we are
|
||||
waiting for events to be sent. Let's add a `Queue`:
|
||||
|
||||
```go
|
||||
queue := NewQueue(retry)
|
||||
```
|
||||
|
||||
Now, we have an unbounded queue that will work through all events sent with
|
||||
`(*Queue).Write`. Events can be added asynchronously to the queue without
|
||||
blocking the current execution path. This is ideal for use in an http request.
|
||||
|
||||
### Broadcast
|
||||
|
||||
It usually turns out that you want to send to more than one listener. We can
|
||||
use `Broadcaster` to support this:
|
||||
|
||||
```go
|
||||
var broadcast = NewBroadcaster() // make it available somewhere in your application.
|
||||
broadcast.Add(queue) // add your queue!
|
||||
broadcast.Add(queue2) // and another!
|
||||
```
|
||||
|
||||
With the above, we can now call `broadcast.Write` in our http handlers and have
|
||||
all the events distributed to each queue. Because the events are queued, not
|
||||
listener blocks another.
|
||||
|
||||
### Extending
|
||||
|
||||
For the most part, the above is sufficient for a lot of applications. However,
|
||||
extending the above functionality can be done implementing your own `Sink`. The
|
||||
behavior and semantics of the sink can be completely dependent on the
|
||||
application requirements. The interface is provided below for reference:
|
||||
|
||||
```go
|
||||
type Sink {
|
||||
Write(Event) error
|
||||
Close() error
|
||||
}
|
||||
```
|
||||
|
||||
Application behavior can be controlled by how `Write` behaves. The examples
|
||||
above are designed to queue the message and return as quickly as possible.
|
||||
Other implementations may block until the event is committed to durable
|
||||
storage.
|
|
@ -0,0 +1,158 @@
|
|||
package events
|
||||
|
||||
import "github.com/Sirupsen/logrus"
|
||||
|
||||
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
|
||||
// component is to dispatch events to configured endpoints. Reliability can be
|
||||
// provided by wrapping incoming sinks.
|
||||
type Broadcaster struct {
|
||||
sinks []Sink
|
||||
events chan Event
|
||||
adds chan configureRequest
|
||||
removes chan configureRequest
|
||||
closed chan chan struct{}
|
||||
}
|
||||
|
||||
// NewBroadcaster appends one or more sinks to the list of sinks. The
|
||||
// broadcaster behavior will be affected by the properties of the sink.
|
||||
// Generally, the sink should accept all messages and deal with reliability on
|
||||
// its own. Use of EventQueue and RetryingSink should be used here.
|
||||
func NewBroadcaster(sinks ...Sink) *Broadcaster {
|
||||
b := Broadcaster{
|
||||
sinks: sinks,
|
||||
events: make(chan Event),
|
||||
adds: make(chan configureRequest),
|
||||
removes: make(chan configureRequest),
|
||||
closed: make(chan chan struct{}),
|
||||
}
|
||||
|
||||
// Start the broadcaster
|
||||
go b.run()
|
||||
|
||||
return &b
|
||||
}
|
||||
|
||||
// Write accepts an event to be dispatched to all sinks. This method will never
|
||||
// fail and should never block (hopefully!). The caller cedes the memory to the
|
||||
// broadcaster and should not modify it after calling write.
|
||||
func (b *Broadcaster) Write(event Event) error {
|
||||
select {
|
||||
case b.events <- event:
|
||||
case <-b.closed:
|
||||
return ErrSinkClosed
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add the sink to the broadcaster.
|
||||
//
|
||||
// The provided sink must be comparable with equality. Typically, this just
|
||||
// works with a regular pointer type.
|
||||
func (b *Broadcaster) Add(sink Sink) error {
|
||||
return b.configure(b.adds, sink)
|
||||
}
|
||||
|
||||
// Remove the provided sink.
|
||||
func (b *Broadcaster) Remove(sink Sink) error {
|
||||
return b.configure(b.removes, sink)
|
||||
}
|
||||
|
||||
type configureRequest struct {
|
||||
sink Sink
|
||||
response chan error
|
||||
}
|
||||
|
||||
func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
|
||||
response := make(chan error, 1)
|
||||
|
||||
for {
|
||||
select {
|
||||
case ch <- configureRequest{
|
||||
sink: sink,
|
||||
response: response}:
|
||||
ch = nil
|
||||
case err := <-response:
|
||||
return err
|
||||
case <-b.closed:
|
||||
return ErrSinkClosed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close the broadcaster, ensuring that all messages are flushed to the
|
||||
// underlying sink before returning.
|
||||
func (b *Broadcaster) Close() error {
|
||||
select {
|
||||
case <-b.closed:
|
||||
// already closed
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
// do a little chan handoff dance to synchronize closing
|
||||
closed := make(chan struct{})
|
||||
b.closed <- closed
|
||||
close(b.closed)
|
||||
<-closed
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// run is the main broadcast loop, started when the broadcaster is created.
|
||||
// Under normal conditions, it waits for events on the event channel. After
|
||||
// Close is called, this goroutine will exit.
|
||||
func (b *Broadcaster) run() {
|
||||
remove := func(target Sink) {
|
||||
for i, sink := range b.sinks {
|
||||
if sink == target {
|
||||
b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-b.events:
|
||||
for _, sink := range b.sinks {
|
||||
if err := sink.Write(event); err != nil {
|
||||
if err == ErrSinkClosed {
|
||||
// remove closed sinks
|
||||
remove(sink)
|
||||
continue
|
||||
}
|
||||
logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
|
||||
Errorf("broadcaster: dropping event")
|
||||
}
|
||||
}
|
||||
case request := <-b.adds:
|
||||
// while we have to iterate for add/remove, common iteration for
|
||||
// send is faster against slice.
|
||||
|
||||
var found bool
|
||||
for _, sink := range b.sinks {
|
||||
if request.sink == sink {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
b.sinks = append(b.sinks, request.sink)
|
||||
}
|
||||
// b.sinks[request.sink] = struct{}{}
|
||||
request.response <- nil
|
||||
case request := <-b.removes:
|
||||
remove(request.sink)
|
||||
request.response <- nil
|
||||
case closing := <-b.closed:
|
||||
// close all the underlying sinks
|
||||
for _, sink := range b.sinks {
|
||||
if err := sink.Close(); err != nil && err != ErrSinkClosed {
|
||||
logrus.WithField("events.sink", sink).WithError(err).
|
||||
Errorf("broadcaster: closing sink failed")
|
||||
}
|
||||
}
|
||||
closing <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBroadcaster(t *testing.T) {
|
||||
const nEvents = 1000
|
||||
var sinks []Sink
|
||||
b := NewBroadcaster()
|
||||
for i := 0; i < 10; i++ {
|
||||
sinks = append(sinks, newTestSink(t, nEvents))
|
||||
b.Add(sinks[i])
|
||||
b.Add(sinks[i]) // noop
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i <= nEvents; i++ {
|
||||
wg.Add(1)
|
||||
go func(event Event) {
|
||||
if err := b.Write(event); err != nil {
|
||||
t.Fatalf("error writing event %v: %v", event, err)
|
||||
}
|
||||
wg.Done()
|
||||
}("event")
|
||||
}
|
||||
|
||||
wg.Wait() // Wait until writes complete
|
||||
|
||||
for i := range sinks {
|
||||
b.Remove(sinks[i])
|
||||
}
|
||||
|
||||
// sending one more should trigger test failure if they weren't removed.
|
||||
if err := b.Write("onemore"); err != nil {
|
||||
t.Fatalf("unexpected error sending one more: %v", err)
|
||||
}
|
||||
|
||||
// add them back to test closing.
|
||||
for i := range sinks {
|
||||
b.Add(sinks[i])
|
||||
}
|
||||
|
||||
checkClose(t, b)
|
||||
|
||||
// Iterate through the sinks and check that they all have the expected length.
|
||||
for _, sink := range sinks {
|
||||
ts := sink.(*testSink)
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
|
||||
if len(ts.events) != nEvents {
|
||||
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
|
||||
}
|
||||
|
||||
if !ts.closed {
|
||||
t.Fatalf("sink should have been closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast10(b *testing.B) {
|
||||
benchmarkBroadcast(b, 10)
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast100(b *testing.B) {
|
||||
benchmarkBroadcast(b, 100)
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast1000(b *testing.B) {
|
||||
benchmarkBroadcast(b, 1000)
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast10000(b *testing.B) {
|
||||
benchmarkBroadcast(b, 10000)
|
||||
}
|
||||
|
||||
func benchmarkBroadcast(b *testing.B, nsinks int) {
|
||||
// counter := metrics.NewCounter()
|
||||
// metrics.DefaultRegistry.Register(fmt.Sprintf("nsinks: %v", nsinks), counter)
|
||||
// go metrics.Log(metrics.DefaultRegistry, 500*time.Millisecond, log.New(os.Stderr, "metrics: ", log.LstdFlags))
|
||||
|
||||
b.StopTimer()
|
||||
var sinks []Sink
|
||||
for i := 0; i < nsinks; i++ {
|
||||
// counter.Inc(1)
|
||||
sinks = append(sinks, newTestSink(b, b.N))
|
||||
// sinks = append(sinks, NewQueue(&testSink{t: b, expected: b.N}))
|
||||
}
|
||||
b.StartTimer()
|
||||
|
||||
// meter := metered{}
|
||||
// NewQueue(meter.Egress(dst))
|
||||
|
||||
benchmarkSink(b, NewBroadcaster(sinks...))
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package events
|
||||
|
||||
// Channel provides a sink that can be listened on. The writer and channel
|
||||
// listener must operate in separate goroutines.
|
||||
//
|
||||
// Consumers should listen on Channel.C until Closed is closed.
|
||||
type Channel struct {
|
||||
C chan Event
|
||||
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// NewChannel returns a channel. If buffer is non-zero, the channel is
|
||||
// unbuffered.
|
||||
func NewChannel(buffer int) *Channel {
|
||||
return &Channel{
|
||||
C: make(chan Event, buffer),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Done returns a channel that will always proceed once the sink is closed.
|
||||
func (ch *Channel) Done() chan struct{} {
|
||||
return ch.closed
|
||||
}
|
||||
|
||||
// Write the event to the channel. Must be called in a separate goroutine from
|
||||
// the listener.
|
||||
func (ch *Channel) Write(event Event) error {
|
||||
select {
|
||||
case ch.C <- event:
|
||||
return nil
|
||||
case <-ch.closed:
|
||||
return ErrSinkClosed
|
||||
}
|
||||
}
|
||||
|
||||
// Close the channel sink.
|
||||
func (ch *Channel) Close() error {
|
||||
select {
|
||||
case <-ch.closed:
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
close(ch.closed)
|
||||
return nil
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChannel(t *testing.T) {
|
||||
const nevents = 100
|
||||
|
||||
sink := NewChannel(0)
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i <= nevents; i++ {
|
||||
event := "event-" + fmt.Sprint(i)
|
||||
wg.Add(1)
|
||||
go func(event Event) {
|
||||
defer wg.Done()
|
||||
if err := sink.Write(event); err != nil {
|
||||
t.Fatalf("error writing event: %v", err)
|
||||
}
|
||||
}(event)
|
||||
}
|
||||
wg.Wait()
|
||||
sink.Close()
|
||||
|
||||
// now send another bunch of events and ensure we stay closed
|
||||
for i := 1; i <= nevents; i++ {
|
||||
if err := sink.Write(i); err != ErrSinkClosed {
|
||||
t.Fatalf("unexpected error: %v != %v", err, ErrSinkClosed)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var received int
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-sink.C:
|
||||
received++
|
||||
case <-sink.Done():
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
sink.Close()
|
||||
_, ok := <-sink.Done() // test will timeout if this hands
|
||||
if ok {
|
||||
t.Fatalf("done should be a closed channel")
|
||||
}
|
||||
|
||||
if received != nevents {
|
||||
t.Fatalf("events did not make it through sink: %v != %v", received, nevents)
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type tOrB interface {
|
||||
Fatalf(format string, args ...interface{})
|
||||
Logf(format string, args ...interface{})
|
||||
}
|
||||
|
||||
type testSink struct {
|
||||
t tOrB
|
||||
|
||||
events []Event
|
||||
expected int
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newTestSink(t tOrB, expected int) *testSink {
|
||||
return &testSink{
|
||||
t: t,
|
||||
events: make([]Event, 0, expected), // pre-allocate so we aren't benching alloc
|
||||
expected: expected,
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *testSink) Write(event Event) error {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
|
||||
if ts.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
ts.events = append(ts.events, event)
|
||||
|
||||
if len(ts.events) > ts.expected {
|
||||
ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ts *testSink) Close() error {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
if ts.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
ts.closed = true
|
||||
|
||||
if len(ts.events) != ts.expected {
|
||||
ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type delayedSink struct {
|
||||
Sink
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func (ds *delayedSink) Write(event Event) error {
|
||||
time.Sleep(ds.delay)
|
||||
return ds.Sink.Write(event)
|
||||
}
|
||||
|
||||
type flakySink struct {
|
||||
Sink
|
||||
rate float64
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (fs *flakySink) Write(event Event) error {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
if rand.Float64() < fs.rate {
|
||||
return fmt.Errorf("error writing event: %v", event)
|
||||
}
|
||||
|
||||
return fs.Sink.Write(event)
|
||||
}
|
||||
|
||||
func checkClose(t *testing.T, sink Sink) {
|
||||
if err := sink.Close(); err != nil {
|
||||
t.Fatalf("unexpected error closing: %v", err)
|
||||
}
|
||||
|
||||
// second close should not crash but should return an error.
|
||||
if err := sink.Close(); err == nil {
|
||||
t.Fatalf("no error on double close")
|
||||
}
|
||||
|
||||
// Write after closed should be an error
|
||||
if err := sink.Write("fail"); err == nil {
|
||||
t.Fatalf("write after closed did not have an error")
|
||||
} else if err != ErrSinkClosed {
|
||||
t.Fatalf("error should be ErrSinkClosed")
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkSink(b *testing.B, sink Sink) {
|
||||
defer sink.Close()
|
||||
var event = "myevent"
|
||||
for i := 0; i < b.N; i++ {
|
||||
sink.Write(event)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package events
|
||||
|
||||
import "fmt"
|
||||
|
||||
var (
|
||||
// ErrSinkClosed is returned if a write is issued to a sink that has been
|
||||
// closed. If encountered, the error should be considered terminal and
|
||||
// retries will not be successful.
|
||||
ErrSinkClosed = fmt.Errorf("events: sink closed")
|
||||
)
|
|
@ -0,0 +1,15 @@
|
|||
package events
|
||||
|
||||
// Event marks items that can be sent as events.
|
||||
type Event interface{}
|
||||
|
||||
// Sink accepts and sends events.
|
||||
type Sink interface {
|
||||
// Write an event to the Sink. If no error is returned, the caller will
|
||||
// assume that all events have been committed to the sink. If an error is
|
||||
// received, the caller may retry sending the event.
|
||||
Write(event Event) error
|
||||
|
||||
// Close the sink, possibly waiting for pending events to flush.
|
||||
Close() error
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package events
|
||||
|
||||
// Matcher matches events.
|
||||
type Matcher interface {
|
||||
Match(event Event) bool
|
||||
}
|
||||
|
||||
// MatcherFunc implements matcher with just a function.
|
||||
type MatcherFunc func(event Event) bool
|
||||
|
||||
// Match calls the wrapped function.
|
||||
func (fn MatcherFunc) Match(event Event) bool {
|
||||
return fn(event)
|
||||
}
|
||||
|
||||
// Filter provides an event sink that sends only events that are accepted by a
|
||||
// Matcher. No methods on filter are goroutine safe.
|
||||
type Filter struct {
|
||||
dst Sink
|
||||
matcher Matcher
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewFilter returns a new filter that will send to events to dst that return
|
||||
// true for Matcher.
|
||||
func NewFilter(dst Sink, matcher Matcher) Sink {
|
||||
return &Filter{dst: dst, matcher: matcher}
|
||||
}
|
||||
|
||||
// Write an event to the filter.
|
||||
func (f *Filter) Write(event Event) error {
|
||||
if f.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
if f.matcher.Match(event) {
|
||||
return f.dst.Write(event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the filter and allow no more events to pass through.
|
||||
func (f *Filter) Close() error {
|
||||
// TODO(stevvooe): Not all sinks should have Close.
|
||||
if f.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
f.closed = true
|
||||
return f.dst.Close()
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package events
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestFilter(t *testing.T) {
|
||||
const nevents = 100
|
||||
ts := newTestSink(t, nevents/2)
|
||||
filter := NewFilter(ts, MatcherFunc(func(event Event) bool {
|
||||
i, ok := event.(int)
|
||||
return ok && i%2 == 0
|
||||
}))
|
||||
|
||||
for i := 0; i < nevents; i++ {
|
||||
if err := filter.Write(i); err != nil {
|
||||
t.Fatalf("unexpected error writing event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
checkClose(t, filter)
|
||||
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Queue accepts all messages into a queue for asynchronous consumption
|
||||
// by a sink. It is unbounded and thread safe but the sink must be reliable or
|
||||
// events will be dropped.
|
||||
type Queue struct {
|
||||
dst Sink
|
||||
events *list.List
|
||||
cond *sync.Cond
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewQueue returns a queue to the provided Sink dst.
|
||||
func NewQueue(dst Sink) *Queue {
|
||||
eq := Queue{
|
||||
dst: dst,
|
||||
events: list.New(),
|
||||
}
|
||||
|
||||
eq.cond = sync.NewCond(&eq.mu)
|
||||
go eq.run()
|
||||
return &eq
|
||||
}
|
||||
|
||||
// Write accepts the events into the queue, only failing if the queue has
|
||||
// beend closed.
|
||||
func (eq *Queue) Write(event Event) error {
|
||||
eq.mu.Lock()
|
||||
defer eq.mu.Unlock()
|
||||
|
||||
if eq.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
eq.events.PushBack(event)
|
||||
eq.cond.Signal() // signal waiters
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close shutsdown the event queue, flushing
|
||||
func (eq *Queue) Close() error {
|
||||
eq.mu.Lock()
|
||||
defer eq.mu.Unlock()
|
||||
|
||||
if eq.closed {
|
||||
return ErrSinkClosed
|
||||
}
|
||||
|
||||
// set closed flag
|
||||
eq.closed = true
|
||||
eq.cond.Signal() // signal flushes queue
|
||||
eq.cond.Wait() // wait for signal from last flush
|
||||
return eq.dst.Close()
|
||||
}
|
||||
|
||||
// run is the main goroutine to flush events to the target sink.
|
||||
func (eq *Queue) run() {
|
||||
for {
|
||||
event := eq.next()
|
||||
|
||||
if event == nil {
|
||||
return // nil block means event queue is closed.
|
||||
}
|
||||
|
||||
if err := eq.dst.Write(event); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event": event,
|
||||
"sink": eq.dst,
|
||||
}).WithError(err).Warnf("eventqueue: dropped event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// next encompasses the critical section of the run loop. When the queue is
|
||||
// empty, it will block on the condition. If new data arrives, it will wake
|
||||
// and return a block. When closed, a nil slice will be returned.
|
||||
func (eq *Queue) next() Event {
|
||||
eq.mu.Lock()
|
||||
defer eq.mu.Unlock()
|
||||
|
||||
for eq.events.Len() < 1 {
|
||||
if eq.closed {
|
||||
eq.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
eq.cond.Wait()
|
||||
}
|
||||
|
||||
front := eq.events.Front()
|
||||
block := front.Value.(Event)
|
||||
eq.events.Remove(front)
|
||||
|
||||
return block
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
const nevents = 1000
|
||||
|
||||
ts := newTestSink(t, nevents)
|
||||
eq := NewQueue(
|
||||
// delayed sync simulates destination slower than channel comms
|
||||
&delayedSink{
|
||||
Sink: ts,
|
||||
delay: time.Millisecond * 1,
|
||||
})
|
||||
time.Sleep(10 * time.Millisecond) // let's queue settle to wait conidition.
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i <= nevents; i++ {
|
||||
wg.Add(1)
|
||||
go func(event Event) {
|
||||
if err := eq.Write(event); err != nil {
|
||||
t.Fatalf("error writing event: %v", err)
|
||||
}
|
||||
wg.Done()
|
||||
}("event-" + fmt.Sprint(i))
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
checkClose(t, eq)
|
||||
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
|
||||
if len(ts.events) != nevents {
|
||||
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
|
||||
}
|
||||
|
||||
if !ts.closed {
|
||||
t.Fatalf("sink should have been closed")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,168 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// RetryingSink retries the write until success or an ErrSinkClosed is
|
||||
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
||||
// block. Retry is configured with a RetryStrategy. Concurrent calls to a
|
||||
// retrying sink are serialized through the sink, meaning that if one is
|
||||
// in-flight, another will not proceed.
|
||||
type RetryingSink struct {
|
||||
sink Sink
|
||||
strategy RetryStrategy
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// NewRetryingSink returns a sink that will retry writes to a sink, backing
|
||||
// off on failure. Parameters threshold and backoff adjust the behavior of the
|
||||
// circuit breaker.
|
||||
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
|
||||
rs := &RetryingSink{
|
||||
sink: sink,
|
||||
strategy: strategy,
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
return rs
|
||||
}
|
||||
|
||||
// Write attempts to flush the events to the downstream sink until it succeeds
|
||||
// or the sink is closed.
|
||||
func (rs *RetryingSink) Write(event Event) error {
|
||||
logger := logrus.WithField("event", event)
|
||||
var timer *time.Timer
|
||||
|
||||
retry:
|
||||
select {
|
||||
case <-rs.closed:
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
}
|
||||
|
||||
if backoff := rs.strategy.Proceed(event); backoff > 0 {
|
||||
if timer == nil {
|
||||
timer = time.NewTimer(backoff)
|
||||
defer timer.Stop()
|
||||
} else {
|
||||
timer.Reset(backoff)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
goto retry
|
||||
case <-rs.closed:
|
||||
return ErrSinkClosed
|
||||
}
|
||||
}
|
||||
|
||||
if err := rs.sink.Write(event); err != nil {
|
||||
if err == ErrSinkClosed {
|
||||
// terminal!
|
||||
return err
|
||||
}
|
||||
|
||||
logger := logger.WithError(err) // shadow!!
|
||||
|
||||
if rs.strategy.Failure(event, err) {
|
||||
logger.Errorf("retryingsink: dropped event")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Errorf("retryingsink: error writing event, retrying")
|
||||
goto retry
|
||||
}
|
||||
|
||||
rs.strategy.Success(event)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the sink and the underlying sink.
|
||||
func (rs *RetryingSink) Close() error {
|
||||
select {
|
||||
case <-rs.closed:
|
||||
return ErrSinkClosed
|
||||
default:
|
||||
close(rs.closed)
|
||||
return rs.sink.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// RetryStrategy defines a strategy for retrying event sink writes.
|
||||
//
|
||||
// All methods should be goroutine safe.
|
||||
type RetryStrategy interface {
|
||||
// Proceed is called before every event send. If proceed returns a
|
||||
// positive, non-zero integer, the retryer will back off by the provided
|
||||
// duration.
|
||||
//
|
||||
// An event is provided, by may be ignored.
|
||||
Proceed(event Event) time.Duration
|
||||
|
||||
// Failure reports a failure to the strategy. If this method returns true,
|
||||
// the event should be dropped.
|
||||
Failure(event Event, err error) bool
|
||||
|
||||
// Success should be called when an event is sent successfully.
|
||||
Success(event Event)
|
||||
}
|
||||
|
||||
// TODO(stevvooe): We are using circuit breaker here. May want to provide
|
||||
// bounded exponential backoff, as well.
|
||||
|
||||
// Breaker implements a circuit breaker retry strategy.
|
||||
//
|
||||
// The current implementation never drops events.
|
||||
type Breaker struct {
|
||||
threshold int
|
||||
recent int
|
||||
last time.Time
|
||||
backoff time.Duration // time after which we retry after failure.
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
var _ RetryStrategy = &Breaker{}
|
||||
|
||||
// NewBreaker returns a breaker that will backoff after the threshold has been
|
||||
// tripped. A Breaker is thread safe and may be shared by many goroutines.
|
||||
func NewBreaker(threshold int, backoff time.Duration) *Breaker {
|
||||
return &Breaker{
|
||||
threshold: threshold,
|
||||
backoff: backoff,
|
||||
}
|
||||
}
|
||||
|
||||
// Proceed checks the failures against the threshold.
|
||||
func (b *Breaker) Proceed(event Event) time.Duration {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if b.recent < b.threshold {
|
||||
return 0
|
||||
}
|
||||
|
||||
return b.last.Add(b.backoff).Sub(time.Now())
|
||||
}
|
||||
|
||||
// Success resets the breaker.
|
||||
func (b *Breaker) Success(event Event) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
b.recent = 0
|
||||
b.last = time.Time{}
|
||||
}
|
||||
|
||||
// Failure records the failure and latest failure time.
|
||||
func (b *Breaker) Failure(event Event, err error) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
b.recent++
|
||||
b.last = time.Now().UTC()
|
||||
return false // never drop events.
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRetryingSink(t *testing.T) {
|
||||
const nevents = 100
|
||||
ts := newTestSink(t, nevents)
|
||||
|
||||
// Make a sync that fails most of the time, ensuring that all the events
|
||||
// make it through.
|
||||
flaky := &flakySink{
|
||||
rate: 1.0, // start out always failing.
|
||||
Sink: ts,
|
||||
}
|
||||
|
||||
breaker := NewBreaker(3, 10*time.Millisecond)
|
||||
s := NewRetryingSink(flaky, breaker)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i <= nevents; i++ {
|
||||
event := "myevent-" + fmt.Sprint(i)
|
||||
|
||||
// Above 50, set the failure rate lower
|
||||
if i > 50 {
|
||||
flaky.mu.Lock()
|
||||
flaky.rate = 0.90
|
||||
flaky.mu.Unlock()
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(event Event) {
|
||||
defer wg.Done()
|
||||
if err := s.Write(event); err != nil {
|
||||
t.Fatalf("error writing event: %v", err)
|
||||
}
|
||||
}(event)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
checkClose(t, s)
|
||||
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
|
||||
}
|
Загрузка…
Ссылка в новой задаче