throttler: If maxRate is below threadCount, increase it to threadCount.

- Added magic constant ZeroRateNoProgess to better reflect in the code that maxRate == 0 means that the Throttler won't let anything through.
This commit is contained in:
Michael Berlin 2016-05-03 00:19:58 -07:00
Родитель d68d5ec3dc
Коммит a739b89f65
3 изменённых файлов: 53 добавлений и 13 удалений

Просмотреть файл

@ -57,7 +57,7 @@ func (t *threadThrottler) throttle(now time.Time) time.Duration {
}
maxRate := t.maxRateSecond
if maxRate == 0 {
if maxRate == ZeroRateNoProgess {
// Throughput is effectively paused. Do not let anything through until
// the max rate changes.
return t.currentSecond.Add(1 * time.Second).Sub(now)

Просмотреть файл

@ -17,19 +17,27 @@ import (
"math"
"sync"
"time"
log "github.com/golang/glog"
)
// NotThrottled will be returned by Throttle() if the application is currently
// not throttled.
const NotThrottled time.Duration = 0
const (
// NotThrottled will be returned by Throttle() if the application is currently
// not throttled.
NotThrottled time.Duration = 0
// MaxRateModuleDisabled can be set in NewThrottler() to disable throttling
// by a fixed rate.
const MaxRateModuleDisabled = -1
// ZeroRateNoProgess can be used to set maxRate to 0. In this case, the
// throttler won't let any requests through until the rate is increased again.
ZeroRateNoProgess = 0
// ReplicationLagModuleDisabled can be set in NewThrottler() to disable
// throttling based on the MySQL replication lag.
const ReplicationLagModuleDisabled = -1
// MaxRateModuleDisabled can be set in NewThrottler() to disable throttling
// by a fixed rate.
MaxRateModuleDisabled = -1
// ReplicationLagModuleDisabled can be set in NewThrottler() to disable
// throttling based on the MySQL replication lag.
ReplicationLagModuleDisabled = -1
)
// Throttler provides a client-side, thread-aware throttler.
// See the package doc for more information.
@ -76,7 +84,10 @@ type Throttler struct {
// NewThrottler creates a new Throttler instance.
// Use the constants MaxRateModuleDisabled or ReplicationLagModuleDisabled
// if you want to disable parts of its functionality.
// maxRate will be distributed across all threadCount threads.
// maxRate will be distributed across all threadCount threads and must be >=
// threadCount. If it's lower, it will be automatically set to threadCount.
// maxRate can also be set to 0 which will effectively pause the user and
// constantly block until the rate has been increased again.
// unit refers to the type of entity you want to throttle e.g. "queries" or
// "transactions".
// name describes the Throttler instance and will be used by the webinterface.
@ -204,6 +215,10 @@ func (t *Throttler) updateMaxRate() {
return
}
if maxRate != ZeroRateNoProgess && maxRate < int64(threadsRunning) {
log.Warningf("Set maxRate is less than the number of threads (%v). To prevent threads from starving, maxRate was increased from: %v to: %v.", threadsRunning, maxRate, threadsRunning)
maxRate = int64(threadsRunning)
}
maxRatePerThread := maxRate / int64(threadsRunning)
// Distribute the remainder of the division across all threads.
remainder := maxRate % int64(threadsRunning)

Просмотреть файл

@ -220,7 +220,7 @@ func TestThrottle_RateRemainderIsDistributedAcrossThreads(t *testing.T) {
for threadID := 0; threadID < 2; threadID++ {
wantBackoff := 500 * time.Millisecond
if gotBackoff := throttler.Throttle(threadID); gotBackoff != wantBackoff {
t.Fatalf("throttler should have throttled us. got = %v, want = %v", gotBackoff, wantBackoff)
t.Fatalf("throttler should have throttled thread %d. got = %v, want = %v", threadID, gotBackoff, wantBackoff)
}
}
}
@ -242,7 +242,7 @@ func TestThreadFinished(t *testing.T) {
wantBackoff := 1000 * time.Millisecond
for threadID := 0; threadID < 2; threadID++ {
if gotBackoff := throttler.Throttle(threadID); gotBackoff != wantBackoff {
t.Fatalf("throttler should have throttled us. got = %v, want = %v", gotBackoff, wantBackoff)
t.Fatalf("throttler should have throttled thread %d. got = %v, want = %v", threadID, gotBackoff, wantBackoff)
}
}
@ -335,6 +335,31 @@ func TestThrottle_MaxRateDisabled(t *testing.T) {
}
}
// TestThrottle_MaxRateLowerThanThreadCount tests the behavior that maxRate
// must not be lower than threadCount. If this is the case, maxRate will be
// set to threadCount.
func TestThrottle_MaxRateLowerThanThreadCount(t *testing.T) {
fc := &fakeClock{}
// 2 Thread, 1 QPS.
throttler := newThrottlerWithClock("test", "queries", 2, 1, ReplicationLagModuleDisabled, fc.now)
defer throttler.Close()
// 2 QPS instead of configured 1 QPS allowed since there are 2 threads which
// must not starve.
fc.setNow(0 * time.Millisecond)
for threadID := 0; threadID < 1; threadID++ {
if gotBackoff := throttler.Throttle(threadID); gotBackoff != NotThrottled {
t.Fatalf("throttler should not have throttled thread %d: backoff = %v", threadID, gotBackoff)
}
}
wantBackoff := 1000 * time.Millisecond
for threadID := 0; threadID < 1; threadID++ {
if gotBackoff := throttler.Throttle(threadID); gotBackoff != wantBackoff {
t.Fatalf("throttler should have throttled thread %d: got = %v, want = %v", threadID, gotBackoff, wantBackoff)
}
}
}
func TestUpdateMaxRate_AllThreadsFinished(t *testing.T) {
fc := &fakeClock{}
throttler := newThrottlerWithClock("test", "queries", 2, 1e9, ReplicationLagModuleDisabled, fc.now)