retry: implement exponential backoff for event retry

To complement circuit breaker, we now have an exponential backoff retry
strategy. The common increasing bound strategy is implemented.

A small bug was exposed in the implementation that blocked the retry
forever if the strategy did not probe. Now, we simply wait for the retry
and proceed. This may change behavior in the breaker case, but is more
correct.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2016-03-30 18:12:32 -07:00
Родитель 9e2c9c6318
Коммит 39718a2649
2 изменённых файлов: 144 добавлений и 9 удалений

Просмотреть файл

@ -1,7 +1,9 @@
package events
import (
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/Sirupsen/logrus"
@ -46,7 +48,11 @@ retry:
if backoff := rs.strategy.Proceed(event); backoff > 0 {
select {
case <-time.After(backoff):
goto retry
// TODO(stevvooe): This branch holds up the next try. Before, we
// would simply break to the "retry" label and then possibly wait
// again. However, this requires all retry strategies to have a
// large probability of probing the sync for success, rather than
// just backing off and sending the request.
case <-rs.closed:
return ErrSinkClosed
}
@ -103,9 +109,6 @@ type RetryStrategy interface {
Success(event Event)
}
// TODO(stevvooe): We are using circuit breaker here. May want to provide
// bounded exponential backoff, as well.
// Breaker implements a circuit breaker retry strategy.
//
// The current implementation never drops events.
@ -158,3 +161,89 @@ func (b *Breaker) Failure(event Event, err error) bool {
b.last = time.Now().UTC()
return false // never drop events.
}
var (
// DefaultExponentialBackoffConfig provides a default configuration for
// exponential backoff.
DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
Base: time.Second,
Factor: time.Second,
Max: 20 * time.Second,
}
)
// ExponentialBackoffConfig configures backoff parameters.
//
// Note that these parameters operate on the upper bound for choosing a random
// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
// the backoff value.
type ExponentialBackoffConfig struct {
// Base is the minimum bound for backing off after failure.
Base time.Duration
// Factor sets the amount of time by which the backoff grows with each
// failure.
Factor time.Duration
// Max is the absolute maxiumum bound for a single backoff.
Max time.Duration
}
// ExponentialBackoff implements random backoff with exponentially increasing
// bounds as the number consecutive failures increase.
type ExponentialBackoff struct {
config ExponentialBackoffConfig
failures uint64 // consecutive failure counter.
}
// NewExponentialBackoff returns an exponential backoff strategy with the
// desired config. If config is nil, the default is returned.
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
return &ExponentialBackoff{
config: config,
}
}
// Proceed returns the next randomly bound exponential backoff time.
func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
return b.backoff(atomic.LoadUint64(&b.failures))
}
// Success resets the failures counter.
func (b *ExponentialBackoff) Success(event Event) {
atomic.StoreUint64(&b.failures, 0)
}
// Failure increments the failure counter.
func (b *ExponentialBackoff) Failure(event Event, err error) bool {
atomic.AddUint64(&b.failures, 1)
return false
}
// backoff calculates the amount of time to wait based on the number of
// consecutive failures.
func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
if failures <= 0 {
// proceed normally when there are no failures.
return 0
}
factor := b.config.Factor
if factor <= 0 {
factor = DefaultExponentialBackoffConfig.Factor
}
backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
max := b.config.Max
if max <= 0 {
max = DefaultExponentialBackoffConfig.Max
}
if backoff > max || backoff < 0 {
backoff = max
}
// Choose a uniformly distributed value from [0, backoff).
return time.Duration(rand.Int63n(int64(backoff)))
}

Просмотреть файл

@ -7,7 +7,19 @@ import (
"time"
)
func TestRetryingSink(t *testing.T) {
func TestRetryingSinkBreaker(t *testing.T) {
testRetryingSink(t, NewBreaker(3, 10*time.Millisecond))
}
func TestRetryingSinkExponentialBackoff(t *testing.T) {
testRetryingSink(t, NewExponentialBackoff(ExponentialBackoffConfig{
Base: time.Millisecond,
Factor: time.Millisecond,
Max: time.Millisecond * 5,
}))
}
func testRetryingSink(t *testing.T, strategy RetryStrategy) {
const nevents = 100
ts := newTestSink(t, nevents)
@ -18,8 +30,7 @@ func TestRetryingSink(t *testing.T) {
Sink: ts,
}
breaker := NewBreaker(3, 10*time.Millisecond)
s := NewRetryingSink(flaky, breaker)
s := NewRetryingSink(flaky, strategy)
var wg sync.WaitGroup
for i := 1; i <= nevents; i++ {
@ -28,7 +39,7 @@ func TestRetryingSink(t *testing.T) {
// Above 50, set the failure rate lower
if i > 50 {
flaky.mu.Lock()
flaky.rate = 0.90
flaky.rate = 0.9
flaky.mu.Unlock()
}
@ -46,5 +57,40 @@ func TestRetryingSink(t *testing.T) {
ts.mu.Lock()
defer ts.mu.Unlock()
}
func TestExponentialBackoff(t *testing.T) {
strategy := NewExponentialBackoff(DefaultExponentialBackoffConfig)
backoff := strategy.Proceed(nil)
if backoff != 0 {
t.Errorf("untouched backoff should be zero-wait: %v != 0", backoff)
}
expected := strategy.config.Base + strategy.config.Factor
for i := 1; i <= 10; i++ {
if strategy.Failure(nil, nil) {
t.Errorf("no facilities for dropping events in ExponentialBackoff")
}
for j := 0; j < 1000; j++ {
// sample this several thousand times.
backoff := strategy.Proceed(nil)
if backoff > expected {
t.Fatalf("expected must be bounded by %v after %v failures: %v", expected, i, backoff)
}
}
expected = strategy.config.Base + strategy.config.Factor*time.Duration(1<<uint64(i))
if expected > strategy.config.Max {
expected = strategy.config.Max
}
}
strategy.Success(nil) // recovery!
backoff = strategy.Proceed(nil)
if backoff != 0 {
t.Errorf("should have recovered: %v != 0", backoff)
}
}