зеркало из https://github.com/github/vitess-gh.git
Add variables to track additional worker state, useful for monitoring and testing
This commit is contained in:
Родитель
073f69451b
Коммит
b879c0535a
|
@ -186,8 +186,10 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
|
|||
switch {
|
||||
case wr.TabletManagerClient().IsTimeoutError(err):
|
||||
wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error: %v", ti, err)
|
||||
statsRetryCounters.Add("TimeoutError", 1)
|
||||
case errNo == "1290":
|
||||
wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", ti, err)
|
||||
statsRetryCounters.Add("ReadOnly", 1)
|
||||
case errNo == "1062":
|
||||
if !isRetry {
|
||||
return ti, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", ti, err)
|
||||
|
|
|
@ -124,6 +124,7 @@ func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ex
|
|||
func (scw *SplitCloneWorker) setState(state string) {
|
||||
scw.mu.Lock()
|
||||
scw.state = state
|
||||
statsState.Set(state)
|
||||
scw.mu.Unlock()
|
||||
|
||||
event.DispatchUpdate(scw.ev, state)
|
||||
|
@ -132,6 +133,7 @@ func (scw *SplitCloneWorker) setState(state string) {
|
|||
func (scw *SplitCloneWorker) recordError(err error) {
|
||||
scw.mu.Lock()
|
||||
scw.state = stateSCError
|
||||
statsState.Set(stateSCError)
|
||||
scw.err = err
|
||||
scw.mu.Unlock()
|
||||
|
||||
|
@ -394,6 +396,7 @@ func (scw *SplitCloneWorker) ResolveDestinationMasters() error {
|
|||
scw.destinationShardsToTablets = destinationShardsToTablets
|
||||
// save the time of the last successful resolution
|
||||
scw.resolveTime = time.Now()
|
||||
statsDestinationResolves.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -86,12 +86,14 @@ func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, exc
|
|||
func (sdw *SplitDiffWorker) setState(state string) {
|
||||
sdw.mu.Lock()
|
||||
sdw.state = state
|
||||
statsState.Set(state)
|
||||
sdw.mu.Unlock()
|
||||
}
|
||||
|
||||
func (sdw *SplitDiffWorker) recordError(err error) {
|
||||
sdw.mu.Lock()
|
||||
sdw.state = stateSDError
|
||||
statsState.Set(stateSDError)
|
||||
sdw.err = err
|
||||
sdw.mu.Unlock()
|
||||
}
|
||||
|
|
|
@ -89,6 +89,7 @@ func NewSQLDiffWorker(wr *wrangler.Wrangler, cell string, superset, subset Sourc
|
|||
func (worker *SQLDiffWorker) setState(state sqlDiffWorkerState) {
|
||||
worker.mu.Lock()
|
||||
worker.state = state
|
||||
statsState.Set(string(state))
|
||||
worker.mu.Unlock()
|
||||
}
|
||||
|
||||
|
@ -97,6 +98,7 @@ func (worker *SQLDiffWorker) recordError(err error) {
|
|||
defer worker.mu.Unlock()
|
||||
|
||||
worker.state = sqlDiffError
|
||||
statsState.Set(string(sqlDiffError))
|
||||
worker.err = err
|
||||
}
|
||||
|
||||
|
|
|
@ -119,6 +119,7 @@ func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspac
|
|||
func (vscw *VerticalSplitCloneWorker) setState(state string) {
|
||||
vscw.mu.Lock()
|
||||
vscw.state = state
|
||||
statsState.Set(state)
|
||||
vscw.mu.Unlock()
|
||||
|
||||
event.DispatchUpdate(vscw.ev, state)
|
||||
|
@ -127,6 +128,7 @@ func (vscw *VerticalSplitCloneWorker) setState(state string) {
|
|||
func (vscw *VerticalSplitCloneWorker) recordError(err error) {
|
||||
vscw.mu.Lock()
|
||||
vscw.state = stateVSCError
|
||||
statsState.Set(stateVSCError)
|
||||
vscw.err = err
|
||||
vscw.mu.Unlock()
|
||||
|
||||
|
@ -353,6 +355,7 @@ func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters() error {
|
|||
vscw.destinationShardsToTablets = map[string]*topo.TabletInfo{vscw.destinationShard: ti}
|
||||
// save the time of the last successful resolution
|
||||
vscw.resolveTime = time.Now()
|
||||
statsDestinationResolves.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -86,12 +86,14 @@ func NewVerticalSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard str
|
|||
func (vsdw *VerticalSplitDiffWorker) setState(state string) {
|
||||
vsdw.mu.Lock()
|
||||
vsdw.state = state
|
||||
statsState.Set(state)
|
||||
vsdw.mu.Unlock()
|
||||
}
|
||||
|
||||
func (vsdw *VerticalSplitDiffWorker) recordError(err error) {
|
||||
vsdw.mu.Lock()
|
||||
vsdw.state = stateVSDError
|
||||
statsState.Set(stateVSDError)
|
||||
vsdw.err = err
|
||||
vsdw.mu.Unlock()
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"html/template"
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/stats"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
)
|
||||
|
||||
|
@ -51,3 +52,10 @@ type Resolver interface {
|
|||
// Resolvers should attempt to keep the previous topo resolution cached for at
|
||||
// least this long.
|
||||
const resolveTTL = 15 * time.Second
|
||||
|
||||
var (
|
||||
// the stats exported by this package
|
||||
statsState = stats.NewString("WorkerState")
|
||||
statsDestinationResolves = stats.NewInt("WorkerDestinationResolves")
|
||||
statsRetryCounters = stats.NewCounters("WorkerRetryCount")
|
||||
)
|
||||
|
|
Загрузка…
Ссылка в новой задаче