Merge branch 'master' into resharding

This commit is contained in:
Alain Jobart 2015-07-17 11:06:41 -07:00
Parents c04a3f0a27 3de9bdb49e
Commit b2a09f8f4b
12 changed files with 39 additions and 39 deletions

View file

@@ -200,11 +200,11 @@ func NewActionAgent(
}
// after the restore is done, start health check
-agent.initHeathCheck()
+agent.initHealthCheck()
}()
} else {
// synchronously start health check if needed
-agent.initHeathCheck()
+agent.initHealthCheck()
}
return agent, nil
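For context on the hunk above: the renamed method is called from both a background restore goroutine and a synchronous path. A minimal self-contained sketch of that pattern follows; the agent type, restoreFromBackup, and newAgent are hypothetical stand-ins, and only the async-vs-sync structure comes from the diff.

package sketch

// agent and its methods are hypothetical stand-ins.
type agent struct{}

func (a *agent) restoreFromBackup() {}
func (a *agent) initHealthCheck()   {}

// newAgent mirrors the control flow shown above: when a restore is
// pending, run it in the background and start the health check only
// after the restore is done; otherwise start the health check
// synchronously before returning.
func newAgent(restoreNeeded bool) (*agent, error) {
	a := &agent{}
	if restoreNeeded {
		go func() {
			a.restoreFromBackup()
			// after the restore is done, start health check
			a.initHealthCheck()
		}()
	} else {
		// synchronously start health check if needed
		a.initHealthCheck()
	}
	return a, nil
}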

View file

@@ -96,7 +96,7 @@ func (agent *ActionAgent) IsRunningHealthCheck() bool {
return *targetTabletType != ""
}
-func (agent *ActionAgent) initHeathCheck() {
+func (agent *ActionAgent) initHealthCheck() {
if !agent.IsRunningHealthCheck() {
log.Infof("No target_tablet_type specified, disabling any health check")
return
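The guard above keys the whole health-check feature off one flag. A stdlib-only sketch of the same gating; the flag name and log message come from the hunk, while the default and help text are assumed.

package sketch

import (
	"flag"
	"log"
)

// Flag name from the hunk above; default and help text are assumed.
var targetTabletType = flag.String("target_tablet_type", "", "tablet type to change to when healthy")

// initHealthCheck is a no-op unless target_tablet_type is set,
// matching the early return shown above.
func initHealthCheck() {
	if *targetTabletType == "" {
		log.Printf("No target_tablet_type specified, disabling any health check")
		return
	}
	// start the periodic health check here (elided)
}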

View file

@@ -47,8 +47,8 @@ COMMAND ARGUMENT DEFINITIONS
for backup purposes
-- batch: A slaved copy of data for OLAP load patterns (typically for
MapReduce jobs)
--- checker: A tablet that is running a checker process. The tablet is likely
-lagging in replication.
+-- worker: A tablet that is in use by a vtworker process. The tablet is likely
+lagging in replication.
-- experimental: A slaved copy of data that is ready but not serving query
traffic. The value indicates a special characteristic of
the tablet that indicates the tablet should not be
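For reference, these tablet types are string constants in the topo package. A sketch of how the renamed type might be declared; topo.TYPE_WORKER is confirmed later in this diff, everything else here is assumed for illustration.

package sketch

// TabletType mirrors topo's string-typed tablet types; only
// TYPE_WORKER is confirmed by this diff, the others are assumed.
type TabletType string

const (
	// TYPE_BATCH serves OLAP load patterns (e.g. MapReduce jobs).
	TYPE_BATCH TabletType = "batch"
	// TYPE_WORKER is a tablet in use by a vtworker process; it
	// replaces the old 'checker' type and is likely lagging in
	// replication while a job runs.
	TYPE_WORKER TabletType = "worker"
	// TYPE_EXPERIMENTAL is a slaved copy not serving query traffic.
	TYPE_EXPERIMENTAL TabletType = "experimental"
)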

View file

@@ -264,7 +264,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error {
// findTargets phase:
// - find one rdonly in the source shard
-// - mark it as 'checker' pointing back to us
+// - mark it as 'worker' pointing back to us
// - get the aliases of all the targets
func (scw *SplitCloneWorker) findTargets(ctx context.Context) error {
scw.setState(WorkerStateFindTargets)
@@ -275,7 +275,7 @@ func (scw *SplitCloneWorker) findTargets(ctx context.Context) error {
for i, si := range scw.sourceShards {
scw.sourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.cell, si.Keyspace(), si.ShardName())
if err != nil {
-return fmt.Errorf("cannot find checker for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err)
+return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err)
}
scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", scw.sourceAliases[i], si.Keyspace(), si.ShardName())
}
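Every caller touched by this merge follows the same shape. A sketch of that caller pattern with the helper stubbed out; tabletAlias and findWorkerTablet are hypothetical stand-ins for the real worker-package types, and only the error-wrapping convention is taken from the diff.

package sketch

import (
	"context"
	"fmt"
)

// tabletAlias and findWorkerTablet are hypothetical stand-ins.
type tabletAlias string

func findWorkerTablet(ctx context.Context, cell, keyspace, shard string) (tabletAlias, error) {
	return "", nil // stub
}

// pickSource shows the error-wrapping convention adopted in this
// merge: name the failing function and the cell/keyspace/shard triple
// so the log line identifies exactly which shard had no tablet.
func pickSource(ctx context.Context, cell, keyspace, shard string) (tabletAlias, error) {
	alias, err := findWorkerTablet(ctx, cell, keyspace, shard)
	if err != nil {
		return "", fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", cell, keyspace, shard, err)
	}
	return alias, nil
}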

View file

@@ -174,7 +174,7 @@ func (sdw *SplitDiffWorker) init(ctx context.Context) error {
// findTargets phase:
// - find one rdonly per source shard
// - find one rdonly in destination shard
-// - mark them all as 'checker' pointing back to us
+// - mark them all as 'worker' pointing back to us
func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
sdw.SetState(WorkerStateFindTargets)
@@ -182,7 +182,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
var err error
sdw.destinationAlias, err = FindWorkerTablet(ctx, sdw.wr, sdw.cleaner, sdw.cell, sdw.keyspace, sdw.shard)
if err != nil {
-return fmt.Errorf("cannot find checker for %v/%v/%v: %v", sdw.cell, sdw.keyspace, sdw.shard, err)
+return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", sdw.cell, sdw.keyspace, sdw.shard, err)
}
// find an appropriate endpoint in the source shards
@@ -190,7 +190,7 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
for i, ss := range sdw.shardInfo.SourceShards {
sdw.sourceAliases[i], err = FindWorkerTablet(ctx, sdw.wr, sdw.cleaner, sdw.cell, sdw.keyspace, ss.Shard)
if err != nil {
-return fmt.Errorf("cannot find checker for %v/%v/%v: %v", sdw.cell, sdw.keyspace, ss.Shard, err)
+return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", sdw.cell, sdw.keyspace, ss.Shard, err)
}
}
@@ -201,19 +201,19 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
// 1 - ask the master of the destination shard to pause filtered replication,
// and return the source binlog positions
// (add a cleanup task to restart filtered replication on master)
-// 2 - stop all the source 'checker' at a binlog position higher than the
+// 2 - stop all the source tablets at a binlog position higher than the
// destination master. Get that new list of positions.
// (add a cleanup task to restart binlog replication on them, and change
// the existing ChangeSlaveType cleanup action to 'spare' type)
// 3 - ask the master of the destination shard to resume filtered replication
// up to the new list of positions, and return its binlog position.
-// 4 - wait until the destination checker is equal or passed that master binlog
-// position, and stop its replication.
+// 4 - wait until the destination tablet is equal or passed that master
+// binlog position, and stop its replication.
// (add a cleanup task to restart binlog replication on it, and change
// the existing ChangeSlaveType cleanup action to 'spare' type)
// 5 - restart filtered replication on destination master.
// (remove the cleanup task that does the same)
-// At this point, all checker instances are stopped at the same point.
+// At this point, all source and destination tablets are stopped at the same point.
func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
sdw.SetState(WorkerStateSyncReplication)
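The five numbered steps in the comment above map onto a skeleton like the following. Every helper here is a hypothetical stub (the real calls are wrangler/tablet-manager RPCs); only the step ordering and the cleanup notes come from the diff.

package sketch

import "context"

// positions is a stand-in for blproto.BlpPositionList.
type positions []string

// Hypothetical stubs for the remote calls.
func pauseFilteredReplication(ctx context.Context) (positions, error)       { return nil, nil }
func stopSourceTablets(ctx context.Context, p positions) (positions, error) { return nil, nil }
func runBlpUntil(ctx context.Context, p positions) (string, error)          { return "", nil }
func waitAndStopDestination(ctx context.Context, masterPos string) error    { return nil }
func restartFilteredReplication(ctx context.Context) error                  { return nil }

func synchronizeReplicationSketch(ctx context.Context) error {
	// 1 - pause filtered replication on the destination master and
	// get the source binlog positions (cleanup: restart it later).
	srcPositions, err := pauseFilteredReplication(ctx)
	if err != nil {
		return err
	}
	// 2 - stop every source tablet at a binlog position higher than
	// the destination master; collect the new list of positions.
	stopPositions, err := stopSourceTablets(ctx, srcPositions)
	if err != nil {
		return err
	}
	// 3 - let the destination master run filtered replication up to
	// those positions and report its own binlog position.
	masterPos, err := runBlpUntil(ctx, stopPositions)
	if err != nil {
		return err
	}
	// 4 - wait until the destination tablet reaches that master
	// position, then stop its replication too.
	if err := waitAndStopDestination(ctx, masterPos); err != nil {
		return err
	}
	// 5 - restart filtered replication on the destination master and
	// drop the step-1 cleanup task that would have done the same.
	return restartFilteredReplication(ctx)
}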
@@ -233,7 +233,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
}
wrangler.RecordStartBlpAction(sdw.cleaner, masterInfo)
-// 2 - stop all the source 'checker' at a binlog position
+// 2 - stop all the source tablets at a binlog position
// higher than the destination master
stopPositionList := blproto.BlpPositionList{
Entries: make([]blproto.BlpPosition, len(sdw.shardInfo.SourceShards)),
@@ -282,9 +282,9 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
return fmt.Errorf("RunBlpUntil for %v until %v failed: %v", sdw.shardInfo.MasterAlias, stopPositionList, err)
}
-// 4 - wait until the destination checker is equal or passed
+// 4 - wait until the destination tablet is equal or passed
// that master binlog position, and stop its replication.
-sdw.wr.Logger().Infof("Waiting for destination checker %v to catch up to %v", sdw.destinationAlias, masterPos)
+sdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", sdw.destinationAlias, masterPos)
destinationTablet, err := sdw.wr.TopoServer().GetTablet(ctx, sdw.destinationAlias)
if err != nil {
return err
@@ -318,7 +318,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error {
}
// diff phase: will log messages regarding the diff.
-// - get the schema on all checkers
+// - get the schema on all tablets
// - if some table schema mismatches, record them (use existing schema diff tools).
// - for each table in destination, run a diff pipeline.
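The diff phase described above reduces to: fetch each schema, record mismatches, then compare table by table. A hypothetical stdlib-only sketch of that flow; the schema type and both helpers are invented for illustration, only the three bullet points come from the diff.

package sketch

import (
	"context"
	"log"
	"reflect"
)

// schema and the helpers below are hypothetical stand-ins.
type schema struct {
	Tables map[string]string // table name -> definition
}

func getSchema(ctx context.Context, tabletAlias string) (schema, error) { return schema{}, nil }
func diffTable(ctx context.Context, table string) error                 { return nil }

func runDiffPhase(ctx context.Context, source, destination string) error {
	// - get the schema on all tablets
	src, err := getSchema(ctx, source)
	if err != nil {
		return err
	}
	dst, err := getSchema(ctx, destination)
	if err != nil {
		return err
	}
	// - if some table schema mismatches, record them
	if !reflect.DeepEqual(src.Tables, dst.Tables) {
		log.Printf("schema mismatch between %v and %v", source, destination)
	}
	// - for each table in destination, run a diff pipeline
	for table := range dst.Tables {
		if err := diffTable(ctx, table); err != nil {
			return err
		}
	}
	return nil
}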

View file

@@ -140,7 +140,7 @@ func (worker *SQLDiffWorker) run(ctx context.Context) error {
// findTargets phase:
// - find one rdonly in superset
// - find one rdonly in subset
-// - mark them all as 'checker' pointing back to us
+// - mark them all as 'worker' pointing back to us
func (worker *SQLDiffWorker) findTargets(ctx context.Context) error {
worker.SetState(WorkerStateFindTargets)
@@ -225,7 +225,7 @@ func (worker *SQLDiffWorker) synchronizeReplication(ctx context.Context) error {
}
// diff phase: will create a list of messages regarding the diff.
-// - get the schema on all checkers
+// - get the schema on all tablets
// - if some table schema mismatches, record them (use existing schema diff tools).
// - for each table in destination, run a diff pipeline.

View file

@@ -17,7 +17,7 @@ import (
)
var (
-minHealthyEndPoints = flag.Int("min_healthy_rdonly_endpoints", 2, "minimum number of healthy rdonly endpoints required for checker")
+minHealthyEndPoints = flag.Int("min_healthy_rdonly_endpoints", 2, "minimum number of healthy rdonly endpoints before taking out one")
// WaitForHealthyEndPointsTimeout intent is to wait for the
// healthcheck to automatically return rdonly instances which
@@ -110,7 +110,7 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
// type change in the cleaner.
defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "worker", "")
-wr.Logger().Infof("Changing tablet %v to 'checker'", tabletAlias)
+wr.Logger().Infof("Changing tablet %v to '%v'", tabletAlias, topo.TYPE_WORKER)
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
err = wr.ChangeType(shortCtx, tabletAlias, topo.TYPE_WORKER, false /*force*/)
cancel()
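The bounded timeout wrapped around the remote type change above is worth noting. A stdlib-only sketch of that pattern; changeType is a hypothetical stand-in for wr.ChangeType, and the flag default here is assumed.

package sketch

import (
	"context"
	"flag"
	"time"
)

// Flag name mirrors the remoteActionsTimeout usage above; the default
// is assumed.
var remoteActionsTimeout = flag.Duration("remote_actions_timeout", time.Minute, "timeout for remote tablet actions")

// changeType is a hypothetical stand-in for wr.ChangeType.
func changeType(ctx context.Context, tabletAlias, newType string, force bool) error {
	return ctx.Err()
}

// changeTypeWithTimeout derives a short-lived child context per remote
// action so one hung tablet cannot stall the whole worker, and cancels
// it explicitly as soon as the call returns.
func changeTypeWithTimeout(ctx context.Context, tabletAlias string) error {
	shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	err := changeType(shortCtx, tabletAlias, "worker", false /*force*/)
	cancel()
	return err
}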

View file

@@ -236,7 +236,7 @@ func (vscw *VerticalSplitCloneWorker) init(ctx context.Context) error {
// findTargets phase:
// - find one rdonly in the source shard
-// - mark it as 'checker' pointing back to us
+// - mark it as 'worker' pointing back to us
// - get the aliases of all the targets
func (vscw *VerticalSplitCloneWorker) findTargets(ctx context.Context) error {
vscw.setState(WorkerStateFindTargets)
@@ -245,7 +245,7 @@ func (vscw *VerticalSplitCloneWorker) findTargets(ctx context.Context) error {
var err error
vscw.sourceAlias, err = FindWorkerTablet(ctx, vscw.wr, vscw.cleaner, vscw.cell, vscw.sourceKeyspace, "0")
if err != nil {
-return fmt.Errorf("cannot find checker for %v/%v/0: %v", vscw.cell, vscw.sourceKeyspace, err)
+return fmt.Errorf("FindWorkerTablet() failed for %v/%v/0: %v", vscw.cell, vscw.sourceKeyspace, err)
}
vscw.wr.Logger().Infof("Using tablet %v as the source", vscw.sourceAlias)

View file

@@ -183,7 +183,7 @@ func (vsdw *VerticalSplitDiffWorker) init(ctx context.Context) error {
// findTargets phase:
// - find one rdonly per source shard
// - find one rdonly in destination shard
-// - mark them all as 'checker' pointing back to us
+// - mark them all as 'worker' pointing back to us
func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error {
vsdw.SetState(WorkerStateFindTargets)
@@ -191,13 +191,13 @@ func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error {
var err error
vsdw.destinationAlias, err = FindWorkerTablet(ctx, vsdw.wr, vsdw.cleaner, vsdw.cell, vsdw.keyspace, vsdw.shard)
if err != nil {
-return fmt.Errorf("cannot find checker for %v/%v/%v: %v", vsdw.cell, vsdw.keyspace, vsdw.shard, err)
+return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", vsdw.cell, vsdw.keyspace, vsdw.shard, err)
}
// find an appropriate endpoint in the source shard
vsdw.sourceAlias, err = FindWorkerTablet(ctx, vsdw.wr, vsdw.cleaner, vsdw.cell, vsdw.shardInfo.SourceShards[0].Keyspace, vsdw.shardInfo.SourceShards[0].Shard)
if err != nil {
-return fmt.Errorf("cannot find checker for %v/%v/%v: %v", vsdw.cell, vsdw.shardInfo.SourceShards[0].Keyspace, vsdw.shardInfo.SourceShards[0].Shard, err)
+return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", vsdw.cell, vsdw.shardInfo.SourceShards[0].Keyspace, vsdw.shardInfo.SourceShards[0].Shard, err)
}
return nil
@@ -207,19 +207,19 @@ func (vsdw *VerticalSplitDiffWorker) findTargets(ctx context.Context) error {
// 1 - ask the master of the destination shard to pause filtered replication,
// and return the source binlog positions
// (add a cleanup task to restart filtered replication on master)
-// 2 - stop the source 'checker' at a binlog position higher than the
+// 2 - stop the source tablet at a binlog position higher than the
// destination master. Get that new position.
// (add a cleanup task to restart binlog replication on it, and change
// the existing ChangeSlaveType cleanup action to 'spare' type)
// 3 - ask the master of the destination shard to resume filtered replication
// up to the new list of positions, and return its binlog position.
-// 4 - wait until the destination checker is equal or passed that master binlog
-// position, and stop its replication.
+// 4 - wait until the destination tablet is equal or passed that master
+// binlog position, and stop its replication.
// (add a cleanup task to restart binlog replication on it, and change
// the existing ChangeSlaveType cleanup action to 'spare' type)
// 5 - restart filtered replication on destination master.
// (remove the cleanup task that does the same)
-// At this point, all checker instances are stopped at the same point.
+// At this point, all source and destination tablets are stopped at the same point.
func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context) error {
vsdw.SetState(WorkerStateSyncReplication)
@@ -239,7 +239,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
}
wrangler.RecordStartBlpAction(vsdw.cleaner, masterInfo)
-// 2 - stop the source 'checker' at a binlog position
+// 2 - stop the source tablet at a binlog position
// higher than the destination master
stopPositionList := blproto.BlpPositionList{
Entries: make([]blproto.BlpPosition, 1),
@@ -285,9 +285,9 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
return fmt.Errorf("RunBlpUntil on %v until %v failed: %v", vsdw.shardInfo.MasterAlias, stopPositionList, err)
}
-// 4 - wait until the destination checker is equal or passed
+// 4 - wait until the destination tablet is equal or passed
// that master binlog position, and stop its replication.
-vsdw.wr.Logger().Infof("Waiting for destination checker %v to catch up to %v", vsdw.destinationAlias, masterPos)
+vsdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", vsdw.destinationAlias, masterPos)
destinationTablet, err := vsdw.wr.TopoServer().GetTablet(ctx, vsdw.destinationAlias)
if err != nil {
return err
@@ -321,7 +321,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication(ctx context.Context)
}
// diff phase: will create a list of messages regarding the diff.
-// - get the schema on all checkers
+// - get the schema on all tablets
// - if some table schema mismatches, record them (use existing schema diff tools).
// - for each table in destination, run a diff pipeline.

View file

@@ -385,7 +385,7 @@ index by_msg (msg)
logging.debug("Checking no data was sent the wrong way")
self._check_lots_not_present(1000)
-# use the vtworker checker to compare the data
+# use vtworker to compare the data
logging.debug("Running vtworker SplitDiff for -80")
utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff', 'test_keyspace/-80'],
auto_log=True)

View file

@@ -567,7 +567,7 @@ primary key (name)
self._check_binlog_player_vars(shard_2_master, seconds_behind_master_max=30)
self._check_binlog_player_vars(shard_3_master, seconds_behind_master_max=30)
-# use the vtworker checker to compare the data
+# use vtworker to compare the data
logging.debug("Running vtworker SplitDiff")
utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff', '--exclude_tables',
'unrelated', 'test_keyspace/c0-'],
@@ -705,7 +705,7 @@ primary key (name)
logging.debug("Checking 80 percent of data was sent fairly quickly")
self._check_lots_timeout(3000, 80, 10, base=2000)
-# use the vtworker checker to compare the data again
+# use vtworker to compare the data again
logging.debug("Running vtworker SplitDiff")
utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff', '--exclude_tables',
'unrelated', 'test_keyspace/c0-'],

View file

@@ -365,7 +365,7 @@ index by_msg (msg)
self._check_values_timeout(destination_master, 'vt_destination_keyspace',
'moving2', moving2_first_add1, 100)
-# use the vtworker checker to compare the data
+# use vtworker to compare the data
logging.debug("Running vtworker VerticalSplitDiff")
utils.run_vtworker(['-cell', 'test_nj', 'VerticalSplitDiff',
'destination_keyspace/0'], auto_log=True)