Use a background context per vtworker, and have all other contexts derive from it.

This commit is contained in:
Ammar Aijazi 2015-01-12 20:08:44 -08:00
Parent f45be49837
Commit 069aaec816
8 changed files: 133 additions and 75 deletions
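The change, as the title says, gives each vtworker a root context derived from context.Background() and derives every per-call context (usually with a timeout) from that root, so cancelling the worker cancels all in-flight calls. A minimal sketch of the pattern, with hypothetical names rather than the actual vtworker types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker is a hypothetical stand-in for a vtworker worker type: it owns a
// root context created from context.Background() plus its cancel func, and
// every call derives its own timeout context from that root.
type worker struct {
	ctx       context.Context
	ctxCancel context.CancelFunc
}

func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	return &worker{ctx: ctx, ctxCancel: cancel}
}

// callWithTimeout derives a per-call context from the worker's root context,
// so the call is bounded both by its own timeout and by worker cancellation.
func (w *worker) callWithTimeout(d time.Duration, call func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(w.ctx, d)
	defer cancel()
	return call(ctx)
}

func main() {
	w := newWorker()
	defer w.ctxCancel() // cancelling the worker cancels every derived context

	err := w.callWithTimeout(60*time.Second, func(ctx context.Context) error {
		select {
		case <-time.After(100 * time.Millisecond): // pretend this is an RPC
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println("call result:", err)
}
```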

View file

@ -109,6 +109,16 @@ ci_skip_integration_test_files = \
resharding.py \
update_stream.py
# Run the following tests after making worker changes
worker_integration_test_files = \
binlog.py \
resharding.py \
resharding_bytes.py \
vertical_split.py \
vertical_split_vtgate.py \
initial_sharding.py \
initial_sharding_bytes.py
.ONESHELL:
SHELL = /bin/bash
@ -139,6 +149,10 @@ large_integration_test:
ci_skip_integration_test:
$(call run_integration_tests, $(ci_skip_integration_test_files))
worker_test:
go test ./go/vt/worker/
$(call run_integration_tests, $(worker_integration_test_files))
integration_test: small_integration_test medium_integration_test large_integration_test ci_skip_integration_test
site_integration_test:

View file

@ -103,7 +103,6 @@ func formatTableStatuses(tableStatuses []*tableStatus, startTime time.Time) ([]s
// It will keep retrying the ExecuteFetch (for a finite but longer duration) if it fails due to a timeout or a
// retriable application error.
func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, command string, disableBinLogs bool) error {
fmt.Printf("Starting executeFetchWithRetries! \n")
retryDuration := 2 * time.Hour
executeFetchErrs := make(chan error)
@ -129,13 +128,11 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
switch {
// success!
case err == nil:
fmt.Printf("Successfully ran executeFetchWithRetries with cmd: %v! \n", command)
return nil
// retriable failure, either due to a timeout or an application-level retriable failure
case wr.TabletManagerClient().IsTimeoutError(err), strings.Contains(err.Error(), "retry: "):
go func() {
wr.Logger().Infof("Retrying failed ExecuteFetch on %v; failed with: %v", ti, err)
fmt.Printf("Tried to retry! \n")
// TODO(aaijazi): wait 30 second and re-resolve
executeFetch()
}()
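The hunk above also removes debug fmt.Printf calls from executeFetchWithRetries, which keeps retrying ExecuteFetch for up to retryDuration when the error is a timeout or contains "retry: ". A hedged sketch of that retry shape, with placeholder execute and isRetriable functions rather than the actual vtworker code:

```go
package sketch

import (
	"context"
	"fmt"
	"time"
)

// retryForDuration sketches the executeFetchWithRetries shape: retry a call
// until it succeeds, fails permanently, the retry budget is spent, or the
// context is cancelled. execute and isRetriable are placeholders.
func retryForDuration(ctx context.Context, retryDuration time.Duration,
	execute func(context.Context) error, isRetriable func(error) bool) error {

	deadline := time.Now().Add(retryDuration)
	for {
		err := execute(ctx)
		if err == nil {
			return nil
		}
		if !isRetriable(err) {
			return err // permanent failure, give up immediately
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("retry budget exhausted, last error: %v", err)
		}
		select {
		case <-time.After(30 * time.Second): // back off before retrying
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```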
@ -162,14 +159,14 @@ func fillStringTemplate(tmpl string, vars interface{}) (string, error) {
}
// runSqlCommands will send the sql commands to the remote tablet.
func runSqlCommands(wr *wrangler.Wrangler, ti *topo.TabletInfo, commands []string, abort chan struct{}, disableBinLogs bool) error {
func runSqlCommands(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, commands []string, abort chan struct{}, disableBinLogs bool) error {
for _, command := range commands {
command, err := fillStringTemplate(command, map[string]string{"DatabaseName": ti.DbName()})
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)
}
err = executeFetchWithRetries(context.TODO(), wr, ti, command, disableBinLogs)
err = executeFetchWithRetries(ctx, wr, ti, command, disableBinLogs)
if err != nil {
return err
}
@ -193,7 +190,7 @@ func runSqlCommands(wr *wrangler.Wrangler, ti *topo.TabletInfo, commands []strin
// "", "value1", "value2", ""
// A non-split tablet will just return:
// "", ""
func findChunks(wr *wrangler.Wrangler, ti *topo.TabletInfo, td *myproto.TableDefinition, minTableSizeForSplit uint64, sourceReaderCount int) ([]string, error) {
func findChunks(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, td *myproto.TableDefinition, minTableSizeForSplit uint64, sourceReaderCount int) ([]string, error) {
result := []string{"", ""}
// eliminate a few cases we don't split tables for
@ -208,7 +205,7 @@ func findChunks(wr *wrangler.Wrangler, ti *topo.TabletInfo, td *myproto.TableDef
// get the min and max of the leading column of the primary key
query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", td.PrimaryKeyColumns[0], td.PrimaryKeyColumns[0], ti.DbName(), td.Name)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
qr, err := wr.TabletManagerClient().ExecuteFetch(ctx, ti, query, 1, true, false)
cancel()
if err != nil {
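findChunks, per the comment above, inspects MIN and MAX of the leading primary-key column (now under a timeout derived from the caller's context) and returns sourceReaderCount+1 boundaries of the form "", v1, v2, ..., "". A rough illustration of how evenly spaced integer boundaries could be computed; this is only a sketch of the splitting idea, not the actual Vitess implementation:

```go
package sketch

import "strconv"

// chunkBoundaries illustrates the boundary shape described above: given the
// min and max of an integer primary-key column and the number of readers,
// return readerCount+1 boundary strings, with "" as the open lower and upper
// bounds. Not the actual findChunks code.
func chunkBoundaries(min, max int64, readerCount int) []string {
	if readerCount <= 1 || max <= min {
		return []string{"", ""} // nothing to split
	}
	width := (max - min + 1) / int64(readerCount)
	if width == 0 {
		return []string{"", ""} // too few rows to split evenly
	}
	result := []string{""} // open lower bound
	for i := 1; i < readerCount; i++ {
		result = append(result, strconv.FormatInt(min+int64(i)*width, 10))
	}
	return append(result, "") // open upper bound
}
```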
@ -364,7 +361,7 @@ func makeValueString(fields []mproto.Field, rows [][]sqltypes.Value) string {
// executeFetchLoop loops over the provided insertChannel
// and sends the commands to the provided tablet.
func executeFetchLoop(wr *wrangler.Wrangler, ti *topo.TabletInfo, insertChannel chan string, abort chan struct{}, disableBinLogs bool) error {
func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, insertChannel chan string, abort chan struct{}, disableBinLogs bool) error {
for {
select {
case cmd, ok := <-insertChannel:
@ -373,7 +370,7 @@ func executeFetchLoop(wr *wrangler.Wrangler, ti *topo.TabletInfo, insertChannel
return nil
}
cmd = "INSERT INTO `" + ti.DbName() + "`." + cmd
err := executeFetchWithRetries(context.TODO(), wr, ti, cmd, disableBinLogs)
err := executeFetchWithRetries(ctx, wr, ti, cmd, disableBinLogs)
if err != nil {
return fmt.Errorf("ExecuteFetch failed: %v", err)
}
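executeFetchLoop now receives the caller's ctx and threads it into executeFetchWithRetries, so the insert loop stops when the channel closes, the abort channel fires, or the worker's context is cancelled. A hedged sketch of that loop shape, with exec standing in for the context-aware execute call:

```go
package sketch

import (
	"context"
	"fmt"
)

// fetchLoop sketches the executeFetchLoop shape: drain a channel of SQL
// statements and execute each under the caller's context, stopping when the
// channel closes, abort fires, or the context is cancelled. exec is a
// placeholder for the context-aware execute-with-retries call.
func fetchLoop(ctx context.Context, statements <-chan string, abort <-chan struct{},
	exec func(context.Context, string) error) error {

	for {
		select {
		case stmt, ok := <-statements:
			if !ok {
				return nil // producer closed the channel; we are done
			}
			if err := exec(ctx, stmt); err != nil {
				return fmt.Errorf("execute failed: %v", err)
			}
		case <-abort:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```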

View file

@ -51,6 +51,8 @@ type SplitCloneWorker struct {
minTableSizeForSplit uint64
destinationWriterCount int
cleaner *wrangler.Cleaner
ctx context.Context
ctxCancel context.CancelFunc
// all subsequent fields are protected by the mutex
mu sync.Mutex
@ -87,6 +89,7 @@ func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ex
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &SplitCloneWorker{
wr: wr,
cell: cell,
@ -99,6 +102,8 @@ func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ex
minTableSizeForSplit: minTableSizeForSplit,
destinationWriterCount: destinationWriterCount,
cleaner: &wrangler.Cleaner{},
ctx: ctx,
ctxCancel: cancel,
state: stateSCNotSarted,
ev: &events.SplitClone{
@ -325,7 +330,7 @@ func (scw *SplitCloneWorker) findTargets() error {
return fmt.Errorf("cannot read tablet %v: %v", alias, err)
}
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(scw.ctx, 60*time.Second)
err := scw.wr.TabletManagerClient().StopSlave(ctx, scw.sourceTablets[i])
cancel()
if err != nil {
@ -356,14 +361,18 @@ func (scw *SplitCloneWorker) findMasterTargets() error {
scw.reloadTablets = make([]map[topo.TabletAlias]*topo.TabletInfo, len(scw.destinationShards))
for shardIndex, si := range scw.destinationShards {
scw.reloadAliases[shardIndex], err = topo.FindAllTabletAliasesInShard(context.TODO(), scw.wr.TopoServer(), si.Keyspace(), si.ShardName())
ctx, cancel := context.WithTimeout(scw.ctx, 60*time.Second)
scw.reloadAliases[shardIndex], err = topo.FindAllTabletAliasesInShard(ctx, scw.wr.TopoServer(), si.Keyspace(), si.ShardName())
cancel()
if err != nil {
return fmt.Errorf("cannot find all reload target tablets in %v/%v: %v", si.Keyspace(), si.ShardName(), err)
}
scw.wr.Logger().Infof("Found %v reload target aliases in shard %v/%v", len(scw.reloadAliases[shardIndex]), si.Keyspace(), si.ShardName())
// get the TabletInfo for all targets
scw.reloadTablets[shardIndex], err = topo.GetTabletMap(context.TODO(), scw.wr.TopoServer(), scw.reloadAliases[shardIndex])
ctx, cancel = context.WithTimeout(scw.ctx, 60*time.Second)
scw.reloadTablets[shardIndex], err = topo.GetTabletMap(ctx, scw.wr.TopoServer(), scw.reloadAliases[shardIndex])
cancel()
if err != nil {
return fmt.Errorf("cannot read all reload target tablets in %v/%v: %v", si.Keyspace(), si.ShardName(), err)
}
@ -401,7 +410,7 @@ func (scw *SplitCloneWorker) copy() error {
// on all source shards. Furthermore, we estimate the number of rows
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel := context.WithTimeout(scw.ctx, 60*time.Second)
sourceSchemaDefinition, err := scw.wr.GetSchema(ctx, scw.sourceAliases[0], nil, scw.excludeTables, true)
cancel()
if err != nil {
@ -486,7 +495,7 @@ func (scw *SplitCloneWorker) copy() error {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(scw.wr, ti, insertChannel, abort, disableBinLogs); err != nil {
if err := executeFetchLoop(scw.ctx, scw.wr, ti, insertChannel, abort, disableBinLogs); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
@ -507,7 +516,7 @@ func (scw *SplitCloneWorker) copy() error {
rowSplitter := NewRowSplitter(scw.destinationShards, scw.keyspaceInfo.ShardingColumnType, columnIndexes[tableIndex])
chunks, err := findChunks(scw.wr, scw.sourceTablets[shardIndex], td, scw.minTableSizeForSplit, scw.sourceReaderCount)
chunks, err := findChunks(scw.ctx, scw.wr, scw.sourceTablets[shardIndex], td, scw.minTableSizeForSplit, scw.sourceReaderCount)
if err != nil {
return err
}
@ -564,7 +573,7 @@ func (scw *SplitCloneWorker) copy() error {
// get the current position from the sources
for shardIndex := range scw.sourceShards {
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(scw.ctx, 60*time.Second)
status, err := scw.wr.TabletManagerClient().SlaveStatus(ctx, scw.sourceTablets[shardIndex])
cancel()
if err != nil {
@ -580,7 +589,7 @@ func (scw *SplitCloneWorker) copy() error {
go func(ti *topo.TabletInfo) {
defer destinationWaitGroup.Done()
scw.wr.Logger().Infof("Making and populating blp_checkpoint table on tablet %v", ti.Alias)
if err := runSqlCommands(scw.wr, ti, queries, abort, disableBinLogs); err != nil {
if err := runSqlCommands(scw.ctx, scw.wr, ti, queries, abort, disableBinLogs); err != nil {
processError("blp_checkpoint queries failed on tablet %v: %v", ti.Alias, err)
}
}(scw.destinationTablets[shardIndex][tabletAlias])
@ -601,7 +610,7 @@ func (scw *SplitCloneWorker) copy() error {
} else {
for _, si := range scw.destinationShards {
scw.wr.Logger().Infof("Setting SourceShard on shard %v/%v", si.Keyspace(), si.ShardName())
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(scw.ctx, 60*time.Second)
err := scw.wr.SetSourceShards(ctx, si.Keyspace(), si.ShardName(), scw.sourceAliases, nil)
cancel()
if err != nil {
@ -619,7 +628,7 @@ func (scw *SplitCloneWorker) copy() error {
go func(ti *topo.TabletInfo) {
defer destinationWaitGroup.Done()
scw.wr.Logger().Infof("Reloading schema on tablet %v", ti.Alias)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(scw.ctx, 60*time.Second)
err := scw.wr.TabletManagerClient().ReloadSchema(ctx, ti)
cancel()
if err != nil {

View file

@ -37,11 +37,13 @@ const (
// SplitDiffWorker executes a diff between a destination shard and its
// source shards in a shard split case.
type SplitDiffWorker struct {
wr *wrangler.Wrangler
cell string
keyspace string
shard string
cleaner *wrangler.Cleaner
wr *wrangler.Wrangler
cell string
keyspace string
shard string
cleaner *wrangler.Cleaner
ctx context.Context
ctxCancel context.CancelFunc
// all subsequent fields are protected by the mutex
mu sync.Mutex
@ -65,12 +67,15 @@ type SplitDiffWorker struct {
// NewSplitDiffWorker returns a new SplitDiffWorker object.
func NewSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker {
ctx, cancel := context.WithCancel(context.Background())
return &SplitDiffWorker{
wr: wr,
cell: cell,
keyspace: keyspace,
shard: shard,
cleaner: &wrangler.Cleaner{},
wr: wr,
cell: cell,
keyspace: keyspace,
shard: shard,
cleaner: &wrangler.Cleaner{},
ctx: ctx,
ctxCancel: cancel,
state: stateSDNotSarted,
}
@ -271,7 +276,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication() error {
// 1 - stop the master binlog replication, get its current position
sdw.wr.Logger().Infof("Stopping master binlog replication on %v", sdw.shardInfo.MasterAlias)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(sdw.ctx, 60*time.Second)
blpPositionList, err := sdw.wr.TabletManagerClient().StopBlp(ctx, masterInfo)
cancel()
if err != nil {
@ -299,7 +304,9 @@ func (sdw *SplitDiffWorker) synchronizeReplication() error {
// stop replication
sdw.wr.Logger().Infof("Stopping slave[%v] %v at a minimum of %v", i, sdw.sourceAliases[i], blpPos.Position)
stoppedAt, err := sdw.wr.TabletManagerClient().StopSlaveMinimum(context.TODO(), sourceTablet, blpPos.Position, 30*time.Second)
ctx, cancel := context.WithTimeout(sdw.ctx, 60*time.Second)
stoppedAt, err := sdw.wr.TabletManagerClient().StopSlaveMinimum(ctx, sourceTablet, blpPos.Position, 30*time.Second)
cancel()
if err != nil {
return fmt.Errorf("cannot stop slave %v at right binlog position %v: %v", sdw.sourceAliases[i], blpPos.Position, err)
}
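StopSlaveMinimum already takes its own 30-second wait argument; the new code wraps it in a context with a 60-second deadline derived from the worker's root context, so the outer deadline comfortably exceeds the call's internal wait. A hedged sketch of that relationship, with waitForPosition as a placeholder rather than the TabletManagerClient API:

```go
package sketch

import (
	"context"
	"fmt"
	"time"
)

// waitForPosition stands in for a call like StopSlaveMinimum: it has its own
// internal wait budget, and the caller is expected to wrap it in a derived
// context whose deadline is longer, so the context only trips when something
// is truly stuck. Placeholder code, not the Vitess API.
func waitForPosition(ctx context.Context, wait time.Duration, reached func() bool) error {
	internal := time.After(wait)
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if reached() {
				return nil
			}
		case <-internal:
			return fmt.Errorf("position not reached within %v", wait)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func stopAtPosition(parent context.Context, reached func() bool) error {
	// 60s context deadline > 30s internal wait, mirroring the calls above.
	ctx, cancel := context.WithTimeout(parent, 60*time.Second)
	defer cancel()
	return waitForPosition(ctx, 30*time.Second, reached)
}
```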
@ -319,7 +326,9 @@ func (sdw *SplitDiffWorker) synchronizeReplication() error {
// 3 - ask the master of the destination shard to resume filtered
// replication up to the new list of positions
sdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", sdw.shardInfo.MasterAlias, stopPositionList)
masterPos, err := sdw.wr.TabletManagerClient().RunBlpUntil(context.TODO(), masterInfo, &stopPositionList, 30*time.Second)
ctx, cancel = context.WithTimeout(sdw.ctx, 60*time.Second)
masterPos, err := sdw.wr.TabletManagerClient().RunBlpUntil(ctx, masterInfo, &stopPositionList, 30*time.Second)
cancel()
if err != nil {
return fmt.Errorf("RunBlpUntil for %v until %v failed: %v", sdw.shardInfo.MasterAlias, stopPositionList, err)
}
@ -331,7 +340,9 @@ func (sdw *SplitDiffWorker) synchronizeReplication() error {
if err != nil {
return err
}
_, err = sdw.wr.TabletManagerClient().StopSlaveMinimum(context.TODO(), destinationTablet, masterPos, 30*time.Second)
ctx, cancel = context.WithTimeout(sdw.ctx, 60*time.Second)
_, err = sdw.wr.TabletManagerClient().StopSlaveMinimum(ctx, destinationTablet, masterPos, 30*time.Second)
cancel()
if err != nil {
return fmt.Errorf("StopSlaveMinimum for %v at %v failed: %v", sdw.destinationAlias, masterPos, err)
}
@ -344,7 +355,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication() error {
// 5 - restart filtered replication on destination master
sdw.wr.Logger().Infof("Restarting filtered replication on master %v", sdw.shardInfo.MasterAlias)
ctx, cancel = context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel = context.WithTimeout(sdw.ctx, 60*time.Second)
err = sdw.wr.TabletManagerClient().StartBlp(ctx, masterInfo)
if err := sdw.cleaner.RemoveActionByName(wrangler.StartBlpActionName, sdw.shardInfo.MasterAlias.String()); err != nil {
sdw.wr.Logger().Warningf("Cannot find cleaning action %v/%v: %v", wrangler.StartBlpActionName, sdw.shardInfo.MasterAlias.String(), err)
@ -372,7 +383,7 @@ func (sdw *SplitDiffWorker) diff() error {
wg.Add(1)
go func() {
var err error
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel := context.WithTimeout(sdw.ctx, 60*time.Second)
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema(ctx, sdw.destinationAlias, nil, nil, false)
cancel()
rec.RecordError(err)
@ -383,7 +394,7 @@ func (sdw *SplitDiffWorker) diff() error {
wg.Add(1)
go func(i int, sourceAlias topo.TabletAlias) {
var err error
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel := context.WithTimeout(sdw.ctx, 60*time.Second)
sdw.sourceSchemaDefinitions[i], err = sdw.wr.GetSchema(ctx, sourceAlias, nil, nil, false)
cancel()
rec.RecordError(err)

View file

@ -50,10 +50,12 @@ type SourceSpec struct {
// database: any row in the subset spec needs to have a counterpart in
// the superset spec.
type SQLDiffWorker struct {
wr *wrangler.Wrangler
cell string
shard string
cleaner *wrangler.Cleaner
wr *wrangler.Wrangler
cell string
shard string
cleaner *wrangler.Cleaner
ctx context.Context
ctxCancel context.CancelFunc
// alias in the following 2 fields is during
// SQLDifferFindTargets, read-only after that.
@ -70,13 +72,17 @@ type SQLDiffWorker struct {
// NewSQLDiffWorker returns a new SQLDiffWorker object.
func NewSQLDiffWorker(wr *wrangler.Wrangler, cell string, superset, subset SourceSpec) Worker {
ctx, cancel := context.WithCancel(context.Background())
return &SQLDiffWorker{
wr: wr,
cell: cell,
superset: superset,
subset: subset,
cleaner: new(wrangler.Cleaner),
state: sqlDiffNotSarted,
wr: wr,
cell: cell,
superset: superset,
subset: subset,
cleaner: new(wrangler.Cleaner),
ctx: ctx,
ctxCancel: cancel,
state: sqlDiffNotSarted,
}
}
@ -228,7 +234,7 @@ func (worker *SQLDiffWorker) synchronizeReplication() error {
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(worker.ctx, 60*time.Second)
err = worker.wr.TabletManagerClient().StopSlave(ctx, subsetTablet)
cancel()
if err != nil {
@ -259,7 +265,7 @@ func (worker *SQLDiffWorker) synchronizeReplication() error {
if err != nil {
return err
}
ctx, cancel = context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel = context.WithTimeout(worker.ctx, 60*time.Second)
err = worker.wr.TabletManagerClient().StopSlave(ctx, supersetTablet)
cancel()
if err != nil {

View file

@ -50,6 +50,8 @@ type VerticalSplitCloneWorker struct {
minTableSizeForSplit uint64
destinationWriterCount int
cleaner *wrangler.Cleaner
ctx context.Context
ctxCancel context.CancelFunc
// all subsequent fields are protected by the mutex
mu sync.Mutex
@ -84,6 +86,7 @@ func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspac
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &VerticalSplitCloneWorker{
wr: wr,
cell: cell,
@ -96,6 +99,8 @@ func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspac
minTableSizeForSplit: minTableSizeForSplit,
destinationWriterCount: destinationWriterCount,
cleaner: &wrangler.Cleaner{},
ctx: ctx,
ctxCancel: cancel,
state: stateVSCNotSarted,
ev: &events.VerticalSplitClone{
@ -292,7 +297,7 @@ func (vscw *VerticalSplitCloneWorker) findTargets() error {
}
// stop replication on it
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(vscw.ctx, 60*time.Second)
err = vscw.wr.TabletManagerClient().StopSlave(ctx, vscw.sourceTablet)
cancel()
if err != nil {
@ -314,14 +319,18 @@ func (vscw *VerticalSplitCloneWorker) findTargets() error {
func (vscw *VerticalSplitCloneWorker) findMasterTargets() error {
var err error
// find all the targets in the destination keyspace / shard
vscw.reloadAliases, err = topo.FindAllTabletAliasesInShard(context.TODO(), vscw.wr.TopoServer(), vscw.destinationKeyspace, vscw.destinationShard)
ctx, cancel := context.WithTimeout(vscw.ctx, 60*time.Second)
vscw.reloadAliases, err = topo.FindAllTabletAliasesInShard(ctx, vscw.wr.TopoServer(), vscw.destinationKeyspace, vscw.destinationShard)
cancel()
if err != nil {
return fmt.Errorf("cannot find all reload target tablets in %v/%v: %v", vscw.destinationKeyspace, vscw.destinationShard, err)
}
vscw.wr.Logger().Infof("Found %v reload target aliases", len(vscw.reloadAliases))
// get the TabletInfo for all targets
vscw.reloadTablets, err = topo.GetTabletMap(context.TODO(), vscw.wr.TopoServer(), vscw.reloadAliases)
ctx, cancel = context.WithTimeout(vscw.ctx, 60*time.Second)
vscw.reloadTablets, err = topo.GetTabletMap(ctx, vscw.wr.TopoServer(), vscw.reloadAliases)
cancel()
if err != nil {
return fmt.Errorf("cannot read all reload target tablets in %v/%v: %v", vscw.destinationKeyspace, vscw.destinationShard, err)
}
@ -354,7 +363,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
vscw.setState(stateVSCCopy)
// get source schema
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel := context.WithTimeout(vscw.ctx, 60*time.Second)
sourceSchemaDefinition, err := vscw.wr.GetSchema(ctx, vscw.sourceAlias, vscw.tables, nil, true)
cancel()
if err != nil {
@ -423,7 +432,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(vscw.wr, ti, insertChannel, abort, disableBinLogs); err != nil {
if err := executeFetchLoop(vscw.ctx, vscw.wr, ti, insertChannel, abort, disableBinLogs); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
@ -440,7 +449,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
continue
}
chunks, err := findChunks(vscw.wr, vscw.sourceTablet, td, vscw.minTableSizeForSplit, vscw.sourceReaderCount)
chunks, err := findChunks(vscw.ctx, vscw.wr, vscw.sourceTablet, td, vscw.minTableSizeForSplit, vscw.sourceReaderCount)
if err != nil {
return err
}
@ -486,7 +495,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
// then create and populate the blp_checkpoint table
if vscw.strategy.PopulateBlpCheckpoint {
// get the current position from the source
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(vscw.ctx, 60*time.Second)
status, err := vscw.wr.TabletManagerClient().SlaveStatus(ctx, vscw.sourceTablet)
cancel()
if err != nil {
@ -505,7 +514,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
go func(ti *topo.TabletInfo) {
defer destinationWaitGroup.Done()
vscw.wr.Logger().Infof("Making and populating blp_checkpoint table on tablet %v", ti.Alias)
if err := runSqlCommands(vscw.wr, ti, queries, abort, disableBinLogs); err != nil {
if err := runSqlCommands(vscw.ctx, vscw.wr, ti, queries, abort, disableBinLogs); err != nil {
processError("blp_checkpoint queries failed on tablet %v: %v", ti.Alias, err)
}
}(vscw.destinationTablets[tabletAlias])
@ -521,7 +530,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
vscw.wr.Logger().Infof("Skipping setting SourceShard on destination shard.")
} else {
vscw.wr.Logger().Infof("Setting SourceShard on shard %v/%v", vscw.destinationKeyspace, vscw.destinationShard)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(vscw.ctx, 60*time.Second)
err := vscw.wr.SetSourceShards(ctx, vscw.destinationKeyspace, vscw.destinationShard, []topo.TabletAlias{vscw.sourceAlias}, vscw.tables)
cancel()
if err != nil {
@ -537,7 +546,7 @@ func (vscw *VerticalSplitCloneWorker) copy() error {
go func(ti *topo.TabletInfo) {
defer destinationWaitGroup.Done()
vscw.wr.Logger().Infof("Reloading schema on tablet %v", ti.Alias)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(vscw.ctx, 30*time.Second)
err := vscw.wr.TabletManagerClient().ReloadSchema(ctx, ti)
cancel()
if err != nil {

View file

@ -37,11 +37,13 @@ const (
// VerticalSplitDiffWorker executes a diff between a destination shard and its
// source shards in a shard split case.
type VerticalSplitDiffWorker struct {
wr *wrangler.Wrangler
cell string
keyspace string
shard string
cleaner *wrangler.Cleaner
wr *wrangler.Wrangler
cell string
keyspace string
shard string
cleaner *wrangler.Cleaner
ctx context.Context
ctxCancel context.CancelFunc
// all subsequent fields are protected by the mutex
mu sync.Mutex
@ -65,12 +67,15 @@ type VerticalSplitDiffWorker struct {
// NewVerticalSplitDiffWorker returns a new VerticalSplitDiffWorker object.
func NewVerticalSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string) Worker {
ctx, cancel := context.WithCancel(context.Background())
return &VerticalSplitDiffWorker{
wr: wr,
cell: cell,
keyspace: keyspace,
shard: shard,
cleaner: &wrangler.Cleaner{},
wr: wr,
cell: cell,
keyspace: keyspace,
shard: shard,
cleaner: &wrangler.Cleaner{},
ctx: ctx,
ctxCancel: cancel,
state: stateVSDNotSarted,
}
@ -277,7 +282,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication() error {
// 1 - stop the master binlog replication, get its current position
vsdw.wr.Logger().Infof("Stopping master binlog replication on %v", vsdw.shardInfo.MasterAlias)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel := context.WithTimeout(vsdw.ctx, 60*time.Second)
blpPositionList, err := vsdw.wr.TabletManagerClient().StopBlp(ctx, masterInfo)
cancel()
if err != nil {
@ -303,7 +308,9 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication() error {
if err != nil {
return err
}
stoppedAt, err := vsdw.wr.TabletManagerClient().StopSlaveMinimum(context.TODO(), sourceTablet, pos.Position, 30*time.Second)
ctx, cancel = context.WithTimeout(vsdw.ctx, 60*time.Second)
stoppedAt, err := vsdw.wr.TabletManagerClient().StopSlaveMinimum(ctx, sourceTablet, pos.Position, 30*time.Second)
cancel()
if err != nil {
return fmt.Errorf("cannot stop slave %v at right binlog position %v: %v", vsdw.sourceAlias, pos.Position, err)
}
@ -322,7 +329,9 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication() error {
// 3 - ask the master of the destination shard to resume filtered
// replication up to the new list of positions
vsdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", vsdw.shardInfo.MasterAlias, stopPositionList)
masterPos, err := vsdw.wr.TabletManagerClient().RunBlpUntil(context.TODO(), masterInfo, &stopPositionList, 30*time.Second)
ctx, cancel = context.WithTimeout(vsdw.ctx, 60*time.Second)
masterPos, err := vsdw.wr.TabletManagerClient().RunBlpUntil(ctx, masterInfo, &stopPositionList, 30*time.Second)
cancel()
if err != nil {
return fmt.Errorf("RunBlpUntil on %v until %v failed: %v", vsdw.shardInfo.MasterAlias, stopPositionList, err)
}
@ -334,7 +343,9 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication() error {
if err != nil {
return err
}
_, err = vsdw.wr.TabletManagerClient().StopSlaveMinimum(context.TODO(), destinationTablet, masterPos, 30*time.Second)
ctx, cancel = context.WithTimeout(vsdw.ctx, 60*time.Second)
_, err = vsdw.wr.TabletManagerClient().StopSlaveMinimum(ctx, destinationTablet, masterPos, 30*time.Second)
cancel()
if err != nil {
return fmt.Errorf("StopSlaveMinimum on %v at %v failed: %v", vsdw.destinationAlias, masterPos, err)
}
@ -347,7 +358,7 @@ func (vsdw *VerticalSplitDiffWorker) synchronizeReplication() error {
// 5 - restart filtered replication on destination master
vsdw.wr.Logger().Infof("Restarting filtered replication on master %v", vsdw.shardInfo.MasterAlias)
ctx, cancel = context.WithTimeout(context.TODO(), 30*time.Second)
ctx, cancel = context.WithTimeout(vsdw.ctx, 60*time.Second)
err = vsdw.wr.TabletManagerClient().StartBlp(ctx, masterInfo)
if err := vsdw.cleaner.RemoveActionByName(wrangler.StartBlpActionName, vsdw.shardInfo.MasterAlias.String()); err != nil {
vsdw.wr.Logger().Warningf("Cannot find cleaning action %v/%v: %v", wrangler.StartBlpActionName, vsdw.shardInfo.MasterAlias.String(), err)
@ -374,7 +385,7 @@ func (vsdw *VerticalSplitDiffWorker) diff() error {
wg.Add(1)
go func() {
var err error
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel := context.WithTimeout(vsdw.ctx, 60*time.Second)
vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema(ctx, vsdw.destinationAlias, nil, nil, false)
cancel()
rec.RecordError(err)
@ -384,7 +395,7 @@ func (vsdw *VerticalSplitDiffWorker) diff() error {
wg.Add(1)
go func() {
var err error
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
ctx, cancel := context.WithTimeout(vsdw.ctx, 60*time.Second)
vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema(ctx, vsdw.sourceAlias, nil, nil, false)
cancel()
rec.RecordError(err)

View file

@ -54,6 +54,7 @@ type cleanUpHelper struct {
// If an action on a target fails, it will not run the next action on
// the same target.
// We return the aggregate errors for all cleanups.
// CleanUp uses its own context, with a timeout of 5 minutes, so that cleanup actions will run even if the original context times out.
// TODO(alainjobart) Actions should run concurrently on a per target
// basis. They are then serialized on each target.
func (cleaner *Cleaner) CleanUp(wr *Wrangler) error {
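The CleanUp comment added here notes that cleanup runs under its own five-minute context so that cleanup actions still execute even when the original work context has timed out or been cancelled. A minimal sketch of that idea, simplified to return the first error instead of aggregating them; runCleanup and the action signature are hypothetical:

```go
package sketch

import (
	"context"
	"time"
)

// runCleanup sketches the idea in the CleanUp comment above: cleanup
// deliberately does NOT reuse the (possibly already cancelled) work context;
// it creates a fresh one from context.Background() with its own 5-minute
// budget, so cleanup still runs after the work context has expired.
func runCleanup(actions []func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	var firstErr error
	for _, action := range actions {
		if err := action(ctx); err != nil && firstErr == nil {
			firstErr = err // keep going; report only the first failure
		}
	}
	return firstErr
}
```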