Merge pull request #520 from youtube/aaijazi_add_vars_to_workers

Add variables to track additional worker state, useful for monitoring and testing
This commit is contained in:
Ammar Aijazi 2015-03-26 13:45:33 -07:00
Родитель d4f3982043 7451a0ec55
Коммит 45cb02b813
10 изменённых файлов: 68 добавлений и 0 удалений

Просмотреть файл

@ -49,6 +49,13 @@ func (c *Counters) Set(name string, value int64) {
c.counts[name] = value
}
// Reset resets all counter values
func (c *Counters) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.counts = make(map[string]int64)
}
// Counts returns a copy of the Counters' map.
func (c *Counters) Counts() map[string]int64 {
c.mu.Lock()

Просмотреть файл

@ -186,8 +186,10 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
switch {
case wr.TabletManagerClient().IsTimeoutError(err):
wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error: %v", ti, err)
statsRetryCounters.Add("TimeoutError", 1)
case errNo == "1290":
wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", ti, err)
statsRetryCounters.Add("ReadOnly", 1)
case errNo == "1062":
if !isRetry {
return ti, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", ti, err)

Просмотреть файл

@ -124,6 +124,7 @@ func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ex
func (scw *SplitCloneWorker) setState(state string) {
scw.mu.Lock()
scw.state = state
statsState.Set(state)
scw.mu.Unlock()
event.DispatchUpdate(scw.ev, state)
@ -132,6 +133,7 @@ func (scw *SplitCloneWorker) setState(state string) {
func (scw *SplitCloneWorker) recordError(err error) {
scw.mu.Lock()
scw.state = stateSCError
statsState.Set(stateSCError)
scw.err = err
scw.mu.Unlock()
@ -213,6 +215,7 @@ func (scw *SplitCloneWorker) checkInterrupted() bool {
// Run implements the Worker interface
func (scw *SplitCloneWorker) Run() {
resetVars()
err := scw.run()
scw.setState(stateSCCleanUp)
@ -372,6 +375,7 @@ func (scw *SplitCloneWorker) findTargets() error {
// It will attempt to resolve all shards and update scw.destinationShardsToTablets;
// if it is unable to do so, it will not modify scw.destinationShardsToTablets at all.
func (scw *SplitCloneWorker) ResolveDestinationMasters() error {
statsDestinationAttemptedResolves.Add(1)
destinationShardsToTablets := make(map[string]*topo.TabletInfo)
// Allow at most one resolution request at a time; if there are concurrent requests, only
@ -394,6 +398,7 @@ func (scw *SplitCloneWorker) ResolveDestinationMasters() error {
scw.destinationShardsToTablets = destinationShardsToTablets
// save the time of the last successful resolution
scw.resolveTime = time.Now()
statsDestinationActualResolves.Add(1)
return nil
}

Просмотреть файл

@ -335,4 +335,14 @@ func testSplitClone(t *testing.T, strategy string) {
if wrk.err != nil || wrk.state != stateSCDone {
t.Errorf("Worker run failed")
}
if statsDestinationAttemptedResolves.String() != "3" {
t.Errorf("Wrong statsDestinationAttemptedResolves: wanted %v, got %v", "3", statsDestinationAttemptedResolves.String())
}
if statsDestinationActualResolves.String() != "1" {
t.Errorf("Wrong statsDestinationActualResolves: wanted %v, got %v", "1", statsDestinationActualResolves.String())
}
if statsRetryCounters.String() != "{\"ReadOnly\": 2}" {
t.Errorf("Wrong statsRetryCounters: wanted %v, got %v", "{\"ReadOnly\": 2}", statsRetryCounters.String())
}
}

Просмотреть файл

@ -86,12 +86,14 @@ func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, exc
func (sdw *SplitDiffWorker) setState(state string) {
sdw.mu.Lock()
sdw.state = state
statsState.Set(state)
sdw.mu.Unlock()
}
func (sdw *SplitDiffWorker) recordError(err error) {
sdw.mu.Lock()
sdw.state = stateSDError
statsState.Set(stateSDError)
sdw.err = err
sdw.mu.Unlock()
}
@ -151,6 +153,7 @@ func (sdw *SplitDiffWorker) checkInterrupted() bool {
// Run is mostly a wrapper to run the cleanup at the end.
func (sdw *SplitDiffWorker) Run() {
resetVars()
err := sdw.run()
sdw.setState(stateSDCleanUp)

Просмотреть файл

@ -89,6 +89,7 @@ func NewSQLDiffWorker(wr *wrangler.Wrangler, cell string, superset, subset Sourc
func (worker *SQLDiffWorker) setState(state sqlDiffWorkerState) {
worker.mu.Lock()
worker.state = state
statsState.Set(string(state))
worker.mu.Unlock()
}
@ -97,6 +98,7 @@ func (worker *SQLDiffWorker) recordError(err error) {
defer worker.mu.Unlock()
worker.state = sqlDiffError
statsState.Set(string(sqlDiffError))
worker.err = err
}
@ -157,6 +159,7 @@ func (worker *SQLDiffWorker) checkInterrupted() bool {
// Run is mostly a wrapper to run the cleanup at the end.
func (worker *SQLDiffWorker) Run() {
resetVars()
err := worker.run()
worker.setState(sqlDiffCleanUp)

Просмотреть файл

@ -119,6 +119,7 @@ func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspac
func (vscw *VerticalSplitCloneWorker) setState(state string) {
vscw.mu.Lock()
vscw.state = state
statsState.Set(state)
vscw.mu.Unlock()
event.DispatchUpdate(vscw.ev, state)
@ -127,6 +128,7 @@ func (vscw *VerticalSplitCloneWorker) setState(state string) {
func (vscw *VerticalSplitCloneWorker) recordError(err error) {
vscw.mu.Lock()
vscw.state = stateVSCError
statsState.Set(stateVSCError)
vscw.err = err
vscw.mu.Unlock()
@ -200,6 +202,7 @@ func (vscw *VerticalSplitCloneWorker) checkInterrupted() bool {
// Run implements the Worker interface
func (vscw *VerticalSplitCloneWorker) Run() {
resetVars()
err := vscw.run()
vscw.setState(stateVSCCleanUp)
@ -336,6 +339,7 @@ func (vscw *VerticalSplitCloneWorker) findTargets() error {
// It will attempt to resolve all shards and update vscw.destinationShardsToTablets;
// if it is unable to do so, it will not modify vscw.destinationShardsToTablets at all.
func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters() error {
statsDestinationAttemptedResolves.Add(1)
// Allow at most one resolution request at a time; if there are concurrent requests, only
// one of them will actualy hit the topo server.
vscw.resolveMu.Lock()
@ -353,6 +357,7 @@ func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters() error {
vscw.destinationShardsToTablets = map[string]*topo.TabletInfo{vscw.destinationShard: ti}
// save the time of the last successful resolution
vscw.resolveTime = time.Now()
statsDestinationActualResolves.Add(1)
return nil
}

Просмотреть файл

@ -320,4 +320,14 @@ func testVerticalSplitClone(t *testing.T, strategy string) {
if wrk.err != nil || wrk.state != stateSCDone {
t.Errorf("Worker run failed")
}
if statsDestinationAttemptedResolves.String() != "2" {
t.Errorf("Wrong statsDestinationAttemptedResolves: wanted %v, got %v", "2", statsDestinationAttemptedResolves.String())
}
if statsDestinationActualResolves.String() != "1" {
t.Errorf("Wrong statsDestinationActualResolves: wanted %v, got %v", "1", statsDestinationActualResolves.String())
}
if statsRetryCounters.String() != "{\"ReadOnly\": 1}" {
t.Errorf("Wrong statsRetryCounters: wanted %v, got %v", "{\"ReadOnly\": 1}", statsRetryCounters.String())
}
}

Просмотреть файл

@ -86,12 +86,14 @@ func NewVerticalSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard str
func (vsdw *VerticalSplitDiffWorker) setState(state string) {
vsdw.mu.Lock()
vsdw.state = state
statsState.Set(state)
vsdw.mu.Unlock()
}
func (vsdw *VerticalSplitDiffWorker) recordError(err error) {
vsdw.mu.Lock()
vsdw.state = stateVSDError
statsState.Set(stateVSDError)
vsdw.err = err
vsdw.mu.Unlock()
}
@ -151,6 +153,7 @@ func (vsdw *VerticalSplitDiffWorker) checkInterrupted() bool {
// Run is mostly a wrapper to run the cleanup at the end.
func (vsdw *VerticalSplitDiffWorker) Run() {
resetVars()
err := vsdw.run()
vsdw.setState(stateVSDCleanUp)

Просмотреть файл

@ -12,6 +12,7 @@ import (
"html/template"
"time"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/topo"
)
@ -51,3 +52,22 @@ type Resolver interface {
// Resolvers should attempt to keep the previous topo resolution cached for at
// least this long.
const resolveTTL = 15 * time.Second
var (
statsState = stats.NewString("WorkerState")
// the number of times that the worker attempst to reresolve the masters
statsDestinationAttemptedResolves = stats.NewInt("WorkerDestinationAttemptedResolves")
// the number of times that the worker actually hits the topo server, i.e., they don't
// use a cached topology
statsDestinationActualResolves = stats.NewInt("WorkerDestinationActualResolves")
statsRetryCounters = stats.NewCounters("WorkerRetryCount")
)
// resetVars resets the debug variables that are meant to provide information on a
// per-run basis. This should be called at the beginning of each worker run.
func resetVars() {
statsState.Set("")
statsDestinationAttemptedResolves.Set(0)
statsDestinationActualResolves.Set(0)
statsRetryCounters.Reset()
}