Throttler: stats in /debug/vars (#10443)

* Tablet throttler: serve metrics on /throttler/metrics

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* release notes

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* GaugeFloat64

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* counter map

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* GaugeFloat64

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* remove /throttler/metrics endpoint

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* move away from rcrowley/go-metrics and into vitess's stats

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* update release notes

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* internal app name is 'vitess'

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* internal app name is 'vitess'

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* WingleWordCamel

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* SingleWordCamel tests

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* CamelCase for /debug/var metric names. Do not include app-specific metrics

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* use testify

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* copyright

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* some code comments

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
This commit is contained in:
Shlomi Noach 2022-06-22 09:38:43 +03:00 коммит произвёл GitHub
Родитель 53d2cd48d4
Коммит bddc71e8bf
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 379 добавлений и 21 удалений

Просмотреть файл

@ -165,6 +165,30 @@ API endpoint `/throttler/throttle-app` now accepts a `ratio` query argument, a f
- `1` means "always throttle"
- any numbr in between is allowd. For example, `0.3` means "throttle in 0.3 probability", ie on a per request and based on a dice roll, there's a `30%` change a request is denied. Overall we can expect about `30%` of requests to be denied. Example: `/throttler/throttle-app?app=vreplication&ratio=0.25`
API endpoint `/debug/vars` now exposes throttler metrics, such as number of hits and errors per app per check type. Example:
```shell
$ curl -s 'http://127.0.0.1:15100/debug/vars' | jq . | grep throttler
"throttler.aggregated.mysql.self": 133.19334,
"throttler.aggregated.mysql.shard": 132.997847,
"throttler.check.any.error": 1086,
"throttler.check.any.mysql.self.error": 542,
"throttler.check.any.mysql.self.total": 570,
"throttler.check.any.mysql.shard.error": 544,
"throttler.check.any.mysql.shard.total": 570,
"throttler.check.any.total": 1140,
"throttler.check.mysql.self.seconds_since_healthy": 132,
"throttler.check.mysql.shard.seconds_since_healthy": 132,
"throttler.check.vitess.error": 1086,
"throttler.check.vitess.mysql.self.error": 542,
"throttler.check.vitess.mysql.self.total": 570,
"throttler.check.vitess.mysql.shard.error": 544,
"throttler.check.vitess.mysql.shard.total": 570,
"throttler.check.vitess.total": 1140,
"throttler.probes.latency": 292982,
"throttler.probes.total": 1138
```
See new SQL syntax for controlling/viewing throttling, down below.
### New Syntax

Просмотреть файл

@ -167,3 +167,48 @@ func NewGaugeFunc(name string, help string, f func() int64) *GaugeFunc {
}
return i
}
// GaugeFloat64 tracks a cumulative count of a metric.
// For a one-dimensional or multi-dimensional counter, please use
// CountersWithSingleLabel or CountersWithMultiLabels instead.
type GaugeFloat64 struct {
i sync2.AtomicFloat64
help string
}
// NewCounter returns a new GaugeFloat64.
func NewGaugeFloat64(name string, help string) *GaugeFloat64 {
v := &GaugeFloat64{help: help}
if name != "" {
publish(name, v)
}
return v
}
// Set overwrites the current value.
// This should be used with caution for GaugeFloat64 values
// only when we are certain that the underlying value we are setting
// is increment only
func (v *GaugeFloat64) Set(value float64) {
v.i.Set(value)
}
// Reset resets the counter value to 0.
func (v *GaugeFloat64) Reset() {
v.i.Set(float64(0))
}
// Get returns the value.
func (v *GaugeFloat64) Get() float64 {
return v.i.Get()
}
// String implements the expvar.Var interface.
func (v *GaugeFloat64) String() string {
return strconv.FormatFloat(v.i.Get(), 'f', -1, 64)
}
// Help returns the help string.
func (v *GaugeFloat64) Help() string {
return v.help
}

95
go/stats/counter_map.go Normal file
Просмотреть файл

@ -0,0 +1,95 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"sync"
)
var (
countersMap = map[string]interface{}{}
countersMu sync.RWMutex
)
// GetOrNewCounter returns a Counter with given name; the functiona either creates the counter
// if it does not exist, or returns a pre-existing one. The function is thread safe.
func GetOrNewCounter(name string, help string) *Counter {
// first, attempt read lock only
countersMu.RLock()
c, ok := countersMap[name]
countersMu.RUnlock()
if ok {
return c.(*Counter)
}
// escalate into write lock
countersMu.Lock()
defer countersMu.Unlock()
// double check because we have released the lock in the interim
if c, ok := countersMap[name]; ok {
return c.(*Counter)
}
n := NewCounter(name, help)
countersMap[name] = n
return n
}
// GetOrNewGauge returns a Gauge with given name; the functiona either creates the gauge
// if it does not exist, or returns a pre-existing one. The function is thread safe.
func GetOrNewGauge(name string, help string) *Gauge {
// first, attempt read lock only
countersMu.RLock()
c, ok := countersMap[name]
countersMu.RUnlock()
if ok {
return c.(*Gauge)
}
// escalate into write lock
countersMu.Lock()
defer countersMu.Unlock()
// double check because we have released the lock in the interim
if c, ok := countersMap[name]; ok {
return c.(*Gauge)
}
n := NewGauge(name, help)
countersMap[name] = n
return n
}
// GetOrNewGaugeFloat64 returns a Gauge (float64) with given name; the functiona either creates the gauge
// if it does not exist, or returns a pre-existing one. The function is thread safe.
func GetOrNewGaugeFloat64(name string, help string) *GaugeFloat64 {
// first, attempt read lock only
countersMu.RLock()
c, ok := countersMap[name]
countersMu.RUnlock()
if ok {
return c.(*GaugeFloat64)
}
// escalate into write lock
countersMu.Lock()
defer countersMu.Unlock()
// double check because we have released the lock in the interim
if c, ok := countersMap[name]; ok {
return c.(*GaugeFloat64)
}
n := NewGaugeFloat64(name, help)
countersMap[name] = n
return n
}

Просмотреть файл

@ -0,0 +1,75 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetOrNewCounter(t *testing.T) {
c1 := GetOrNewCounter("size_c", "help message")
require.NotNil(t, c1)
assert.Equal(t, int64(0), c1.Get())
c1.Add(2)
assert.Equal(t, int64(2), c1.Get())
c2 := GetOrNewCounter("size_c", "help message")
assert.Equal(t, int64(2), c1.Get())
assert.Equal(t, int64(2), c2.Get())
assert.Equal(t, c1, c2)
c1.Add(3)
assert.Equal(t, int64(5), c1.Get())
assert.Equal(t, int64(5), c2.Get())
}
func TestGetOrNewGauge(t *testing.T) {
c1 := GetOrNewGauge("size_g", "help message")
require.NotNil(t, c1)
assert.Equal(t, int64(0), c1.Get())
c1.Add(2)
assert.Equal(t, int64(2), c1.Get())
c2 := GetOrNewGauge("size_g", "help message")
assert.Equal(t, int64(2), c1.Get())
assert.Equal(t, int64(2), c2.Get())
assert.Equal(t, c1, c2)
c1.Add(3)
assert.Equal(t, int64(5), c1.Get())
assert.Equal(t, int64(5), c2.Get())
}
func TestGetOrNewGaugeFloat64(t *testing.T) {
c1 := GetOrNewGaugeFloat64("size_gf64", "help message")
require.NotNil(t, c1)
assert.Equal(t, float64(0), c1.Get())
c1.Set(3.14)
assert.Equal(t, float64(3.14), c1.Get())
c2 := GetOrNewGaugeFloat64("size_gf64", "help message")
assert.Equal(t, float64(3.14), c1.Get())
assert.Equal(t, float64(3.14), c2.Get())
assert.Equal(t, c1, c2)
c1.Set(2.718)
assert.Equal(t, float64(2.718), c1.Get())
assert.Equal(t, float64(2.718), c2.Get())
}

Просмотреть файл

@ -19,6 +19,8 @@ package stats
import (
"expvar"
"testing"
"github.com/stretchr/testify/assert"
)
func TestCounter(t *testing.T) {
@ -71,3 +73,21 @@ func TestGaugeFunc(t *testing.T) {
t.Errorf("want 1, got %v", v.String())
}
}
func TestGaugeFloat64(t *testing.T) {
var gotname string
var gotv *GaugeFloat64
clear()
Register(func(name string, v expvar.Var) {
gotname = name
gotv = v.(*GaugeFloat64)
})
v := NewGaugeFloat64("f", "help")
assert.Equal(t, "f", gotname)
assert.Equal(t, v, gotv)
v.Set(3.14)
assert.Equal(t, 3.14, v.Get())
assert.Equal(t, "3.14", v.String())
v.Reset()
assert.Equal(t, float64(0), v.Get())
}

Просмотреть файл

@ -208,6 +208,8 @@ func (dc *dataCollector) addExpVar(kv expvar.KeyValue) {
dc.addInt(k, v.F(), nil)
case *stats.Gauge:
dc.addInt(k, v.Get(), nil)
case *stats.GaugeFloat64:
dc.addFloat(k, v.Get(), nil)
case *stats.GaugeFunc:
dc.addInt(k, v.F(), nil)
case *stats.CounterDuration:

Просмотреть файл

@ -53,6 +53,8 @@ func (be PromBackend) publishPrometheusMetric(name string, v expvar.Var) {
newMetricFuncCollector(st, be.buildPromName(name), prometheus.CounterValue, func() float64 { return float64(st.F()) })
case *stats.Gauge:
newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return float64(st.Get()) })
case *stats.GaugeFloat64:
newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return st.Get() })
case *stats.GaugeFunc:
newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return float64(st.F()) })
case stats.FloatFunc:

Просмотреть файл

@ -56,6 +56,15 @@ func TestPrometheusGauge(t *testing.T) {
checkHandlerForMetrics(t, name, 0)
}
func TestPrometheusGaugeFloat64(t *testing.T) {
name := "blah_gauge_f64"
c := stats.NewGaugeFloat64(name, "help")
c.Set(3.14)
checkHandlerForMetrics(t, name, 3)
c.Reset()
checkHandlerForMetrics(t, name, 0)
}
func TestPrometheusCounterFunc(t *testing.T) {
name := "blah_counterfunc"
stats.NewCounterFunc(name, "help", func() int64 {

Просмотреть файл

@ -108,6 +108,10 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
if err := sb.statsdClient.Gauge(k, float64(v.Get()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add Gauge %v for key %v", v, k)
}
case *stats.GaugeFloat64:
if err := sb.statsdClient.Gauge(k, v.Get(), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugeFloat64 %v for key %v", v, k)
}
case *stats.GaugeFunc:
if err := sb.statsdClient.Gauge(k, float64(v.F()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugeFunc %v for key %v", v, k)

Просмотреть файл

@ -93,6 +93,35 @@ func TestStatsdGauge(t *testing.T) {
}
}
func TestStatsdGaugeFloat64(t *testing.T) {
sb, server := getBackend(t)
defer server.Close()
name := "gauge_name_f64"
s := stats.NewGaugeFloat64(name, "help")
s.Set(3.14)
found := false
expvar.Do(func(kv expvar.KeyValue) {
if kv.Key == name {
found = true
sb.addExpVar(kv)
if err := sb.statsdClient.Flush(); err != nil {
t.Errorf("Error flushing: %s", err)
}
bytes := make([]byte, 4096)
n, err := server.Read(bytes)
if err != nil {
t.Fatal(err)
}
result := string(bytes[:n])
expected := "test.gauge_name_f64:3.140000|g"
assert.Equal(t, result, expected)
}
})
if !found {
t.Errorf("Stat %s not found...", name)
}
}
func TestStatsdGaugeFunc(t *testing.T) {
sb, server := getBackend(t)
defer server.Close()

Просмотреть файл

@ -64,3 +64,12 @@ func SplitUnescape(s string, sep string) ([]string, error) {
}
return unescapedElems, nil
}
// SingleWordCamel takes a single word and returns is in Camel case; basically
// just capitalizing the first letter and making sure the rest are lower case.
func SingleWordCamel(w string) string {
if w == "" {
return w
}
return strings.ToUpper(w[0:1]) + strings.ToLower(w[1:])
}

Просмотреть файл

@ -66,3 +66,45 @@ func TestSplitUnescape(t *testing.T) {
assert.Equal(t, expected, elems)
}
}
func TestSingleWordCamel(t *testing.T) {
tt := []struct {
word string
expect string
}{
{
word: "",
expect: "",
},
{
word: "_",
expect: "_",
},
{
word: "a",
expect: "A",
},
{
word: "A",
expect: "A",
},
{
word: "_A",
expect: "_a",
},
{
word: "mysql",
expect: "Mysql",
},
{
word: "mySQL",
expect: "Mysql",
},
}
for _, tc := range tt {
t.Run(tc.word, func(t *testing.T) {
camel := SingleWordCamel(tc.word)
assert.Equal(t, tc.expect, camel)
})
}
}

Просмотреть файл

@ -307,6 +307,13 @@ func (e *Exporter) NewGauge(name string, help string) *stats.Gauge {
return lvar
}
// NewGaugeFloat64
// exporter assumes all counters/gauges are int64 based; I haven't found a good solution for exporting
// a float64 gauge yet. (Shlomi)
func (e *Exporter) NewGaugeFloat64(name string, help string) *stats.GaugeFloat64 {
return nil
}
// NewCounterFunc creates a name-spaced equivalent for stats.NewCounterFunc.
func (e *Exporter) NewCounterFunc(name string, help string, f func() int64) *stats.CounterFunc {
if e.name == "" || name == "" {

Просмотреть файл

@ -14,15 +14,15 @@ import (
"sync/atomic"
"time"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
metrics "github.com/rcrowley/go-metrics"
)
const (
// DefaultAppName is the app name used by vitess when app doesn't indicate its name
DefaultAppName = "default"
frenoAppName = "freno"
vitessAppName = "vitess"
selfCheckInterval = 250 * time.Millisecond
)
@ -88,7 +88,7 @@ func (check *ThrottlerCheck) checkAppMetricResult(ctx context.Context, appName s
statusCode = http.StatusTooManyRequests // 429
err = base.ErrThresholdExceeded
if !flags.LowPriority && !flags.ReadCheck && appName != frenoAppName {
if !flags.LowPriority && !flags.ReadCheck && appName != vitessAppName {
// low priority requests will henceforth be denied
go check.throttler.nonLowPriorityAppRequestsThrottled.SetDefault(metricName, true)
}
@ -118,18 +118,12 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp
atomic.StoreInt64(&check.throttler.lastCheckTimeNano, time.Now().UnixNano())
go func(statusCode int) {
metrics.GetOrRegisterCounter("check.any.total", nil).Inc(1)
metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.total", appName), nil).Inc(1)
metrics.GetOrRegisterCounter(fmt.Sprintf("check.any.%s.%s.total", storeType, storeName), nil).Inc(1)
metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.%s.%s.total", appName, storeType, storeName), nil).Inc(1)
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
if statusCode != http.StatusOK {
metrics.GetOrRegisterCounter("check.any.error", nil).Inc(1)
metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.error", appName), nil).Inc(1)
metrics.GetOrRegisterCounter(fmt.Sprintf("check.any.%s.%s.error", storeType, storeName), nil).Inc(1)
metrics.GetOrRegisterCounter(fmt.Sprintf("check.%s.%s.%s.error", appName, storeType, storeName), nil).Inc(1)
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}
check.throttler.markRecentApp(appName, remoteAddr)
@ -155,13 +149,13 @@ func (check *ThrottlerCheck) localCheck(ctx context.Context, metricName string)
if err != nil {
return NoSuchMetricCheckResult
}
checkResult = check.Check(ctx, frenoAppName, storeType, storeName, "local", StandardCheckFlags)
checkResult = check.Check(ctx, vitessAppName, storeType, storeName, "local", StandardCheckFlags)
if checkResult.StatusCode == http.StatusOK {
check.throttler.markMetricHealthy(metricName)
}
if timeSinceHealthy, found := check.throttler.timeSinceMetricHealthy(metricName); found {
metrics.GetOrRegisterGauge(fmt.Sprintf("check.%s.%s.seconds_since_healthy", storeType, storeName), nil).Update(int64(timeSinceHealthy.Seconds()))
stats.GetOrNewGauge(fmt.Sprintf("ThrottlerCheck%s%sSecondsSinceHealthy", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), fmt.Sprintf("seconds since last healthy cehck for %s.%s", storeType, storeName)).Set(int64(timeSinceHealthy.Seconds()))
}
return checkResult
@ -173,7 +167,7 @@ func (check *ThrottlerCheck) reportAggregated(metricName string, metricResult ba
return
}
if value, err := metricResult.Get(); err == nil {
metrics.GetOrRegisterGaugeFloat64(fmt.Sprintf("aggregated.%s.%s", storeType, storeName), nil).Update(value)
stats.GetOrNewGaugeFloat64(fmt.Sprintf("ThrottlerAggregated%s%s", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), fmt.Sprintf("aggregated value for %s.%s", storeType, storeName)).Set(value)
}
}

Просмотреть файл

@ -12,7 +12,8 @@ import (
"time"
"github.com/patrickmn/go-cache"
metrics "github.com/rcrowley/go-metrics"
"vitess.io/vitess/go/stats"
)
// MetricsQueryType indicates the type of metrics query on MySQL backend. See following.
@ -108,10 +109,10 @@ func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc
defer func(metric *MySQLThrottleMetric, started time.Time) {
go func() {
metrics.GetOrRegisterTimer("probes.latency", nil).Update(time.Since(started))
metrics.GetOrRegisterCounter("probes.total", nil).Inc(1)
stats.GetOrNewGauge("ThrottlerProbesLatency", "probes latency").Set(time.Since(started).Nanoseconds())
stats.GetOrNewCounter("ThrottlerProbesTotal", "total probes").Add(1)
if metric.Err != nil {
metrics.GetOrRegisterCounter("probes.error", nil).Inc(1)
stats.GetOrNewCounter("ThrottlerProbesError", "total probes errors").Add(1)
}
}()
}(mySQLThrottleMetric, started)