diff --git a/retry.go b/retry.go
index 4973dc0..4ddb3ac 100644
--- a/retry.go
+++ b/retry.go
@@ -1,7 +1,9 @@
 package events
 
 import (
+	"math/rand"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Sirupsen/logrus"
@@ -46,7 +48,11 @@ retry:
 	if backoff := rs.strategy.Proceed(event); backoff > 0 {
 		select {
 		case <-time.After(backoff):
-			goto retry
+			// TODO(stevvooe): This branch holds up the next try. Before, we
+			// would simply break to the "retry" label and then possibly wait
+			// again. However, this requires all retry strategies to have a
+			// large probability of probing the sink for success, rather than
+			// just backing off and sending the request.
 		case <-rs.closed:
 			return ErrSinkClosed
 		}
@@ -103,9 +109,6 @@ type RetryStrategy interface {
 	Success(event Event)
 }
 
-// TODO(stevvooe): We are using circuit breaker here. May want to provide
-// bounded exponential backoff, as well.
-
 // Breaker implements a circuit breaker retry strategy.
 //
 // The current implementation never drops events.
@@ -158,3 +161,89 @@ func (b *Breaker) Failure(event Event, err error) bool {
 	b.last = time.Now().UTC()
 	return false // never drop events.
 }
+
+var (
+	// DefaultExponentialBackoffConfig provides a default configuration for
+	// exponential backoff.
+	DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
+		Base:   time.Second,
+		Factor: time.Second,
+		Max:    20 * time.Second,
+	}
+)
+
+// ExponentialBackoffConfig configures backoff parameters.
+//
+// Note that these parameters operate on the upper bound for choosing a random
+// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
+// the backoff value.
+type ExponentialBackoffConfig struct {
+	// Base is the minimum bound for backing off after failure.
+	Base time.Duration
+
+	// Factor sets the amount of time by which the backoff grows with each
+	// failure.
+	Factor time.Duration
+
+	// Max is the absolute maximum bound for a single backoff.
+	Max time.Duration
+}
+
+// ExponentialBackoff implements random backoff with exponentially increasing
+// bounds as the number of consecutive failures increases.
type ExponentialBackoff struct {
+	config   ExponentialBackoffConfig
+	failures uint64 // consecutive failure counter.
+}
+
+// NewExponentialBackoff returns an exponential backoff strategy with the
+// desired config. Zero values for Factor and Max fall back to the defaults.
+func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
+	return &ExponentialBackoff{
+		config: config,
+	}
+}
+
+// Proceed returns the next randomly bound exponential backoff time.
+func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
+	return b.backoff(atomic.LoadUint64(&b.failures))
+}
+
+// Success resets the failures counter.
+func (b *ExponentialBackoff) Success(event Event) {
+	atomic.StoreUint64(&b.failures, 0)
+}
+
+// Failure increments the failure counter.
+func (b *ExponentialBackoff) Failure(event Event, err error) bool {
+	atomic.AddUint64(&b.failures, 1)
+	return false
+}
+
+// backoff calculates the amount of time to wait based on the number of
+// consecutive failures.
+func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
+	if failures <= 0 {
+		// proceed normally when there are no failures.
+		return 0
+	}
+
+	factor := b.config.Factor
+	if factor <= 0 {
+		factor = DefaultExponentialBackoffConfig.Factor
+	}
+
+	backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
+
+	max := b.config.Max
+	if max <= 0 {
+		max = DefaultExponentialBackoffConfig.Max
+	}
+
+	if backoff > max || backoff < 0 {
+		backoff = max
+	}
+
+	// Choose a uniformly distributed value from [0, backoff).
+	return time.Duration(rand.Int63n(int64(backoff)))
+}
diff --git a/retry_test.go b/retry_test.go
index 280db5b..eba344e 100644
--- a/retry_test.go
+++ b/retry_test.go
@@ -7,7 +7,19 @@ import (
 	"time"
 )
 
-func TestRetryingSink(t *testing.T) {
+func TestRetryingSinkBreaker(t *testing.T) {
+	testRetryingSink(t, NewBreaker(3, 10*time.Millisecond))
+}
+
+func TestRetryingSinkExponentialBackoff(t *testing.T) {
+	testRetryingSink(t, NewExponentialBackoff(ExponentialBackoffConfig{
+		Base:   time.Millisecond,
+		Factor: time.Millisecond,
+		Max:    time.Millisecond * 5,
+	}))
+}
+
+func testRetryingSink(t *testing.T, strategy RetryStrategy) {
 	const nevents = 100
 	ts := newTestSink(t, nevents)
 
@@ -18,8 +30,7 @@ func TestRetryingSink(t *testing.T) {
 		Sink: ts,
 	}
 
-	breaker := NewBreaker(3, 10*time.Millisecond)
-	s := NewRetryingSink(flaky, breaker)
+	s := NewRetryingSink(flaky, strategy)
 
 	var wg sync.WaitGroup
 	for i := 1; i <= nevents; i++ {
@@ -28,7 +39,7 @@
 		wg.Add(1)
 		// Above 50, set the failure rate lower
 		if i > 50 {
 			flaky.mu.Lock()
-			flaky.rate = 0.90
+			flaky.rate = 0.9
 			flaky.mu.Unlock()
 		}
@@ -46,5 +57,40 @@
 
 	ts.mu.Lock()
 	defer ts.mu.Unlock()
-
+}
+
+func TestExponentialBackoff(t *testing.T) {
+	strategy := NewExponentialBackoff(DefaultExponentialBackoffConfig)
+	backoff := strategy.Proceed(nil)
+
+	if backoff != 0 {
+		t.Errorf("untouched backoff should be zero-wait: %v != 0", backoff)
+	}
+
+	expected := strategy.config.Base + strategy.config.Factor
+	for i := 1; i <= 10; i++ {
+		if strategy.Failure(nil, nil) {
+			t.Errorf("no facilities for dropping events in ExponentialBackoff")
+		}
+
+		for j := 0; j < 1000; j++ {
+			// sample this several thousand times.
+			backoff := strategy.Proceed(nil)
+			if backoff > expected {
+				t.Fatalf("expected must be bounded by %v after %v failures: %v", expected, i, backoff)
+			}
+		}
+
+		expected = strategy.config.Base + strategy.config.Factor*time.Duration(1<<uint64(i))
+		if expected > strategy.config.Max {
+			expected = strategy.config.Max
+		}
+	}
+
+	strategy.Success(nil) // recovery!
+
+	backoff = strategy.Proceed(nil)
+	if backoff != 0 {
+		t.Errorf("should have recovered: %v != 0", backoff)
+	}
 }
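Usage note (not part of the patch): the sketch below shows how the new strategy plugs into a retrying sink, using only the constructors exercised by the tests above (NewExponentialBackoff, NewRetryingSink). The helper name newBackedOffSink, the concrete durations, and the assumption that this package exposes a Sink interface satisfied by the retrying sink are illustrative guesses, not something this diff adds.

	package events

	import "time"

	// newBackedOffSink is a hypothetical helper: it wraps dst in a retrying
	// sink whose retry delays are sampled from exponentially growing random
	// windows. NewExponentialBackoff, ExponentialBackoffConfig and
	// NewRetryingSink come from the code in this patch; Sink and the concrete
	// durations are assumptions for the sake of the example.
	func newBackedOffSink(dst Sink) Sink {
		strategy := NewExponentialBackoff(ExponentialBackoffConfig{
			Base:   500 * time.Millisecond, // arbitrary example values;
			Factor: time.Second,            // DefaultExponentialBackoffConfig
			Max:    30 * time.Second,       // works as well.
		})
		return NewRetryingSink(dst, strategy)
	}

Under the patch's backoff rule, the sampling bound after n consecutive failures is Base + Factor*2^(n-1), clamped to Max, and the actual wait is drawn uniformly from [0, bound). With DefaultExponentialBackoffConfig the bound hits the 20s cap from the sixth consecutive failure onward (1s + 32s > 20s), and the uniform jitter keeps concurrent retries from waking up in lockstep.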