Vrepl Metrics: improvements, more tests

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Rohit Nayak 2020-07-30 16:46:52 +02:00
Родитель d4f632ed45
Коммит 5c7ef2cdcd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: BA0A4E9168156524
10 изменённых файлов: 93 добавлений и 65 удалений

Просмотреть файл

@ -60,6 +60,13 @@ func NewTimings(name, help, label string, categories ...string) *Timings {
return t
}
// Reset will clear histograms: used during testing
func (t *Timings) Reset() {
t.mu.RLock()
t.histograms = make(map[string]*Histogram)
t.mu.RUnlock()
}
// Add will add a new value to the named histogram.
func (t *Timings) Add(name string, elapsed time.Duration) {
if t.labelCombined {

Просмотреть файл

@ -84,12 +84,10 @@ type Stats struct {
State sync2.AtomicString
CopyTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
CatchupTimings *stats.Timings
FastForwardTimings *stats.Timings
PhaseTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
}
// SetLastPosition sets the last replication position.
@ -125,12 +123,10 @@ func NewStats() *Stats {
bps.Rates = stats.NewRates("", bps.Timings, 15*60/5, 5*time.Second)
bps.History = history.New(3)
bps.SecondsBehindMaster.Set(math.MaxInt64)
bps.CopyTimings = stats.NewTimings("", "", "")
bps.PhaseTimings = stats.NewTimings("", "", "Phase")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")
bps.CatchupTimings = stats.NewTimings("", "", "")
bps.FastForwardTimings = stats.NewTimings("", "", "")
return bps
}

Просмотреть файл

@ -635,6 +635,11 @@ func (tw *TimingsWrapper) Counts() map[string]int64 {
return tw.timings.Counts()
}
// Reset will clear histograms: used during testing
func (tw *TimingsWrapper) Reset() {
tw.timings.Reset()
}
//-----------------------------------------------------------------
// MultiTimingsWrapper provides a namespaced version of stats.MultiTimings.
@ -668,6 +673,11 @@ func (tw *MultiTimingsWrapper) Counts() map[string]int64 {
return tw.timings.Counts()
}
// Reset will clear histograms: used during testing
func (tw *MultiTimingsWrapper) Reset() {
tw.timings.Reset()
}
//-----------------------------------------------------------------
// handleFunc stores the http Handler for an Exporter. This function can

Просмотреть файл

@ -120,28 +120,32 @@ func (st *vrStats) register() {
}))
stats.NewGaugesFuncWithMultiLabels(
"VReplicationQueryTimings",
"vreplication query timings per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts"},
"VReplicationPhaseTimings",
"vreplication per phase timings per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts", "phase"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)] = ct.blpStats.CopyTimings.Time()
for phase, t := range ct.blpStats.PhaseTimings.Counts() {
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)+"."+phase] = t
}
}
return result
})
stats.NewCounterFunc(
"VReplicationQueryTimingsTotal",
"vreplication query timings aggregated across all streams",
"VReplicationPhaseTimingsTotal",
"vreplication per phase timings aggregated across all phases and streams",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := int64(0)
for _, ct := range st.controllers {
result += ct.blpStats.CopyTimings.Time()
for _, t := range ct.blpStats.PhaseTimings.Counts() {
result += t
}
}
return result
})
@ -275,11 +279,9 @@ func (st *vrStats) status() *EngineStatus {
SourceTablet: ct.sourceTablet.Get(),
Messages: ct.blpStats.MessageHistory(),
QueryCounts: ct.blpStats.QueryCount.Counts(),
CopyTimings: ct.blpStats.CopyTimings.Time(),
PhaseTimings: ct.blpStats.PhaseTimings.Counts(),
CopyRowCount: ct.blpStats.CopyRowCount.Get(),
CopyLoopCount: ct.blpStats.CopyLoopCount.Get(),
CatchupTimings: ct.blpStats.CatchupTimings.Time(),
FastForwardTimings: ct.blpStats.FastForwardTimings.Time(),
}
i++
}
@ -307,11 +309,9 @@ type ControllerStatus struct {
SourceTablet string
Messages []string
QueryCounts map[string]int64
CopyTimings int64
PhaseTimings map[string]int64
CopyRowCount int64
CopyLoopCount int64
CatchupTimings int64
FastForwardTimings int64
}
var vreplicationTemplate = `

Просмотреть файл

@ -24,8 +24,6 @@ import (
"time"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"
@ -139,18 +137,18 @@ func TestVReplicationStats(t *testing.T) {
testStats.controllers[1].sourceTablet.Set("src1")
sleepTime := 1 * time.Millisecond
addTiming := func(timing *stats.Timings) {
defer timing.Record("fastforward", time.Now())
record := func(phase string) {
defer blpStats.PhaseTimings.Record(phase, time.Now())
time.Sleep(sleepTime)
}
want := int64(1.2 * float64(sleepTime)) //allow 10% overhead for recording timing
addTiming(blpStats.FastForwardTimings)
require.Greater(t, want, testStats.status().Controllers[0].FastForwardTimings)
addTiming(blpStats.CatchupTimings)
require.Greater(t, want, testStats.status().Controllers[0].CatchupTimings)
addTiming(blpStats.CopyTimings)
require.Greater(t, want, testStats.status().Controllers[0].CopyTimings)
record("fastforward")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["fastforward"])
record("catchup")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["catchup"])
record("copy")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["copy"])
blpStats.QueryCount.Add("replicate", 11)
blpStats.QueryCount.Add("fastforward", 23)

Просмотреть файл

@ -140,7 +140,7 @@ func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.R
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
vc.vr.stats.CatchupTimings.Record("catchup", time.Now())
vc.vr.stats.PhaseTimings.Record("catchup", time.Now())
}()
settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id)
@ -192,7 +192,7 @@ func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.R
func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState map[string]*sqltypes.Result) error {
defer vc.vr.dbClient.Rollback()
defer func() {
vc.vr.stats.CopyTimings.Record("copy", time.Now())
vc.vr.stats.PhaseTimings.Record("copy", time.Now())
vc.vr.stats.CopyLoopCount.Add(1)
}()
@ -314,7 +314,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltypes.Result, gtid string) error {
defer func() {
vc.vr.stats.FastForwardTimings.Record("fastforward", time.Now())
vc.vr.stats.PhaseTimings.Record("fastforward", time.Now())
}()
pos, err := mysql.DecodePosition(gtid)
if err != nil {

Просмотреть файл

@ -65,7 +65,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
uvs.vse.vstreamerCopyTimings.Record("catchup", time.Now())
uvs.vse.vstreamerPhaseTimings.Record("catchup", time.Now())
}()
errch := make(chan error, 1)
@ -116,7 +116,6 @@ func (uvs *uvstreamer) sendFieldEvent(ctx context.Context, gtid string, fieldEve
}}
log.V(2).Infof("Sending field event %v, gtid is %s", fieldEvent, gtid)
uvs.send(evs)
uvs.vse.vstreamerEventsStreamed.Add(int64(len(evs)))
if err := uvs.setPosition(gtid, true); err != nil {
log.Infof("setPosition returned error %v", err)
@ -157,7 +156,6 @@ func (uvs *uvstreamer) sendEventsForRows(ctx context.Context, tableName string,
evs = append(evs, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COMMIT,
})
uvs.vse.vstreamerEventsStreamed.Add(int64(len(evs)))
if err := uvs.send(evs); err != nil {
log.Infof("send returned error %v", err)
@ -196,7 +194,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
uvs.vse.vstreamerCopyTimings.Record("copy", time.Now())
uvs.vse.vstreamerPhaseTimings.Record("copy", time.Now())
}()
var newLastPK *sqltypes.Result
@ -287,7 +285,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
// processes events between when a table was caught up and when a snapshot is taken for streaming a batch of rows
func (uvs *uvstreamer) fastForward(stopPos string) error {
defer func() {
uvs.vse.vstreamerFastForwardTimings.Record("fastforward", time.Now())
uvs.vse.vstreamerPhaseTimings.Record("fastforward", time.Now())
}()
log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos)
uvs.stopPos, _ = mysql.DecodePosition(stopPos)

Просмотреть файл

@ -73,16 +73,14 @@ type Engine struct {
vschemaUpdates *stats.Counter
// vstreamer metrics
vstreamerCopyTimings *servenv.TimingsWrapper
vstreamerCatchupTimings *servenv.TimingsWrapper
vstreamerFastForwardTimings *servenv.TimingsWrapper
vstreamerEventsStreamed *stats.Counter
vstreamerPacketSize *stats.GaugeFunc
vstreamerNumPackets *stats.Counter
resultStreamerNumRows *stats.Counter
resultStreamerNumPackets *stats.Counter
rowStreamerNumRows *stats.Counter
rowStreamerNumPackets *stats.Counter
vstreamerPhaseTimings *servenv.TimingsWrapper
vstreamerEventsStreamed *stats.Counter
vstreamerPacketSize *stats.GaugeFunc
vstreamerNumPackets *stats.Counter
resultStreamerNumRows *stats.Counter
resultStreamerNumPackets *stats.Counter
rowStreamerNumRows *stats.Counter
rowStreamerNumPackets *stats.Counter
}
// NewEngine creates a new Engine.
@ -104,16 +102,14 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, cell str
vschemaErrors: env.Exporter().NewCounter("VSchemaErrors", "Count of VSchema errors"),
vschemaUpdates: env.Exporter().NewCounter("VSchemaUpdates", "Count of VSchema updates. Does not include errors"),
vstreamerCopyTimings: env.Exporter().NewTimings("VStreamerCopyTiming", "Time taken for bulk copy during vstream copy", "Copy Timings"),
vstreamerCatchupTimings: env.Exporter().NewTimings("VStreamerCatchupTiming", "Time taken for catchup during vstream copy", "Catchup Timings"),
vstreamerFastForwardTimings: env.Exporter().NewTimings("VStreamerFastForwardTiming", "Time taken for fastforward during vstream copy", "FastForward Timings"),
vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"),
vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize),
vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"),
resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"),
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerCopyTiming", "Time taken for bulk copy during vstream copy", "Copy Timings"),
vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"),
vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize),
vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"),
resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"),
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
}
env.Exporter().HandleFunc("/debug/tablet_vschema", vse.ServeHTTP)
return vse

Просмотреть файл

@ -94,11 +94,15 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
MaxReplicationLag: 1 * time.Nanosecond,
CatchupRetryTime: 1 * time.Second,
}
send2 := func(evs []*binlogdatapb.VEvent) error {
vse.vstreamerEventsStreamed.Add(int64(len(evs)))
return send(evs)
}
uvs := &uvstreamer{
ctx: ctx,
cancel: cancel,
vse: vse,
send: send,
send: send2,
cp: cp,
se: se,
startPos: startPos,
@ -270,7 +274,6 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
if len(uvs.plans) > 0 {
evs2 = uvs.filterEvents(evs)
}
uvs.vse.vstreamerEventsStreamed.Add(int64(len(evs2)))
err := uvs.send(evs2)
if err != nil && err != io.EOF {
return err
@ -288,13 +291,13 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
func (uvs *uvstreamer) sendEventsForCurrentPos() error {
log.Infof("sendEventsForCurrentPos")
vevents := []*binlogdatapb.VEvent{{
evs := []*binlogdatapb.VEvent{{
Type: binlogdatapb.VEventType_GTID,
Gtid: mysql.EncodePosition(uvs.pos),
}, {
Type: binlogdatapb.VEventType_OTHER,
}}
if err := uvs.send(vevents); err != nil {
if err := uvs.send(evs); err != nil {
return wrapError(err, uvs.pos)
}
return nil
@ -412,6 +415,7 @@ func (uvs *uvstreamer) sendTestEvent(msg string) {
Type: binlogdatapb.VEventType_OTHER,
Gtid: msg,
}
if err := uvs.send([]*binlogdatapb.VEvent{ev}); err != nil {
return
}

Просмотреть файл

@ -264,7 +264,7 @@ commit;"
cancel()
}
}
resetMetrics(t)
startVStreamCopy(ctx, t, filter, tablePKs)
select {
@ -283,6 +283,7 @@ commit;"
log.Infof("Successfully received %d events", numExpectedEvents)
}
validateReceivedEvents(t)
validateMetrics(t)
}
func validateReceivedEvents(t *testing.T) {
@ -297,6 +298,24 @@ func validateReceivedEvents(t *testing.T) {
}
}
func resetMetrics(t *testing.T) {
engine.vstreamerEventsStreamed.Reset()
engine.resultStreamerNumRows.Reset()
engine.rowStreamerNumRows.Reset()
engine.vstreamerPhaseTimings.Reset()
engine.vstreamerPhaseTimings.Reset()
engine.vstreamerPhaseTimings.Reset()
}
func validateMetrics(t *testing.T) {
require.Equal(t, engine.vstreamerEventsStreamed.Get(), int64(len(allEvents)))
require.Equal(t, engine.resultStreamerNumRows.Get(), int64(0))
require.Equal(t, engine.rowStreamerNumRows.Get(), int64(31))
require.Equal(t, engine.vstreamerPhaseTimings.Counts()["VStreamerTest.copy"], int64(3))
require.Equal(t, engine.vstreamerPhaseTimings.Counts()["VStreamerTest.catchup"], int64(2))
require.Equal(t, engine.vstreamerPhaseTimings.Counts()["VStreamerTest.fastforward"], int64(2))
}
func insertMultipleRows(t *testing.T, table string, idx int, numRows int) {
query1 := fmt.Sprintf(bulkInsertQuery, table, idx, idx)
s := ""