A bit more cleanup. Add a separate counter for read errors vs write errors

This commit is contained in:
Bryan Beaudreault 2017-04-04 18:23:56 -04:00 коммит произвёл Sugu Sougoumarane
Родитель d6e0f6603e
Коммит 6c8187518d
5 изменённых файлов: 54 добавлений и 35 удалений

Просмотреть файл

@ -24,12 +24,14 @@ var (
enableHeartbeat = flag.Bool("enable_heartbeat", false, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
interval = flag.Duration("heartbeat_interval", 1*time.Second, "How frequently to read and write replication heartbeat.")
// HeartbeatWrites keeps a count of the number of heartbeats written over time.
writes = stats.NewInt("HeartbeatWrites")
// HeartbeatReads keeps a count of the number of heartbeats read over time.
reads = stats.NewInt("HeartbeatReads")
// HeartbeatErrors keeps a count of the number of errors encountered while reading or writing heartbeats.
errors = stats.NewInt("HeartbeatErrors")
// HeartbeatWriteCount keeps a count of the number of heartbeats written over time.
writeCount = stats.NewInt("HeartbeatWriteCount")
// HeartbeatWriteErrorCount keeps a count of errors encountered while writing heartbeats
writeErrorCount = stats.NewInt("HeartbeatWriteErrorCount")
// HeartbeatReadCount keeps a count of the number of heartbeats read over time.
readCount = stats.NewInt("HeartbeatReadCount")
// HeartbeatReadErrorCount keeps a count of errors encountered while reading heartbeats
readErrorCount = stats.NewInt("HeartbeatReadErrorCount")
// HeartbeatLagNs is incremented by the current lag at each heartbeat read interval. Plotting this
// over time allows a rolling average lag to be calculated.
lagNs = stats.NewInt("HeartbeatLagNs")

Просмотреть файл

@ -18,6 +18,10 @@ import (
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
// sqlFetchMostRecentHeartbeat is the query template used to read the newest
// row (by ts) of the heartbeat table, returning its ts and master_uid columns.
// The %s placeholder is filled in with the database name.
const sqlFetchMostRecentHeartbeat = "SELECT ts, master_uid FROM %s.heartbeat ORDER BY ts DESC LIMIT 1"
// Reader reads the heartbeat table at a configured interval in order
// to calculate replication lag. It is meant to be run on a slave, and paired
// with a Writer on a master. It's primarily created and launched from Reporter.
@ -25,11 +29,11 @@ import (
// table against the current time at read time. This value is reported in metrics and
// also to the healthchecks.
type Reader struct {
topoServer topo.Server
mysqld mysqlctl.MysqlDaemon
tablet *topodata.Tablet
now func() time.Time
errorLog *logutil.ThrottledLogger
topoServer topo.Server
mysqld mysqlctl.MysqlDaemon
tablet *topodata.Tablet
now func() time.Time
errorLog *logutil.ThrottledLogger
wg *sync.WaitGroup
cancel context.CancelFunc
@ -59,7 +63,7 @@ func (r *Reader) Open(dbc dbconfigs.DBConfigs) {
ctx, cancel := context.WithCancel(context.Background())
r.dbName = sqlparser.Backtick(dbc.SidecarDBName)
r.wg = wg
r.wg = wg
r.cancel = cancel
wg.Add(1)
@ -72,6 +76,16 @@ func (r *Reader) Close() {
r.wg.Wait()
}
// GetLatest reports the outcome of the most recent heartbeat read. While
// holding the Reader's mutex it checks the stored error first: if one is set
// it is returned with a zero duration, otherwise the last known lag is
// returned with a nil error.
func (r *Reader) GetLatest() (time.Duration, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if lastErr := r.lastKnownError; lastErr != nil {
		return 0, lastErr
	}
	return r.lastKnownLag, nil
}
// watchHeartbeat is meant to be called as a goroutine, and calls
// fetchMostRecentHeartbeat repeatedly until told to exit by Close.
func (r *Reader) watchHeartbeat(ctx context.Context) {
@ -126,7 +140,7 @@ func (r *Reader) readHeartbeat(ctx context.Context) error {
lag := r.now().Sub(time.Unix(0, ts))
lagNs.Add(lag.Nanoseconds())
reads.Add(1)
readCount.Add(1)
r.mu.Lock()
r.lastKnownLag = lag
@ -143,7 +157,7 @@ func (r *Reader) fetchMostRecentHeartbeat(ctx context.Context) (*sqltypes.Result
return nil, err
}
defer conn.Recycle()
return conn.ExecuteFetch(fmt.Sprintf("SELECT ts, master_uid FROM %s.heartbeat ORDER BY ts DESC LIMIT 1", r.dbName), 1, false)
return conn.ExecuteFetch(fmt.Sprintf(sqlFetchMostRecentHeartbeat, r.dbName), 1, false)
}
// parseHeartbeatResult turns a raw result into the timestamp and master uid values
@ -171,5 +185,5 @@ func (r *Reader) recordError(err error) {
r.lastKnownError = err
r.mu.Unlock()
r.errorLog.Errorf("%v", err)
errors.Add(1)
readErrorCount.Add(1)
}

Просмотреть файл

@ -31,21 +31,16 @@ func RegisterReporter(topoServer topo.Server, mysqld mysqlctl.MysqlDaemon, table
return reporter
}
// HTMLName is part of the health.Reader interface.
// HTMLName is part of the health.Reporter interface.
func (r *Reporter) HTMLName() template.HTML {
return template.HTML("MySQLHeartbeat")
}
// Report is part of the health.Reader interface. It returns the last reported value
// Report is part of the health.Reporter interface. It returns the last reported value
// written by the watchHeartbeat goroutine. If we're the master, it just returns 0.
func (r *Reporter) Report(isSlaveType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
if !isSlaveType {
return 0, nil
}
r.mu.Lock()
defer r.mu.Unlock()
if r.lastKnownError != nil {
return 0, r.lastKnownError
}
return r.lastKnownLag, nil
return r.GetLatest()
}

Просмотреть файл

@ -10,7 +10,6 @@ import (
"github.com/youtube/vitess/go/vt/dbconnpool"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv"
log "github.com/golang/glog"
@ -30,6 +29,12 @@ const (
sqlUpdateHeartbeat = "UPDATE %v.heartbeat SET ts=%d WHERE master_uid=%d"
)
// initTableRetryInterval is the default pause between attempts to initialize
// the heartbeat table. It is declared as a variable, not a constant, so that
// tests can override it.
var initTableRetryInterval = 10 * time.Second
// mySQLChecker is the interface accepted by NewWriter for its checker
// argument; it requires only a CheckMySQL method.
// NOTE(review): presumably CheckMySQL is invoked when a heartbeat write fails
// so the owner can re-verify MySQL health — confirm against the call sites.
type mySQLChecker interface {
CheckMySQL()
}
@ -37,7 +42,6 @@ type mySQLChecker interface {
// Writer runs on master tablets and writes heartbeats to the _vt.heartbeat
// table at a regular interval, defined by heartbeat_interval.
type Writer struct {
topoServer topo.Server
tabletAlias topodata.TabletAlias
now func() time.Time
errorLog *logutil.ThrottledLogger
@ -51,9 +55,8 @@ type Writer struct {
}
// NewWriter creates a new Writer.
func NewWriter(topoServer topo.Server, alias topodata.TabletAlias, checker mySQLChecker, config tabletenv.TabletConfig) *Writer {
func NewWriter(checker mySQLChecker, alias topodata.TabletAlias, config tabletenv.TabletConfig) *Writer {
return &Writer{
topoServer: topoServer,
tabletAlias: alias,
now: time.Now,
errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
@ -106,7 +109,12 @@ func (w *Writer) run(ctx context.Context, initParams *sqldb.ConnParams) {
defer w.wg.Done()
defer tabletenv.LogError()
w.waitForTables(ctx, initParams)
// We would only exit here if we were in error and had been canceled.
// In this case, end the routine.
if err := w.waitForTables(ctx, initParams); err != nil {
w.recordError(err)
return
}
log.Info("Beginning heartbeat writes")
for {
@ -123,17 +131,17 @@ func (w *Writer) run(ctx context.Context, initParams *sqldb.ConnParams) {
// waitForTables continually attempts to create the heartbeat tables, until
// success or cancellation.
func (w *Writer) waitForTables(ctx context.Context, cp *sqldb.ConnParams) {
func (w *Writer) waitForTables(ctx context.Context, cp *sqldb.ConnParams) error {
log.Info("Initializing heartbeat table")
for {
err := w.initializeTables(cp)
if err == nil {
return
return nil
}
w.recordError(err)
if waitOrExit(ctx, 10*time.Second) {
return
if waitOrExit(ctx, initTableRetryInterval) {
return err
}
}
}
@ -164,7 +172,7 @@ func (w *Writer) writeHeartbeat(ctx context.Context) error {
if err != nil {
return fmt.Errorf("Failed to execute update query: %v", err)
}
writes.Add(1)
writeCount.Add(1)
return nil
}
@ -183,5 +191,5 @@ func (w *Writer) exec(ctx context.Context, query string) error {
func (w *Writer) recordError(err error) {
w.errorLog.Errorf("%v", err)
errors.Add(1)
writeErrorCount.Add(1)
}

Просмотреть файл

@ -192,7 +192,7 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer topo.Server, alia
tsv.te = NewTxEngine(tsv, config)
tsv.txThrottler = txthrottler.CreateTxThrottlerFromTabletConfig(topoServer)
tsv.messager = messager.NewEngine(tsv, tsv.se, config)
tsv.heartbeat = heartbeat.NewWriter(tsv.topoServer, alias, tsv, config)
tsv.heartbeat = heartbeat.NewWriter(tsv, alias, config)
tsv.watcher = NewReplicationWatcher(tsv.se, config)
tsv.updateStreamList = &binlog.StreamList{}
// FIXME(alainjobart) could we move this to the Register method below?