Merge pull request #1972 from michael-berlin/age_bad_values

throttler: HTTP handler /throttlerz which shows the most recent recalculation results.
Michael Berlin 2016-08-22 12:27:07 -07:00 committed by GitHub
Parents a78e55df33 61dafabe19
Commit e3f31172d6
24 changed files with 1066 additions and 192 deletions

View file

@ -2,16 +2,21 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletserver
// Package logz provides an infrastructure to expose a list of entries as
// a sortable table on a webpage.
//
// It is used by many internal vttablet pages e.g. /queryz, /querylogz, /schemaz,
// /streamqueryz or /txlogz.
//
// See tabletserver/querylogz.go for an example of how to use it.
package logz
import (
"bytes"
"net/http"
"strconv"
"time"
)
func startHTMLTable(w http.ResponseWriter) {
func StartHTMLTable(w http.ResponseWriter) {
w.Write([]byte(`
<!DOCTYPE html>
<html>
@ -69,7 +74,7 @@ func startHTMLTable(w http.ResponseWriter) {
`))
}
func endHTMLTable(w http.ResponseWriter) {
func EndHTMLTable(w http.ResponseWriter) {
defer w.Write([]byte(`
</table>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.0/jquery.min.js"></script>
@ -117,9 +122,9 @@ $(function() {
</html>`))
}
// wrappable inserts zero-width whitespaces to make
// Wrappable inserts zero-width whitespaces to make
// the string wrappable.
func wrappable(in string) string {
func Wrappable(in string) string {
buf := bytes.NewBuffer(nil)
for _, ch := range in {
buf.WriteRune(ch)
@ -130,28 +135,3 @@ func wrappable(in string) string {
}
return buf.String()
}
func adjustValue(val int, lower int, upper int) int {
if val < lower {
return lower
} else if val > upper {
return upper
}
return val
}
func parseTimeoutLimitParams(req *http.Request) (time.Duration, int) {
timeout := 10
limit := 300
if ts, ok := req.URL.Query()["timeout"]; ok {
if t, err := strconv.Atoi(ts[0]); err == nil {
timeout = adjustValue(t, 0, 60)
}
}
if l, ok := req.URL.Query()["limit"]; ok {
if lim, err := strconv.Atoi(l[0]); err == nil {
limit = adjustValue(lim, 1, 200000)
}
}
return time.Duration(timeout) * time.Second, limit
}
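These helpers were previously unexported in package tabletserver; this commit promotes them into the new logz package so other packages can render the same sortable tables. A minimal sketch of a hypothetical /examplez page built on the exported API (the handler and its row content are illustrative, not part of this commit):

package examplez

import (
	"fmt"
	"net/http"

	"github.com/youtube/vitess/go/vt/logz"
)

// examplezHandler renders a single-row sortable table.
func examplezHandler(w http.ResponseWriter, r *http.Request) {
	logz.StartHTMLTable(w)
	defer logz.EndHTMLTable(w)
	// Wrappable inserts zero-width spaces so long cell values can line-break.
	fmt.Fprintf(w, "<tr><td>%s</td></tr>", logz.Wrappable("SELECT * FROM a_table_with_a_very_long_name"))
}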

View file

@ -8,12 +8,14 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"text/template"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/logz"
)
var (
@ -42,11 +44,11 @@ var (
`)
querylogzFuncMap = template.FuncMap{
"stampMicro": func(t time.Time) string { return t.Format(time.StampMicro) },
"cssWrappable": wrappable,
"cssWrappable": logz.Wrappable,
"unquote": func(s string) string { return strings.Trim(s, "\"") },
}
querylogzTmpl = template.Must(template.New("example").Funcs(querylogzFuncMap).Parse(`
<tr class=".ColorLevel">
<tr class="{{.ColorLevel}}">
<td>{{.Method}}</td>
<td>{{.ContextHTML}}</td>
<td>{{.EffectiveCaller}}</td>
@ -84,8 +86,8 @@ func querylogzHandler(ch chan interface{}, w http.ResponseWriter, r *http.Reques
return
}
timeout, limit := parseTimeoutLimitParams(r)
startHTMLTable(w)
defer endHTMLTable(w)
logz.StartHTMLTable(w)
defer logz.EndHTMLTable(w)
w.Write(querylogzHeader)
tmr := time.NewTimer(timeout)
@ -127,3 +129,28 @@ func querylogzHandler(ch chan interface{}, w http.ResponseWriter, r *http.Reques
}
}
}
func parseTimeoutLimitParams(req *http.Request) (time.Duration, int) {
timeout := 10
limit := 300
if ts, ok := req.URL.Query()["timeout"]; ok {
if t, err := strconv.Atoi(ts[0]); err == nil {
timeout = adjustValue(t, 0, 60)
}
}
if l, ok := req.URL.Query()["limit"]; ok {
if lim, err := strconv.Atoi(l[0]); err == nil {
limit = adjustValue(lim, 1, 200000)
}
}
return time.Duration(timeout) * time.Second, limit
}
func adjustValue(val int, lower int, upper int) int {
if val < lower {
return lower
} else if val > upper {
return upper
}
return val
}
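For illustration, the clamping in action from within the tabletserver package (the query values are hypothetical):

req, _ := http.NewRequest("GET", "/querylogz?timeout=120&limit=500000", nil)
timeout, limit := parseTimeoutLimitParams(req)
// timeout == 60 * time.Second: 120 was clamped to the [0, 60] range.
// limit == 200000: 500000 was clamped to the [1, 200000] range.
// Missing or unparsable parameters keep the defaults of 10s and 300.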

View file

@ -49,6 +49,7 @@ func TestQuerylogzHandler(t *testing.T) {
// fast query
fastQueryPattern := []string{
`<tr class="low">`,
`<td>Execute</td>`,
`<td></td>`,
`<td>effective-caller</td>`,
@ -78,6 +79,7 @@ func TestQuerylogzHandler(t *testing.T) {
// medium query
mediumQueryPattern := []string{
`<tr class="medium">`,
`<td>Execute</td>`,
`<td></td>`,
`<td>effective-caller</td>`,
@ -107,6 +109,7 @@ func TestQuerylogzHandler(t *testing.T) {
// slow query
slowQueryPattern := []string{
`<tr class="high">`,
`<td>Execute</td>`,
`<td></td>`,
`<td>effective-caller</td>`,

View file

@ -13,6 +13,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/logz"
"github.com/youtube/vitess/go/vt/tabletserver/planbuilder"
)
@ -110,25 +111,17 @@ type queryzSorter struct {
less func(row1, row2 *queryzRow) bool
}
func (sorter *queryzSorter) Len() int {
return len(sorter.rows)
}
func (sorter *queryzSorter) Swap(i, j int) {
sorter.rows[i], sorter.rows[j] = sorter.rows[j], sorter.rows[i]
}
func (sorter *queryzSorter) Less(i, j int) bool {
return sorter.less(sorter.rows[i], sorter.rows[j])
}
func (s *queryzSorter) Len() int { return len(s.rows) }
func (s *queryzSorter) Swap(i, j int) { s.rows[i], s.rows[j] = s.rows[j], s.rows[i] }
func (s *queryzSorter) Less(i, j int) bool { return s.less(s.rows[i], s.rows[j]) }
func queryzHandler(si *SchemaInfo, w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
logz.StartHTMLTable(w)
defer logz.EndHTMLTable(w)
w.Write(queryzHeader)
keys := si.queries.Keys()
@ -144,7 +137,7 @@ func queryzHandler(si *SchemaInfo, w http.ResponseWriter, r *http.Request) {
continue
}
Value := &queryzRow{
Query: wrappable(v),
Query: logz.Wrappable(v),
Table: plan.TableName,
Plan: plan.PlanID,
Reason: plan.Reason,

View file

@ -11,6 +11,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/logz"
"github.com/youtube/vitess/go/vt/schema"
)
@ -65,8 +66,8 @@ func schemazHandler(tables []*schema.Table, w http.ResponseWriter, r *http.Reque
acl.SendError(w, err)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
logz.StartHTMLTable(w)
defer logz.EndHTMLTable(w)
w.Write(schemazHeader)
sorter := schemazSorter{

View file

@ -13,6 +13,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/logz"
)
var (
@ -60,8 +61,8 @@ func streamQueryzHandler(queryList *QueryList, w http.ResponseWriter, r *http.Re
w.Write(js)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
logz.StartHTMLTable(w)
defer logz.EndHTMLTable(w)
w.Write(streamqueryzHeader)
for i := range rows {
if err := streamqueryzTmpl.Execute(w, rows[i]); err != nil {

View file

@ -14,6 +14,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/callerid"
"github.com/youtube/vitess/go/vt/logz"
querypb "github.com/youtube/vitess/go/vt/proto/query"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
@ -74,8 +75,8 @@ func txlogzHandler(w http.ResponseWriter, req *http.Request) {
timeout, limit := parseTimeoutLimitParams(req)
ch := TxLogger.Subscribe("txlogz")
defer TxLogger.Unsubscribe(ch)
startHTMLTable(w)
defer endHTMLTable(w)
logz.StartHTMLTable(w)
defer logz.EndHTMLTable(w)
w.Write(txlogzHeader)
tmr := time.NewTimer(timeout)

View file

@ -7,12 +7,14 @@ package main
import (
"flag"
"math/rand"
"net/http"
"sync"
"testing"
"time"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
@ -49,7 +51,7 @@ import (
var (
rate = flag.Int64("rate", 1000, "maximum rate of the throttled demo server at the start")
duration = flag.Duration("duration", 600*time.Second, "total duration the demo runs")
lagUpdateInterval = flag.Duration("lag_update_interval", 1*time.Second, "interval at which the current replication lag will be broadcasted to the throttler")
lagUpdateInterval = flag.Duration("lag_update_interval", 5*time.Second, "interval at which the current replication lag will be broadcasted to the throttler")
replicaDegrationInterval = flag.Duration("replica_degration_interval", 0*time.Second, "simulate a throughput degradation of the replica every X interval (i.e. the replica applies transactions at a slower rate for -reparent_duration and the replication lag might go up)")
replicaDegrationDuration = flag.Duration("replica_degration_duration", 10*time.Second, "duration a simulated degradation should take")
)
@ -271,6 +273,11 @@ func (c *client) StatsUpdate(ts *discovery.TabletStats) {
func main() {
flag.Parse()
go servenv.RunDefault()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/throttlerz", http.StatusTemporaryRedirect)
})
log.Infof("start rate set to: %v", *rate)
replica := newReplica(*lagUpdateInterval, *replicaDegrationInterval, *replicaDegrationDuration)
master := &master{replica: replica}
@ -281,3 +288,7 @@ func main() {
client.stop()
replica.stop()
}
func init() {
servenv.RegisterDefaultFlags()
}

View file

@ -99,13 +99,10 @@ func (m *managerImpl) SetMaxRate(rate int64) []string {
m.mu.Lock()
defer m.mu.Unlock()
var names []string
for name, t := range m.throttlers {
for _, t := range m.throttlers {
t.SetMaxRate(rate)
names = append(names, name)
}
sort.Strings(names)
return names
return m.throttlerNamesLocked()
}
// GetConfiguration implements the "Manager" interface.
@ -150,15 +147,12 @@ func (m *managerImpl) UpdateConfiguration(throttlerName string, configuration *t
return []string{throttlerName}, nil
}
var names []string
for name, t := range m.throttlers {
if err := t.UpdateConfiguration(configuration, copyZeroValues); err != nil {
return nil, fmt.Errorf("failed to update throttler: %v err: %v", name, err)
}
names = append(names, name)
}
sort.Strings(names)
return names, nil
return m.throttlerNamesLocked(), nil
}
// ResetConfiguration implements the "Manager" interface.
@ -175,11 +169,39 @@ func (m *managerImpl) ResetConfiguration(throttlerName string) ([]string, error)
return []string{throttlerName}, nil
}
var names []string
for name, t := range m.throttlers {
for _, t := range m.throttlers {
t.ResetConfiguration()
names = append(names, name)
}
return m.throttlerNamesLocked(), nil
}
// Throttlers returns the sorted list of active throttlers.
func (m *managerImpl) Throttlers() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.throttlerNamesLocked()
}
func (m *managerImpl) throttlerNamesLocked() []string {
var names []string
for k := range m.throttlers {
names = append(names, k)
}
sort.Strings(names)
return names, nil
return names
}
// Log returns the most recent changes of the MaxReplicationLag module.
// There will be one result for each processed replication lag record.
func (m *managerImpl) Log(throttlerName string) ([]result, error) {
m.mu.Lock()
defer m.mu.Unlock()
t, ok := m.throttlers[throttlerName]
if !ok {
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
}
return t.Log(), nil
}
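Besides the /throttlerlogz page added later in this commit, the log can also be pulled programmatically. A sketch from within the throttler package (the throttler name "t1" is illustrative):

results, err := GlobalManager.Log("t1")
if err != nil {
	// The throttler was not registered under that name.
	log.Errorf("cannot display the throttler log: %v", err)
	return
}
for _, r := range results {
	log.Infof("%v", r) // result.String() renders the multi-line summary.
}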

View file

@ -161,13 +161,13 @@ func TestManager_UpdateConfiguration_Error(t *testing.T) {
// Check that errors from Verify() are correctly propagated.
invalidConfig := &throttlerdata.Configuration{
// max < target lag is not allowed.
MaxReplicationLagSec: defaultTargetLag - 1,
// max < 2 is not allowed.
MaxReplicationLagSec: 1,
}
if _, err := f.m.UpdateConfiguration("t2", invalidConfig, false /* copyZeroValues */); err == nil {
t.Fatal("expected error but got nil")
} else {
want := "target replication lag must not be higher than the configured max replication lag"
want := "max_replication_lag_sec must be >= 2"
if !strings.Contains(err.Error(), want) {
t.Fatalf("received wrong error. got = %v, want contains = %v", err, want)
}
@ -211,23 +211,21 @@ func TestManager_UpdateConfiguration_ZeroValues(t *testing.T) {
defer f.tearDown()
// Test the explicit copy of zero values.
zeroValueConfig := &throttlerdata.Configuration{
// TargetReplicationLagSec will be zero too because we omitted it here.
IgnoreNSlowestReplicas: 0,
}
names, err := f.m.UpdateConfiguration("t2", zeroValueConfig, true /* copyZeroValues */)
zeroValueConfig := defaultMaxReplicationLagModuleConfig.Configuration
zeroValueConfig.IgnoreNSlowestReplicas = 0
names, err := f.m.UpdateConfiguration("t2", &zeroValueConfig, true /* copyZeroValues */)
if err != nil {
t.Fatal(err)
}
if err := checkConfig(f.m, []string{"t2"}, names, 0, 0); err != nil {
if err := checkConfig(f.m, []string{"t2"}, names, defaultTargetLag, 0); err != nil {
t.Fatal(err)
}
// Repeat test for all throttlers.
allNames, err := f.m.UpdateConfiguration("" /* all */, zeroValueConfig, true /* copyZeroValues */)
allNames, err := f.m.UpdateConfiguration("" /* all */, &zeroValueConfig, true /* copyZeroValues */)
if err != nil {
t.Fatal(err)
}
if err := checkConfig(f.m, []string{"t1", "t2"}, allNames, 0, 0); err != nil {
if err := checkConfig(f.m, []string{"t1", "t2"}, allNames, defaultTargetLag, 0); err != nil {
t.Fatal(err)
}
}

View file

@ -18,12 +18,12 @@ import (
"github.com/golang/protobuf/proto"
)
type state int
type state string
const (
increaseRate state = iota
decreaseAndGuessRate
emergency
stateIncreaseRate state = "I"
stateDecreaseAndGuessRate = "D"
stateEmergency = "E"
)
type replicationLagChange int
@ -93,6 +93,9 @@ type MaxReplicationLagModule struct {
// listener. ProcessRecords() will process them.
lagRecords chan replicationLagRecord
wg sync.WaitGroup
// results caches the results of the latest processed replication lag records.
results *resultRing
}
// NewMaxReplicationLagModule will create a new module instance and set the
@ -113,14 +116,16 @@ func NewMaxReplicationLagModule(config MaxReplicationLagModuleConfig, actualRate
applyMutableConfig: true,
// Always start off with a non-zero rate because zero means all requests
// get throttled.
rate: sync2.NewAtomicInt64(rate),
memory: newMemory(memoryGranularity),
nowFunc: nowFunc,
lagRecords: make(chan replicationLagRecord, 10),
rate: sync2.NewAtomicInt64(rate),
currentState: stateIncreaseRate,
memory: newMemory(memoryGranularity),
nowFunc: nowFunc,
lagRecords: make(chan replicationLagRecord, 10),
// Prevent an immediate increase of the initial rate.
nextAllowedIncrease: nowFunc().Add(config.MaxDurationBetweenIncreases()),
actualRatesHistory: actualRatesHistory,
lagCache: newReplicationLagCache(1000),
results: newResultRing(1000),
}
// Enforce a config update.
@ -251,25 +256,45 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec
}
lagNow := lagRecordNow.lag()
r := result{
Now: now,
RateChange: unchangedRate,
lastRateChange: m.lastRateChange,
OldState: m.currentState,
NewState: m.currentState,
OldRate: m.rate.Get(),
NewRate: m.rate.Get(),
LagRecordNow: lagRecordNow,
}
if lagNow <= m.config.TargetReplicationLagSec {
// Lag in range: [0, target]
m.increaseRate(now, lagRecordNow)
r.TestedState = stateIncreaseRate
m.increaseRate(&r, now, lagRecordNow)
} else if lagNow <= m.config.MaxReplicationLagSec {
// Lag in range: (target, max]
m.decreaseAndGuessRate(now, lagRecordNow)
r.TestedState = stateDecreaseAndGuessRate
m.decreaseAndGuessRate(&r, now, lagRecordNow)
} else {
// Lag in range: (max, infinite]
m.emergency(now, lagRecordNow)
r.TestedState = stateEmergency
m.emergency(&r, now, lagRecordNow)
}
r.HighestGood = m.memory.highestGood()
r.LowestBad = m.memory.lowestBad()
log.Infof("%v", r)
m.results.add(r)
}
func (m *MaxReplicationLagModule) increaseRate(now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
// Any increase has to wait for a previous decrease first.
if !m.nextAllowedDecrease.IsZero() && now.Before(m.nextAllowedDecrease) {
r.Reason = fmt.Sprintf("did not increase the rate because we're waiting %.1f more seconds for a previous decrease", m.nextAllowedDecrease.Sub(now).Seconds())
return
}
// Increase rate again only if the last increase was in effect long enough.
if !m.nextAllowedIncrease.IsZero() && now.Before(m.nextAllowedIncrease) {
r.Reason = fmt.Sprintf("did not increase the rate because we're waiting %.1f more seconds for a previous increase", m.nextAllowedIncrease.Sub(now).Seconds())
return
}
@ -281,15 +306,16 @@ func (m *MaxReplicationLagModule) increaseRate(now time.Time, lagRecordNow repli
// a) it's still tracked
// b) its LastError is not set
// c) it has not become a slow, ignored replica
r := m.lagCache.latest(m.replicaUnderIncreaseTest)
if !r.isZero() && r.LastError == nil && !m.lagCache.isIgnored(m.replicaUnderIncreaseTest) {
lr := m.lagCache.latest(m.replicaUnderIncreaseTest)
if !lr.isZero() && lr.LastError == nil && !m.lagCache.isIgnored(m.replicaUnderIncreaseTest) {
r.Reason = fmt.Sprintf("did not increase the rate because we're waiting for the next lag record from replica: %v", m.replicaUnderIncreaseTest)
return
}
}
oldRate := m.rate.Get()
m.markCurrentRateAsBadOrGood(now, increaseRate, unknown)
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)
m.resetCurrentState(now)
// Calculate new rate based on the previous (preferably actual) rate.
@ -327,7 +353,7 @@ func (m *MaxReplicationLagModule) increaseRate(now time.Time, lagRecordNow repli
if lowestBad != 0 {
if rate > lowestBad {
// New rate will be the middle value of [previous rate, lowest bad rate].
rate -= (lowestBad - previousRate) / 2
rate = previousRate + (lowestBad-previousRate)/2
increaseReason += fmt.Sprintf(" (but limited to the middle value in the range [previous rate, lowest bad rate]: [%.0f, %.0f]", previousRate, lowestBad)
}
}
@ -335,8 +361,8 @@ func (m *MaxReplicationLagModule) increaseRate(now time.Time, lagRecordNow repli
increase := (rate - previousRate) / previousRate
m.updateNextAllowedIncrease(now, increase, lagRecordNow.Key)
reason := fmt.Sprintf("periodic increase of the %v from %d to %d (by %.1f%%) based on %v to find out the maximum - next allowed increase in %.0f seconds",
previousRateSource, oldRate, int64(rate), increase*100, increaseReason, m.nextAllowedIncrease.Sub(now).Seconds())
m.updateRate(increaseRate, int64(rate), reason, now, lagRecordNow)
previousRateSource, int64(previousRate), int64(rate), increase*100, increaseReason, m.nextAllowedIncrease.Sub(now).Seconds())
m.updateRate(r, stateIncreaseRate, int64(rate), reason, now, lagRecordNow)
}
func (m *MaxReplicationLagModule) updateNextAllowedIncrease(now time.Time, increase float64, key string) {
@ -367,9 +393,10 @@ func (m *MaxReplicationLagModule) updateNextAllowedIncrease(now time.Time, incre
m.replicaUnderIncreaseTest = key
}
func (m *MaxReplicationLagModule) decreaseAndGuessRate(now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
// Decrease the rate only if the last decrease was in effect long enough.
if !m.nextAllowedDecrease.IsZero() && now.Before(m.nextAllowedDecrease) {
r.Reason = fmt.Sprintf("did not decrease the rate because we're waiting %.1f more seconds for a previous decrease", m.nextAllowedDecrease.Sub(now).Seconds())
return
}
@ -380,12 +407,15 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(now time.Time, lagRecordN
// We don't know the replication lag of this replica since the last rate
// change. Without it we won't be able to guess the slave rate.
// Therefore, we'll stay in the current state and wait for more records.
r.Reason = "no previous lag record for this replica since the last rate change"
return
}
// Store the record in the result.
r.LagRecordBefore = lagRecordBefore
if lagRecordBefore.time == lagRecordNow.time {
// First record is the same as the last record. Not possible to calculate a
// diff. Wait for the next health stats update.
r.Reason = "no previous lag record available"
return
}
@ -401,11 +431,12 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(now time.Time, lagRecordN
} else if lagNow > lagBefore {
replicationLagChange = greater
}
m.markCurrentRateAsBadOrGood(now, decreaseAndGuessRate, replicationLagChange)
m.markCurrentRateAsBadOrGood(r, now, stateDecreaseAndGuessRate, replicationLagChange)
m.resetCurrentState(now)
if replicationLagChange == equal {
// The replication lag did not change. Keep going at the current rate.
r.Reason = fmt.Sprintf("did not decrease the rate because the lag did not change (assuming a 1s error margin)")
return
}
@ -417,6 +448,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(now time.Time, lagRecordN
if math.IsNaN(avgMasterRate) {
// NaN (0.0/0.0) occurs when no observations were in the timespan.
// Wait for more rate observations.
r.Reason = fmt.Sprintf("did not decrease the rate because the throttler has not recorded its historic rates in the range [%v , %v]", from.Format("15:04:05"), to.Format("15:04:05"))
return
}
@ -429,19 +461,26 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(now time.Time, lagRecordN
}
// Guess the slave capacity based on the replication lag change.
rate := m.guessSlaveRate(avgMasterRate, lagBefore, lagNow, lagDifference, d)
rate, reason := m.guessSlaveRate(r, avgMasterRate, lagBefore, lagNow, lagDifference, d)
m.nextAllowedDecrease = now.Add(m.config.MinDurationBetweenChanges() + 2*time.Second)
reason := "reaction to replication lag change"
m.updateRate(decreaseAndGuessRate, rate, reason, now, lagRecordNow)
m.updateRate(r, stateDecreaseAndGuessRate, rate, reason, now, lagRecordNow)
}
func (m *MaxReplicationLagModule) guessSlaveRate(avgMasterRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) int64 {
// guessSlaveRate guesses the actual slave rate based on the new backlog.
// Note that "lagDifference" can be positive (lag increased) or negative (lag
// decreased).
func (m *MaxReplicationLagModule) guessSlaveRate(r *result, avgMasterRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
// avgSlaveRate is the average rate (per second) at which the slave
// applied transactions from the replication stream. We infer the value
// from the relative change in the replication lag.
avgSlaveRate := avgMasterRate * (d - lagDifference).Seconds() / d.Seconds()
log.Infof("d : %v lag diff: %v master: %v slave: %v", d, lagDifference, avgMasterRate, avgSlaveRate)
if avgSlaveRate <= 0 {
log.Warningf("guessed slave rate was <= 0 (%v). master rate: %v d: %.1f lag difference: %.1f", avgSlaveRate, avgMasterRate, d.Seconds(), lagDifference.Seconds())
avgSlaveRate = 1
}
r.MasterRate = int64(avgMasterRate)
r.GuessedSlaveRate = int64(avgSlaveRate)
oldRequestsBehind := 0.0
// If the old lag was > 0s, the slave needs to catch up on that as well.
@ -455,46 +494,63 @@ func (m *MaxReplicationLagModule) guessSlaveRate(avgMasterRate float64, lagBefor
newRequestsBehind = (avgMasterRate - avgSlaveRate) * d.Seconds()
}
requestsBehind := oldRequestsBehind + newRequestsBehind
log.Infof("old reqs: %v new reqs: %v total: %v", oldRequestsBehind, newRequestsBehind, requestsBehind)
r.GuessedSlaveBacklogOld = int(oldRequestsBehind)
r.GuessedSlaveBacklogNew = int(newRequestsBehind)
newRate := avgSlaveRate
// Reduce the new rate such that it has time to catch up the requests it's
// behind within the next interval.
futureRequests := avgSlaveRate * m.config.MinDurationBetweenChanges().Seconds()
if futureRequests > 0 {
avgSlaveRate *= (futureRequests - requestsBehind) / futureRequests
if avgSlaveRate < 1 {
// Backlog is too high. Reduce rate to 1 request/second.
avgSlaveRate = 1.0
}
log.Infof("slave after future reqs adj: %v", avgSlaveRate)
futureRequests := newRate * m.config.MinDurationBetweenChanges().Seconds()
newRate *= (futureRequests - requestsBehind) / futureRequests
var reason string
if newRate < 1 {
// Backlog is too high. Reduce rate to 1 request/second.
// TODO(mberlin): Make this a constant.
newRate = 1
reason = fmt.Sprintf("based on the guessed slave rate of: %v the slave won't be able to process the guessed backlog of %d requests within the next %.f seconds", avgSlaveRate, int64(requestsBehind), m.config.MinDurationBetweenChanges().Seconds())
} else {
reason = fmt.Sprintf("new rate is %d lower than the guessed slave rate to account for a guessed backlog of %d requests over %.f seconds", int64(avgSlaveRate-newRate), int64(requestsBehind), m.config.MinDurationBetweenChanges().Seconds())
}
return int64(avgSlaveRate)
return int64(newRate), reason
}
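To make the guessed rate concrete, here is the arithmetic written out, using numbers consistent with the TestMaxReplicationLagModule_Decrease comments further below:

// Worked example: avgMasterRate = 200 QPS, d = 20s, lag grew by 3s (lagDifference = 3s).
// avgSlaveRate      = 200 * (20 - 3) / 20 = 170 QPS
// newRequestsBehind = (200 - 170) * 20    = 600 queries of backlog
// futureRequests    = 170 * 10s (MinDurationBetweenChanges) = 1700 queries
// newRate           = 170 * (1700 - 600) / 1700 = 110 QPS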
func (m *MaxReplicationLagModule) emergency(now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(now, emergency, unknown)
func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)
m.resetCurrentState(now)
oldRate := m.rate.Get()
rate := int64(float64(oldRate) * m.config.EmergencyDecrease)
if rate == 0 {
// Never fully stop throttling.
rate = 1
}
reason := fmt.Sprintf("replication lag went beyond max: %d > %d reducing previous rate by %.f%% to: %v", lagRecordNow.lag(), m.config.MaxReplicationLagSec, m.config.EmergencyDecrease*100, rate)
m.updateRate(emergency, rate, reason, now, lagRecordNow)
reason := fmt.Sprintf("replication lag went beyond max: %d > %d reducing previous rate of %d by %.f%% to: %v", lagRecordNow.lag(), m.config.MaxReplicationLagSec, oldRate, m.config.EmergencyDecrease*100, rate)
m.updateRate(r, stateEmergency, rate, reason, now, lagRecordNow)
}
func (m *MaxReplicationLagModule) updateRate(newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord) {
oldRate := m.rate.Get()
m.currentState = newState
m.lastRateChange = now
m.lastRateChangeReason = reason
// Update result with the new state.
r.NewState = newState
r.NewRate = rate
r.Reason = reason
if rate > oldRate {
r.RateChange = increasedRate
} else if rate < oldRate {
r.RateChange = decreasedRate
}
if rate == oldRate {
return
}
log.Infof("updated rate from: %v to: %v reason: %v", oldRate, rate, m.lastRateChangeReason)
m.rate.Set(int64(rate))
// Notify the throttler that we updated our max rate.
m.rateUpdateChan <- struct{}{}
@ -502,9 +558,11 @@ func (m *MaxReplicationLagModule) updateRate(newState state, rate int64, reason
// markCurrentRateAsBadOrGood determines the actual rate between the last rate
// change and "now" and determines if that rate was bad or good.
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(now time.Time, newState state, replicationLagChange replicationLagChange) {
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
if m.lastRateChange.IsZero() {
// Module was just started. We don't have any data points yet.
r.GoodOrBad = ignoredRate
r.MemorySkipReason = "rate was never changed before (initial start)"
return
}
@ -513,28 +571,30 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(now time.Time, newS
// set by a different module and not us.)
rate := m.actualRatesHistory.average(m.lastRateChange, now)
if math.IsNaN(rate) {
// NaN (0.0/0.0) occurs when no observations were in the timespan.
// Wait for more observations.
// NaN (0.0/0.0) occurs when no records were in the timespan.
// Wait for more records.
r.GoodOrBad = ignoredRate
r.MemorySkipReason = "cannot determine actual rate: no records in [lastRateChange, now]"
return
}
rateIsGood := false
switch m.currentState {
case increaseRate:
case stateIncreaseRate:
switch newState {
case increaseRate:
case stateIncreaseRate:
rateIsGood = true
case decreaseAndGuessRate:
case stateDecreaseAndGuessRate:
rateIsGood = false
case emergency:
case stateEmergency:
rateIsGood = false
}
case decreaseAndGuessRate:
case stateDecreaseAndGuessRate:
switch newState {
case increaseRate:
case stateIncreaseRate:
rateIsGood = true
case decreaseAndGuessRate:
case stateDecreaseAndGuessRate:
switch replicationLagChange {
case unknown:
return
@ -546,20 +606,28 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(now time.Time, newS
case greater:
rateIsGood = false
}
case emergency:
case stateEmergency:
rateIsGood = false
}
case emergency:
case stateEmergency:
// Rate changes initiated during an "emergency" phase provide no meaningful data point.
r.MemorySkipReason = "not marking a rate as good or bad while in the emergency state"
return
}
r.CurrentRate = int64(rate)
if rateIsGood {
log.Infof("marking rate %.f as good state: %v", rate, m.currentState)
m.memory.markGood(int64(rate))
if err := m.memory.markGood(int64(rate)); err == nil {
r.GoodOrBad = goodRate
} else {
r.MemorySkipReason = err.Error()
}
} else {
log.Infof("marking rate %.f as bad state: %v", rate, m.currentState)
m.memory.markBad(int64(rate))
if err := m.memory.markBad(int64(rate)); err == nil {
r.GoodOrBad = badRate
} else {
r.MemorySkipReason = err.Error()
}
}
}
@ -567,9 +635,13 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(now time.Time, newS
// reset.
func (m *MaxReplicationLagModule) resetCurrentState(now time.Time) {
switch m.currentState {
case increaseRate:
case stateIncreaseRate:
m.nextAllowedIncrease = now
case decreaseAndGuessRate:
case stateDecreaseAndGuessRate:
m.nextAllowedDecrease = now
}
}
func (m *MaxReplicationLagModule) log() []result {
return m.results.latestValues()
}

View file

@ -45,10 +45,34 @@ func NewMaxReplicationLagModuleConfig(maxReplicationLag int64) MaxReplicationLag
// Verify returns an error if the config is invalid.
func (c MaxReplicationLagModuleConfig) Verify() error {
if c.TargetReplicationLagSec < 1 {
return fmt.Errorf("target_replication_lag_sec must be >= 1")
}
if c.MaxReplicationLagSec < 2 {
return fmt.Errorf("max_replication_lag_sec must be >= 2")
}
if c.TargetReplicationLagSec > c.MaxReplicationLagSec {
return fmt.Errorf("target replication lag must not be higher than the configured max replication lag: invalid: %v > %v",
return fmt.Errorf("target_replication_lag_sec must not be higher than max_replication_lag_sec: invalid: %v > %v",
c.TargetReplicationLagSec, c.MaxReplicationLagSec)
}
if c.InitialRate < 1 {
return fmt.Errorf("initial_rate must be >= 1")
}
if c.MaxIncrease <= 0 {
return fmt.Errorf("max_increase must be > 0")
}
if c.EmergencyDecrease <= 0 {
return fmt.Errorf("emergency_decrease must be > 0")
}
if c.MinDurationBetweenChangesSec < 1 {
return fmt.Errorf("min_duration_between_changes_sec must be >= 1")
}
if c.MaxDurationBetweenIncreasesSec < 1 {
return fmt.Errorf("max_duration_between_increases_sec must be >= 1")
}
if c.IgnoreNSlowestReplicas < 0 {
return fmt.Errorf("ignore_n_slowest_replicas must be >= 0")
}
return nil
}
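A short sketch of how these checks surface to a caller (the values are illustrative, and log is assumed to be the package's glog import):

config := NewMaxReplicationLagModuleConfig(5)
config.MaxReplicationLagSec = 1 // violates the ">= 2" check above
if err := config.Verify(); err != nil {
	// err is: "max_replication_lag_sec must be >= 2"
	log.Errorf("invalid throttler configuration: %v", err)
}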

View file

@ -83,7 +83,7 @@ func TestMaxReplicationLagModule_RateNotZeroWhenDisabled(t *testing.T) {
}
// Initial rate must not be zero. It's ReplicationLagModuleDisabled instead.
if err := tf.checkState(increaseRate, ReplicationLagModuleDisabled, sinceZero(0*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, ReplicationLagModuleDisabled, sinceZero(0*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -99,7 +99,7 @@ func TestMaxReplicationLagModule_InitialStateAndWait(t *testing.T) {
}
// Initial rate must be config.InitialRate.
if err := tf.checkState(increaseRate, config.InitialRate, sinceZero(0*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, config.InitialRate, sinceZero(0*time.Second)); err != nil {
t.Fatal(err)
}
// After startup, the next increment won't happen until
@ -118,7 +118,7 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
}
// We start at config.InitialRate.
if err := tf.checkState(increaseRate, 100, sinceZero(0*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 100, sinceZero(0*time.Second)); err != nil {
t.Fatal(err)
}
// After the initial wait period of 62s (config.MaxDurationBetweenChangesSec),
@ -128,7 +128,7 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
tf.ratesHistory.add(sinceZero(69*time.Second), 100)
tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
// Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
// We have to wait at least config.MinDurationBetweenChangesSec (10s) before
@ -141,7 +141,7 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
tf.ratesHistory.add(sinceZero(74*time.Second), 200)
tf.process(lagRecord(sinceZero(75*time.Second), r2, 0))
// Lag record was ignored because it's within the wait period.
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -150,7 +150,7 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
tf.process(lagRecord(sinceZero(80*time.Second), r1, 0))
// The r1 lag update was ignored because an increment "under test" is always
// locked in with the replica which triggered the increase (r2 this time).
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -158,7 +158,7 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) {
tf.ratesHistory.add(sinceZero(80*time.Second), 200)
tf.ratesHistory.add(sinceZero(89*time.Second), 200)
tf.process(lagRecord(sinceZero(90*time.Second), r2, 0))
if err := tf.checkState(increaseRate, 400, sinceZero(90*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 400, sinceZero(90*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -177,7 +177,7 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) {
tf.ratesHistory.add(sinceZero(69*time.Second), 100)
tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
// Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -192,7 +192,7 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) {
tf.process(lagRecord(sinceZero(80*time.Second), r1, 0))
// The r1 lag update triggered an increase and did not wait for r2
// because r2 has LastError set.
if err := tf.checkState(increaseRate, 400, sinceZero(80*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 400, sinceZero(80*time.Second)); err != nil {
t.Fatal(err)
}
@ -210,7 +210,7 @@ func TestMaxReplicationLagModule_Increase_LastErrorOrNotUp(t *testing.T) {
tf.process(lagRecord(sinceZero(90*time.Second), r2, 0))
// The r1 lag update triggered an increase and did not wait for r2
// because r2 has !Up set.
if err := tf.checkState(increaseRate, 800, sinceZero(90*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 800, sinceZero(90*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -227,7 +227,7 @@ func TestMaxReplicationLagModule_Decrease(t *testing.T) {
tf.ratesHistory.add(sinceZero(69*time.Second), 100)
tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
// Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -244,7 +244,7 @@ func TestMaxReplicationLagModule_Decrease(t *testing.T) {
// Since this backlog is spread across MinDurationBetweenChangesSec (10s),
// the guessed rate gets further reduced by 60 QPS (600 queries / 10s).
// Hence, the rate is set to 110 QPS (170 - 60).
if err := tf.checkState(decreaseAndGuessRate, 110, sinceZero(90*time.Second)); err != nil {
if err := tf.checkState(stateDecreaseAndGuessRate, 110, sinceZero(90*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -263,7 +263,7 @@ func TestMaxReplicationLagModule_Decrease_NoReplicaHistory(t *testing.T) {
tf.ratesHistory.add(sinceZero(69*time.Second), 100)
tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
// Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -272,7 +272,7 @@ func TestMaxReplicationLagModule_Decrease_NoReplicaHistory(t *testing.T) {
tf.ratesHistory.add(sinceZero(79*time.Second), 200)
tf.process(lagRecord(sinceZero(80*time.Second), r1, uint32(tf.m.config.TargetReplicationLagSec+1)))
// Rate was not decreased because r1 has no lag record @ 70s or higher.
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -290,7 +290,7 @@ func TestMaxReplicationLagModule_Decrease_NoReplicaHistory(t *testing.T) {
// Since this backlog is spread across MinDurationBetweenChangesSec (10s),
// the guessed rate gets further reduced by 120 QPS (1200 queries / 10s).
// Hence, the rate is set to 20 QPS.
if err := tf.checkState(decreaseAndGuessRate, 20, sinceZero(90*time.Second)); err != nil {
if err := tf.checkState(stateDecreaseAndGuessRate, 20, sinceZero(90*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -305,7 +305,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas(t *testing.T) {
// r1 @ 0s, 0s lag
tf.process(lagRecord(sinceZero(0*time.Second), r1, 0))
if err := tf.checkState(increaseRate, 100, sinceZero(0*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 100, sinceZero(0*time.Second)); err != nil {
t.Fatal(err)
}
@ -313,7 +313,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas(t *testing.T) {
tf.ratesHistory.add(sinceZero(9*time.Second), 100)
tf.process(lagRecord(sinceZero(10*time.Second), r2, 10))
// Although r2's lag is high, it's ignored because it's the 1 slowest replica.
if err := tf.checkState(increaseRate, 100, sinceZero(0*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 100, sinceZero(0*time.Second)); err != nil {
t.Fatal(err)
}
@ -322,7 +322,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas(t *testing.T) {
tf.process(lagRecord(sinceZero(20*time.Second), r1, 20))
// r1 would become the new 1 slowest replica. However, we do not ignore it
// because then we would ignore all known replicas in a row.
if err := tf.checkState(emergency, 50, sinceZero(20*time.Second)); err != nil {
if err := tf.checkState(stateEmergency, 50, sinceZero(20*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -340,7 +340,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_NotEnoughReplicas(t *tes
tf.process(lagRecord(sinceZero(10*time.Second), r2, 10))
// r2 is the 1 slowest replica. However, it's not ignored because then we
// would ignore all replicas. Therefore, we react to its lag increase.
if err := tf.checkState(emergency, 50, sinceZero(10*time.Second)); err != nil {
if err := tf.checkState(stateEmergency, 50, sinceZero(10*time.Second)); err != nil {
t.Fatal(err)
}
}
@ -363,7 +363,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease(
tf.ratesHistory.add(sinceZero(69*time.Second), 100)
tf.process(lagRecord(sinceZero(70*time.Second), r2, 0))
// Rate was increased to 200 based on actual rate of 100 within [0s, 69s].
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -372,7 +372,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease(
tf.ratesHistory.add(sinceZero(79*time.Second), 200)
tf.process(lagRecord(sinceZero(80*time.Second), r1, 0))
// Lag record was ignored because it's within the wait period.
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -381,7 +381,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease(
tf.ratesHistory.add(sinceZero(89*time.Second), 200)
tf.m.lagCache.add(lagRecord(sinceZero(90*time.Second), r2, 10))
// We ignore the 1 slowest replica and do not decrease despite r2's high lag.
if err := tf.checkState(increaseRate, 200, sinceZero(70*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil {
t.Fatal(err)
}
@ -390,7 +390,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease(
tf.process(lagRecord(sinceZero(100*time.Second), r1, 0))
// Meanwhile, r1 is doing fine and will trigger the next increase because
// we're no longer waiting for the ignored r2.
if err := tf.checkState(increaseRate, 400, sinceZero(100*time.Second)); err != nil {
if err := tf.checkState(stateIncreaseRate, 400, sinceZero(100*time.Second)); err != nil {
t.Fatal(err)
}
}

View file

@ -5,9 +5,8 @@
package throttler
import (
"fmt"
"sort"
log "github.com/golang/glog"
)
// memory tracks "good" and "bad" throttler rates where good rates are below
@ -48,30 +47,30 @@ func searchInt64s(a []int64, x int64) int {
return sort.Search(len(a), func(i int) bool { return a[i] >= x })
}
func (m *memory) markGood(rate int64) {
func (m *memory) markGood(rate int64) error {
rate = m.roundDown(rate)
if lowestBad := m.lowestBad(); lowestBad != 0 && rate > lowestBad {
log.Warningf("markGood(): ignoring higher good rate of %v because we assume that the known maximum capacity (currently at %v) can only degrade.", rate, lowestBad)
return
return fmt.Errorf("ignoring higher good rate of %v because we assume that the known maximum capacity (currently at %v) can only degrade", rate, lowestBad)
}
// Skip rates which already exist.
i := searchInt64s(m.good, rate)
if i < len(m.good) && m.good[i] == rate {
return
return nil
}
m.good = append(m.good, rate)
sort.Sort(int64Slice(m.good))
return nil
}
func (m *memory) markBad(rate int64) {
func (m *memory) markBad(rate int64) error {
rate = m.roundDown(rate)
// Ignore higher bad rates than the current one.
if m.bad != 0 && rate >= m.bad {
return
return nil
}
// Ignore bad rates which are too drastic. This prevents that temporary
@ -82,8 +81,7 @@ func (m *memory) markBad(rate int64) {
decrease := float64(highestGood) - float64(rate)
degradation := decrease / float64(highestGood)
if degradation > 0.1 {
log.Warningf("markBad(): ignoring lower bad rate of %v because such a high degradation (%.1f%%) is unlikely (current highest good: %v)", rate, degradation*100, highestGood)
return
return fmt.Errorf("ignoring lower bad rate of %v because such a high degradation (%.1f%%) is unlikely (current highest good: %v)", rate, degradation*100, highestGood)
}
}
@ -100,6 +98,7 @@ func (m *memory) markBad(rate int64) {
m.good = m.good[:goodLength]
m.bad = rate
return nil
}
func (m *memory) highestGood() int64 {

View file

@ -47,7 +47,9 @@ func TestMemory(t *testing.T) {
}
// a good 601 will be ignored because the first bad is at 300.
m.markGood(601)
if err := m.markGood(601); err == nil {
t.Fatal("good rates cannot go beyond the lowest bad rate: should have returned an error")
}
if got := m.lowestBad(); got != want300 {
t.Fatalf("good rates cannot go beyond the lowest bad rate: got = %v, want = %v", got, want300)
}
@ -78,7 +80,9 @@ func TestMemory_markDownIgnoresDrasticBadValues(t *testing.T) {
t.Fatalf("bad rate was not correctly inserted: got = %v, want = %v", got, bad)
}
m.markBad(500)
if err := m.markBad(500); err == nil {
t.Fatal("bad rate should have been ignored and an error should have been returned")
}
if got := m.highestGood(); got != good {
t.Fatalf("bad rate should have been ignored: got = %v, want = %v", got, good)
}

go/vt/throttler/result.go (new file, 160 lines)
View file

@ -0,0 +1,160 @@
package throttler
import (
"bytes"
"fmt"
"sync"
"text/template"
"time"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)
type rateChange string
const (
increasedRate rateChange = "increased"
decreasedRate = "decreased"
unchangedRate = "not changed"
)
type goodOrBadRate string
const (
goodRate = "good"
badRate = "bad"
ignoredRate = "ignored"
)
var resultStringTemplate = template.Must(template.New("result.String()").Parse(
`rate was: {{.RateChange}} from: {{.OldRate}} to: {{.NewRate}}
alias: {{.Alias}} lag: {{.LagRecordNow.Stats.SecondsBehindMaster}}s
last change: {{.TimeSinceLastRateChange}} rate: {{.CurrentRate}} good/bad? {{.GoodOrBad}} skipped b/c: {{.MemorySkipReason}} good/bad: {{.HighestGood}}/{{.LowestBad}}
state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}}
lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (master/slave): {{.MasterRate}}/{{.GuessedSlaveRate}} backlog (old/new): {{.GuessedSlaveBacklogOld}}/{{.GuessedSlaveBacklogNew}}
reason: {{.Reason}}`))
// result is generated by the MaxReplicationLag module for each processed
// "replicationLagRecord".
// It captures the details and the decision of the processing.
type result struct {
Now time.Time
RateChange rateChange
lastRateChange time.Time
OldState state
TestedState state
NewState state
OldRate int64
NewRate int64
Reason string
CurrentRate int64
GoodOrBad goodOrBadRate
MemorySkipReason string
HighestGood int64
LowestBad int64
LagRecordNow replicationLagRecord
LagRecordBefore replicationLagRecord
MasterRate int64
GuessedSlaveRate int64
GuessedSlaveBacklogOld int
GuessedSlaveBacklogNew int
}
func (r result) String() string {
var b bytes.Buffer
if err := resultStringTemplate.Execute(&b, r); err != nil {
panic(fmt.Sprintf("failed to Execute() template: %v", err))
}
return b.String()
}
func (r result) Alias() string {
return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias)
}
func (r result) TimeSinceLastRateChange() string {
if r.lastRateChange.IsZero() {
return "n/a"
}
return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds())
}
func (r result) LagBefore() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.SecondsBehindMaster)
}
func (r result) AgeOfBeforeLag() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
return fmt.Sprintf("%.1fs", r.LagRecordNow.time.Sub(r.LagRecordBefore.time).Seconds())
}
// resultRing implements a ring buffer for "result" instances.
type resultRing struct {
// mu guards the fields below.
mu sync.Mutex
// position holds the index of the *next* result in the ring.
position int
// wrapped becomes true when the ring buffer "wrapped" at least once and we
// started reusing entries.
wrapped bool
// values is the underlying ring buffer.
values []result
}
// newResultRing creates a new resultRing.
func newResultRing(capacity int) *resultRing {
return &resultRing{
values: make([]result, capacity),
}
}
// add inserts a new result into the ring buffer.
func (rr *resultRing) add(r result) {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.values[rr.position] = r
rr.position++
if rr.position == len(rr.values) {
rr.position = 0
rr.wrapped = true
}
}
// latestValues returns all values of the buffer. Entries are sorted in reverse
// chronological order i.e. newer items come first.
func (rr *resultRing) latestValues() []result {
rr.mu.Lock()
defer rr.mu.Unlock()
start := rr.position - 1
if start == -1 {
// Current position is at the end.
start = len(rr.values) - 1
}
count := len(rr.values)
if !rr.wrapped {
count = rr.position
}
results := make([]result, count)
for i := 0; i < count; i++ {
pos := start - i
if pos < 0 {
// We started in the middle of the array and need to wrap around at the
// beginning of it.
pos += count
}
results[i] = rr.values[pos%count]
}
return results
}

View file

@ -0,0 +1,145 @@
package throttler
import (
"reflect"
"testing"
"time"
)
var (
resultIncreased = result{
Now: sinceZero(1234 * time.Millisecond),
RateChange: increasedRate,
lastRateChange: sinceZero(1 * time.Millisecond),
OldState: stateIncreaseRate,
TestedState: stateIncreaseRate,
NewState: stateIncreaseRate,
OldRate: 100,
NewRate: 100,
Reason: "increased the rate",
CurrentRate: 99,
GoodOrBad: goodRate,
MemorySkipReason: "",
HighestGood: 95,
LowestBad: 0,
LagRecordNow: lagRecord(sinceZero(1234*time.Millisecond), 101, 1),
LagRecordBefore: replicationLagRecord{},
MasterRate: 99,
GuessedSlaveRate: 0,
GuessedSlaveBacklogOld: 0,
GuessedSlaveBacklogNew: 0,
}
resultDecreased = result{
Now: sinceZero(5000 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(1234 * time.Millisecond),
OldState: stateIncreaseRate,
TestedState: stateDecreaseAndGuessRate,
NewState: stateDecreaseAndGuessRate,
OldRate: 200,
NewRate: 100,
Reason: "decreased the rate",
CurrentRate: 200,
GoodOrBad: badRate,
MemorySkipReason: "",
HighestGood: 95,
LowestBad: 200,
LagRecordNow: lagRecord(sinceZero(5000*time.Millisecond), 101, 2),
LagRecordBefore: lagRecord(sinceZero(1234*time.Millisecond), 101, 1),
MasterRate: 200,
GuessedSlaveRate: 150,
GuessedSlaveBacklogOld: 10,
GuessedSlaveBacklogNew: 20,
}
resultEmergency = result{
Now: sinceZero(10123 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(5000 * time.Millisecond),
OldState: stateDecreaseAndGuessRate,
TestedState: stateEmergency,
NewState: stateEmergency,
OldRate: 100,
NewRate: 50,
Reason: "emergency state decreased the rate",
CurrentRate: 100,
GoodOrBad: badRate,
MemorySkipReason: "",
HighestGood: 95,
LowestBad: 100,
LagRecordNow: lagRecord(sinceZero(10123*time.Millisecond), 101, 23),
LagRecordBefore: lagRecord(sinceZero(5000*time.Millisecond), 101, 2),
MasterRate: 0,
GuessedSlaveRate: 0,
GuessedSlaveBacklogOld: 0,
GuessedSlaveBacklogNew: 0,
}
)
func TestResultString(t *testing.T) {
testcases := []struct {
r result
want string
}{
{
resultIncreased,
`rate was: increased from: 100 to: 100
alias: cell1-0000000101 lag: 1s
last change: 1.2s rate: 99 good/bad? good skipped b/c: good/bad: 95/0
state (old/tested/new): I/I/I
lag before: n/a (n/a ago) rates (master/slave): 99/0 backlog (old/new): 0/0
reason: increased the rate`,
},
{
resultDecreased,
`rate was: decreased from: 200 to: 100
alias: cell1-0000000101 lag: 2s
last change: 3.8s rate: 200 good/bad? bad skipped b/c: good/bad: 95/200
state (old/tested/new): I/D/D
lag before: 1s (3.8s ago) rates (master/slave): 200/150 backlog (old/new): 10/20
reason: decreased the rate`,
},
{
resultEmergency,
`rate was: decreased from: 100 to: 50
alias: cell1-0000000101 lag: 23s
last change: 5.1s rate: 100 good/bad? bad skipped b/c: good/bad: 95/100
state (old/tested/new): D/E/E
lag before: 2s (5.1s ago) rates (master/slave): 0/0 backlog (old/new): 0/0
reason: emergency state decreased the rate`,
},
}
for _, tc := range testcases {
got := tc.r.String()
if got != tc.want {
t.Fatalf("record.String() = %v, want = %v for full record: %#v", got, tc.want, tc.r)
}
}
}
func TestResultRing(t *testing.T) {
// Test data.
r1 := result{Reason: "r1"}
r2 := result{Reason: "r2"}
r3 := result{Reason: "r3"}
rr := newResultRing(2)
// Use the ring partially.
rr.add(r1)
if got, want := rr.latestValues(), []result{r1}; !reflect.DeepEqual(got, want) {
t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want)
}
// Use it fully.
rr.add(r2)
if got, want := rr.latestValues(), []result{r2, r1}; !reflect.DeepEqual(got, want) {
t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want)
}
// Let it wrap.
rr.add(r3)
if got, want := rr.latestValues(), []result{r3, r2}; !reflect.DeepEqual(got, want) {
t.Fatalf("resultRing did not wrap correctly. got = %v, want = %v", got, want)
}
}

View file

@ -302,3 +302,8 @@ func (t *Throttler) UpdateConfiguration(configuration *throttlerdata.Configurati
func (t *Throttler) ResetConfiguration() {
t.maxReplicationLagModule.resetConfiguration()
}
// Log returns the most recent changes of the MaxReplicationLag module.
func (t *Throttler) Log() []result {
return t.maxReplicationLagModule.log()
}

View file

@ -117,14 +117,14 @@ func (tf *testFixture) configuration(t *testing.T, client throttlerclient.Client
// Test UpdateConfiguration.
config := &throttlerdata.Configuration{
TargetReplicationLagSec: 0,
MaxReplicationLagSec: 1,
InitialRate: 2,
MaxIncrease: 0.3,
EmergencyDecrease: 0.4,
MinDurationBetweenChangesSec: 5,
MaxDurationBetweenIncreasesSec: 6,
IgnoreNSlowestReplicas: 7,
TargetReplicationLagSec: 1,
MaxReplicationLagSec: 2,
InitialRate: 3,
MaxIncrease: 0.4,
EmergencyDecrease: 0.5,
MinDurationBetweenChangesSec: 6,
MaxDurationBetweenIncreasesSec: 7,
IgnoreNSlowestReplicas: 8,
}
names, err := client.UpdateConfiguration(context.Background(), "t2", config /* false */, true /* copyZeroValues */)
if err != nil {

View file

@ -0,0 +1,158 @@
package throttler
import (
"fmt"
"html/template"
"io"
"net/http"
"strings"
"time"
"github.com/youtube/vitess/go/vt/logz"
)
const logHeaderHTML = `
<style>
table.gridtable th {
/* Override the nowrap default to avoid that the table overflows. */
white-space: normal;
}
</style>
<thead>
<tr>
<th>Now
<th>Rate Change
<th>Old Rate
<th>New Rate
<th>Tablet
<th>Lag
<th>Last Change
<th>Actual Rate
<th>Good/&#8203;Bad?
<th>If Skipped
<th>Highest Good
<th>Lowest Bad
<th>Old State
<th>Tested State
<th>New State
<th>Lag Before
<th>Recorded Ago
<th>Master Rate
<th>Slave Rate
<th>Old Backlog
<th>New Backlog
<th>Reason
<!-- Do not omit closing thead tag or the browser won't automatically start a
tbody tag and this will break the table sorting. -->
</thead>
`
const logEntryHTML = `
<tr class="{{.ColorLevel}}">
<td>{{.Now.Format "15:04:05"}}
<td>{{.RateChange}}
<td>{{.OldRate}}
<td>{{.NewRate}}
<td>{{.Alias}}
<td>{{.LagRecordNow.Stats.SecondsBehindMaster}}s
<td>{{.TimeSinceLastRateChange}}
<td>{{.CurrentRate}}
<td>{{.GoodOrBad}}
<td>{{.MemorySkipReason}}
<td>{{.HighestGood}}
<td>{{.LowestBad}}
<td>{{.OldState}}
<td>{{.TestedState}}
<td>{{.NewState}}
<td>{{.LagBefore}}
<td>{{.AgeOfBeforeLag}}
<td>{{.MasterRate}}
<td>{{.GuessedSlaveRate}}
<td>{{.GuessedSlaveBacklogOld}}
<td>{{.GuessedSlaveBacklogNew}}
<td>{{.Reason}}
`
const logFooterHTML = `
{{.Count}} lag records spanning the last {{.TimeSpan}} minutes are displayed.
`
var (
logEntryTemplate = template.Must(template.New("logEntry").Parse(logEntryHTML))
logFooterTemplate = template.Must(template.New("logFooter").Parse(logFooterHTML))
)
func init() {
http.HandleFunc("/throttlerlogz/", func(w http.ResponseWriter, r *http.Request) {
throttlerlogzHandler(w, r, GlobalManager)
})
}
func throttlerlogzHandler(w http.ResponseWriter, r *http.Request, m *managerImpl) {
// Longest supported URL: /throttlerlogz/<name>
parts := strings.SplitN(r.URL.Path, "/", 3)
if len(parts) != 3 {
errMsg := fmt.Sprintf("invalid /throttlerlogz path: %q expected paths: /throttlerlogz/ or /throttlerlogz/<throttler name>", r.URL.Path)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
name := parts[2]
if name == "" {
// If no name is given, redirect to the list of throttlers at /throttlerz.
http.Redirect(w, r, "/throttlerz", http.StatusTemporaryRedirect)
return
}
showThrottlerLog(w, m, name)
}
func showThrottlerLog(w http.ResponseWriter, m *managerImpl, name string) {
results, err := m.Log(name)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
logz.StartHTMLTable(w)
if _, err := io.WriteString(w, logHeaderHTML); err != nil {
panic(fmt.Sprintf("failed to execute logHeader template: %v", err))
}
for _, r := range results {
// Color based on the new state.
var colorLevel string
switch r.NewState {
case stateIncreaseRate:
colorLevel = "low"
case stateDecreaseAndGuessRate:
colorLevel = "medium"
case stateEmergency:
colorLevel = "high"
}
data := struct {
result
ColorLevel string
}{r, colorLevel}
if err := logEntryTemplate.Execute(w, data); err != nil {
panic(fmt.Sprintf("failed to execute logEntry template: %v", err))
}
}
logz.EndHTMLTable(w)
// Print footer.
count := len(results)
var d time.Duration
if count > 0 {
d = results[0].Now.Sub(results[count-1].Now)
}
if err := logFooterTemplate.Execute(w, map[string]interface{}{
"Count": count,
"TimeSpan": fmt.Sprintf("%.1f", d.Minutes()),
}); err != nil {
panic(fmt.Sprintf("failed to execute logFooter template: %v", err))
}
}

View file

@ -0,0 +1,140 @@
package throttler
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestThrottlerlogzHandler_MissingSlash(t *testing.T) {
request, _ := http.NewRequest("GET", "/throttlerlogz", nil)
response := httptest.NewRecorder()
m := newManager()
throttlerlogzHandler(response, request, m)
if got, want := response.Body.String(), "invalid /throttlerlogz path"; !strings.Contains(got, want) {
t.Fatalf("/throttlerlogz without the slash does not work (the Go HTTP server does automatically redirect in practice though). got = %v, want = %v", got, want)
}
}
func TestThrottlerlogzHandler_NonExistentThrottler(t *testing.T) {
request, _ := http.NewRequest("GET", "/throttlerlogz/t1", nil)
response := httptest.NewRecorder()
throttlerlogzHandler(response, request, newManager())
if got, want := response.Body.String(), `throttler: t1 does not exist`; !strings.Contains(got, want) {
t.Fatalf("/throttlerlogz page for non-existant t1 should not succeed. got = %v, want = %v", got, want)
}
}
func TestThrottlerlogzHandler(t *testing.T) {
f := &managerTestFixture{}
if err := f.setUp(); err != nil {
t.Fatal(err)
}
defer f.tearDown()
testcases := []struct {
desc string
r result
want string
}{
{
"increased rate",
resultIncreased,
` <tr class="low">
<td>00:00:01
<td>increased
<td>100
<td>100
<td>cell1-0000000101
<td>1s
<td>1.2s
<td>99
<td>good
<td>
<td>95
<td>0
<td>I
<td>I
<td>I
<td>n/a
<td>n/a
<td>99
<td>0
<td>0
<td>0
<td>increased the rate`,
},
{
"decreased rate",
resultDecreased,
` <tr class="medium">
<td>00:00:05
<td>decreased
<td>200
<td>100
<td>cell1-0000000101
<td>2s
<td>3.8s
<td>200
<td>bad
<td>
<td>95
<td>200
<td>I
<td>D
<td>D
<td>1s
<td>3.8s
<td>200
<td>150
<td>10
<td>20
<td>decreased the rate`,
},
{
"emergency state decreased the rate",
resultEmergency,
` <tr class="high">
<td>00:00:10
<td>decreased
<td>100
<td>50
<td>cell1-0000000101
<td>23s
<td>5.1s
<td>100
<td>bad
<td>
<td>95
<td>100
<td>D
<td>E
<td>E
<td>2s
<td>5.1s
<td>0
<td>0
<td>0
<td>0
<td>emergency state decreased the rate`,
},
}
for _, tc := range testcases {
request, _ := http.NewRequest("GET", "/throttlerlogz/t1", nil)
response := httptest.NewRecorder()
f.t1.maxReplicationLagModule.results.add(tc.r)
throttlerlogzHandler(response, request, f.m)
got := response.Body.String()
if !strings.Contains(got, tc.want) {
t.Fatalf("testcase '%v': result not shown in log. got = %v, want = %v", tc.desc, got, tc.want)
}
}
}

View file

@ -0,0 +1,67 @@
package throttler
import (
"fmt"
"html/template"
"net/http"
"strings"
)
const listHTML = `<!DOCTYPE html>
<title>{{len .Throttlers}} Active Throttler(s)</title>
<ul>
{{range .Throttlers}}
<li>
<a href="/throttlerz/{{.}}">{{.}}</a>
</li>
{{end}}
</ul>
`
const detailsHTML = `<!DOCTYPE html>
<title>Details for Throttler '{{.}}'</title>
<a href="/throttlerlogz/{{.}}">adapative throttling log</a>
TODO(mberlin): Add graphs here.
`
var (
listTemplate = template.Must(template.New("list").Parse(listHTML))
detailsTemplate = template.Must(template.New("details").Parse(detailsHTML))
)
func init() {
http.HandleFunc("/throttlerz/", func(w http.ResponseWriter, r *http.Request) {
throttlerzHandler(w, r, GlobalManager)
})
}
func throttlerzHandler(w http.ResponseWriter, r *http.Request, m *managerImpl) {
// Longest supported URL: /throttlerz/<name>
parts := strings.SplitN(r.URL.Path, "/", 3)
if len(parts) != 3 {
errMsg := fmt.Sprintf("invalid /throttlerz path: %q, expected paths: /throttlerz or /throttlerz/<throttler name>", r.URL.Path)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
name := parts[2]
if name == "" {
listThrottlers(w, m)
return
}
showThrottlerDetails(w, name)
}
func listThrottlers(w http.ResponseWriter, m *managerImpl) {
throttlers := m.Throttlers()
listTemplate.Execute(w, map[string]interface{}{
"Throttlers": throttlers,
})
}
func showThrottlerDetails(w http.ResponseWriter, name string) {
detailsTemplate.Execute(w, name)
}
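// Note: unlike showThrottlerLog in throttlerlogz.go, the Execute calls above
// drop their error return; for these small, static templates writing to an
// http.ResponseWriter that is usually benign.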

View file

@ -0,0 +1,57 @@
package throttler
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestThrottlerzHandler_MissingSlash(t *testing.T) {
request, _ := http.NewRequest("GET", "/throttlerz", nil)
response := httptest.NewRecorder()
m := newManager()
throttlerzHandler(response, request, m)
if got, want := response.Body.String(), "invalid /throttlerz path"; !strings.Contains(got, want) {
t.Fatalf("/throttlerz without the slash does not work (the Go HTTP server does automatically redirect in practice though). got = %v, want = %v", got, want)
}
}
func TestThrottlerzHandler_List(t *testing.T) {
f := &managerTestFixture{}
if err := f.setUp(); err != nil {
t.Fatal(err)
}
defer f.tearDown()
request, _ := http.NewRequest("GET", "/throttlerz/", nil)
response := httptest.NewRecorder()
throttlerzHandler(response, request, f.m)
if got, want := response.Body.String(), `<a href="/throttlerz/t1">t1</a>`; !strings.Contains(got, want) {
t.Fatalf("list does not include 't1'. got = %v, want = %v", got, want)
}
if got, want := response.Body.String(), `<a href="/throttlerz/t2">t2</a>`; !strings.Contains(got, want) {
t.Fatalf("list does not include 't1'. got = %v, want = %v", got, want)
}
}
func TestThrottlerzHandler_Details(t *testing.T) {
f := &managerTestFixture{}
if err := f.setUp(); err != nil {
t.Fatal(err)
}
defer f.tearDown()
request, _ := http.NewRequest("GET", "/throttlerz/t1", nil)
response := httptest.NewRecorder()
throttlerzHandler(response, request, f.m)
if got, want := response.Body.String(), `<title>Details for Throttler 't1'</title>`; !strings.Contains(got, want) {
t.Fatalf("details for 't1' not shown. got = %v, want = %v", got, want)
}
}

View file

@ -308,8 +308,14 @@ class BaseShardingTest(object):
stdout, _ = utils.run_vtctl(['UpdateThrottlerConfiguration',
'--server', throttler_server,
'--copy_zero_values',
'target_replication_lag_sec:0 '
'max_replication_lag_sec:12345'],
'target_replication_lag_sec:12345 '
'max_replication_lag_sec:65789 '
'initial_rate:3 '
'max_increase:0.4 '
'emergency_decrease:0.5 '
'min_duration_between_changes_sec:6 '
'max_duration_between_increases_sec:7 '
'ignore_n_slowest_replicas:0 '],
auto_log=True, trap_output=True)
self.assertIn('%d active throttler(s)' % len(names), stdout)
# Check the updated configuration.
@ -318,10 +324,10 @@ class BaseShardingTest(object):
auto_log=True, trap_output=True)
for name in names:
# The max should be set and have a non-zero value.
# Note that all other fields will be zero because we did not define them.
self.assertIn('| %s | max_replication_lag_sec:12345 ' % (name), stdout)
# We test only the first field 'target_replication_lag_sec'.
self.assertIn('| %s | target_replication_lag_sec:12345 ' % (name), stdout)
# protobuf omits fields with a zero value in the text output.
self.assertNotIn('target_replication_lag_sec', stdout)
self.assertNotIn('ignore_n_slowest_replicas', stdout)
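# (Example of the protobuf text-format behavior relied on here: a scalar
# field left at its zero value, e.g. ignore_n_slowest_replicas:0, is simply
# omitted from the printed message, so assertNotIn matches.)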
self.assertIn('%d active throttler(s)' % len(names), stdout)
# Reset clears our configuration values.
@ -334,8 +340,8 @@ class BaseShardingTest(object):
'--server', throttler_server],
auto_log=True, trap_output=True)
for name in names:
# Target lag value should no longer be zero and therefore included.
self.assertIn('| %s | target_replication_lag_sec:' % (name), stdout)
# Target lag value should no longer be 12345 and be back to the default.
self.assertNotIn('target_replication_lag_sec:12345', stdout)
self.assertIn('%d active throttler(s)' % len(names), stdout)
def verify_reconciliation_counters(self, worker_port, online_or_offline,