throttler: Fix the problem that the updated/reset throttler configuration was never applied by the adapative throttler. (#1974)

This commit is contained in:
Michael Berlin 2016-08-23 13:21:24 -07:00 коммит произвёл GitHub
Родитель 5c56799797
Коммит 8c26383c3c
2 изменённых файлов: 41 добавлений и 0 удалений

Просмотреть файл

@ -201,6 +201,7 @@ func (m *MaxReplicationLagModule) updateConfiguration(configuration *throttlerda
return err
}
m.mutableConfig = newConfig
m.applyMutableConfig = true
return nil
}
@ -209,6 +210,7 @@ func (m *MaxReplicationLagModule) resetConfiguration() {
defer m.mutableConfigMu.Unlock()
m.mutableConfig = NewMaxReplicationLagModuleConfig(m.initialMaxReplicationLagSec)
m.applyMutableConfig = true
}
// RecordReplicationLag records the current replication lag for processing.

Просмотреть файл

@ -464,3 +464,42 @@ func tabletStats(uid, lag uint32) discovery.TabletStats {
LastError: nil,
}
}
func TestApplyLatestConfig(t *testing.T) {
config := NewMaxReplicationLagModuleConfig(5)
tf, err := newTestFixture(config)
if err != nil {
t.Fatal(err)
}
// We start at config.InitialRate.
if err := tf.checkState(stateIncreaseRate, 100, sinceZero(0*time.Second)); err != nil {
t.Fatal(err)
}
// Change the default MaxIncrease from 100% to 200% and test that it's
// correctly propagated.
config.MaxIncrease = 2
tf.m.updateConfiguration(&config.Configuration, true /* copyZeroValues */)
// r2 @ 70s, 0s lag
tf.ratesHistory.add(sinceZero(69*time.Second), 100)
tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
// Rate was increased to 300 based on an actual rate of 100 within [0s, 69s].
// That's a 200% increase.
if err := tf.checkState(stateIncreaseRate, 300, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
// Now reset the config to its default values.
tf.m.resetConfiguration()
// Let's assume that the current rate of 300 was actually fine.
// After the reset, we'll increase it only by 100% to 600.
// r2 @ 90s, 0s lag
tf.ratesHistory.add(sinceZero(80*time.Second), 300)
tf.ratesHistory.add(sinceZero(89*time.Second), 300)
tf.process(lagRecord(sinceZero(90*time.Second), r2, 0))
if err := tf.checkState(stateIncreaseRate, 600, sinceZero(90*time.Second)); err != nil {
t.Fatal(err)
}
}