worker: Resolve destination master using the discovery module.

- Removed previous Resolver interface and respective implementations.
- Removed respective stat vars and flags (--resolve_ttl).

- Added unit test for case where vtworker fails over to a different replica.
- Added unit test for case when healthcheck retries because currently no master is available.
- Extended FakePoolConnection to support these tests:
  - can define callback (AfterFunc) when expected query was received
  - infinite mode where the last request may be received over and over again

- discovery: Introduce DefaultTopoReadConcurrency to avoid duplication.
- discovery: Added EndPointStats.Alias() to avoid duplication.
- discovery: Added EndPointStats.String() to have pretty printed arrays.

Adapted end-to-end tests:
- binlog.py, merge_sharding.py, resharding.py: Enabled healthcheck for master tablets.
- worker.py: Updated stat vars check
This commit is contained in:
Michael Berlin 2016-04-29 00:46:07 -07:00
Родитель 0cb22ce7e8
Коммит a2a0d900ca
15 изменённых файлов: 555 добавлений и 279 удалений

Просмотреть файл

@ -50,6 +50,11 @@ var (
hcErrorCounters *stats.MultiCounters
)
const (
// DefaultTopoReadConcurrency can be used as default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
)
func init() {
hcErrorCounters = stats.NewMultiCounters("HealthcheckErrors", []string{"keyspace", "shardname", "tablettype"})
}
@ -72,6 +77,20 @@ type EndPointStats struct {
LastError error
}
// Alias returns the alias of the tablet.
// The return value can be used e.g. to generate the input for the topo API.
func (e *EndPointStats) Alias() *topodatapb.TabletAlias {
return &topodatapb.TabletAlias{
Cell: e.Cell,
Uid: e.EndPoint.Uid,
}
}
// String is defined because we want to print a []*EndPointStats array nicely.
func (e *EndPointStats) String() string {
return fmt.Sprint(*e)
}
// HealthCheck defines the interface of health checking module.
type HealthCheck interface {
// SetListener sets the listener for healthcheck updates. It should not block.
@ -83,6 +102,7 @@ type HealthCheck interface {
// GetEndPointStatsFromKeyspaceShard returns all EndPointStats for the given keyspace/shard.
GetEndPointStatsFromKeyspaceShard(keyspace, shard string) []*EndPointStats
// GetEndPointStatsFromTarget returns all EndPointStats for the given target.
// You can exclude unhealthy entries using the helper in utils.go.
GetEndPointStatsFromTarget(keyspace, shard string, tabletType topodatapb.TabletType) []*EndPointStats
// GetConnection returns the TabletConn of the given endpoint.
GetConnection(endPoint *topodatapb.EndPoint) tabletconn.TabletConn

Просмотреть файл

@ -16,6 +16,7 @@ import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"
@ -29,32 +30,6 @@ import (
// This file contains utility functions for clone workers.
//
// Does a topo lookup for a single shard, and returns the tablet record of the master tablet.
func resolveDestinationShardMaster(ctx context.Context, keyspace, shard string, wr *wrangler.Wrangler) (*topo.TabletInfo, error) {
var ti *topo.TabletInfo
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard)
cancel()
if err != nil {
return ti, fmt.Errorf("unable to resolve destination shard %v/%v", keyspace, shard)
}
if !si.HasMaster() {
return ti, fmt.Errorf("no master in destination shard %v/%v", keyspace, shard)
}
wr.Logger().Infof("Found target master alias %v in shard %v/%v", topoproto.TabletAliasString(si.MasterAlias), keyspace, shard)
shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
ti, err = wr.TopoServer().GetTablet(shortCtx, si.MasterAlias)
cancel()
if err != nil {
return ti, fmt.Errorf("unable to get master tablet from alias %v in shard %v/%v: %v",
topoproto.TabletAliasString(si.MasterAlias), keyspace, shard, err)
}
return ti, nil
}
// Does a topo lookup for a single shard, and returns:
// 1. Slice of all tablet aliases for the shard.
// 2. Map of tablet alias : tablet record for all tablets.
@ -84,24 +59,78 @@ var errExtract = regexp.MustCompile(`\(errno (\d+)\)`)
// If will keep retrying the ExecuteFetch (for a finite but longer duration) if it fails due to a timeout or a
// retriable application error.
//
// executeFetchWithRetries will also re-resolve the topology after errors, to be resistant to a reparent.
// It takes in a tablet record that it will initially attempt to write to, and will return the final tablet
// record that it used.
func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, r Resolver, shard string, command string) (*topo.TabletInfo, error) {
// executeFetchWithRetries will always get the current MASTER tablet from the
// healthcheck instance. If no MASTER is available, it will keep retrying.
func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, keyspace, shard, command string) error {
retryDuration := 2 * time.Hour
// We should keep retrying up until the retryCtx runs out
// We should keep retrying up until the retryCtx runs out.
retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration)
defer retryCancel()
// Is this current attempt a retry of a previous attempt?
isRetry := false
for {
var master *discovery.EndPointStats
var err error
// Get the current master from the HealthCheck.
masters := discovery.GetCurrentMaster(
healthCheck.GetEndPointStatsFromTarget(keyspace, shard, topodatapb.TabletType_MASTER))
if len(masters) == 0 {
wr.Logger().Warningf("ExecuteFetch failed for keyspace/shard %v/%v because no MASTER is available; will retry until there is MASTER again", keyspace, shard)
statsRetryCount.Add(1)
statsRetryCounters.Add(retryCategoryNoMasterAvailable, 1)
goto retry
}
master = masters[0]
// Run the command (in a block since goto above does not allow to introduce
// new variables until the label is reached.)
{
tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute)
_, err := wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, ti, command, 0)
_, err = wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, endPointToTabletInfo(master), command, 0)
cancel()
if err == nil {
// success!
return ti, nil
return nil
}
succeeded, finalErr := checkError(wr, err, isRetry, master, keyspace, shard)
if succeeded {
// We can ignore the error and don't have to retry.
return nil
}
if finalErr != nil {
// Non-retryable error.
return finalErr
}
}
retry:
masterAlias := "no-master-was-available"
if master != nil {
masterAlias = topoproto.TabletAliasString(master.Alias())
}
tabletString := fmt.Sprintf("%v (%v/%v)", masterAlias, keyspace, shard)
select {
case <-retryCtx.Done():
if retryCtx.Err() == context.DeadlineExceeded {
return fmt.Errorf("failed to connect to destination tablet %v after retrying for %v", tabletString, retryDuration)
}
return fmt.Errorf("interrupted while trying to run %v on tablet %v", command, tabletString)
case <-time.After(*executeFetchRetryTime):
// Retry 30s after the failure using the current master seen by the HealthCheck.
}
isRetry = true
}
}
// checkError returns true if the error can be ignored and the command
// succeeded, false if the error is retryable and a non-nil error if the
// command must not be retried.
func checkError(wr *wrangler.Wrangler, err error, isRetry bool, master *discovery.EndPointStats, keyspace, shard string) (bool, error) {
tabletString := fmt.Sprintf("%v (%v/%v)", topoproto.TabletAliasString(master.Alias()), keyspace, shard)
// If the ExecuteFetch call failed because of an application error, we will try to figure out why.
// We need to extract the MySQL error number, and will attempt to retry if we think the error is recoverable.
match := errExtract.FindStringSubmatch(err.Error())
@ -111,48 +140,28 @@ func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *top
}
switch {
case wr.TabletManagerClient().IsTimeoutError(err):
wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error: %v", ti, err)
statsRetryCounters.Add("TimeoutError", 1)
wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error: %v", tabletString, err)
statsRetryCount.Add(1)
statsRetryCounters.Add(retryCategoryTimeoutError, 1)
case errNo == "1290":
wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", ti, err)
statsRetryCounters.Add("ReadOnly", 1)
wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", tabletString, err)
statsRetryCount.Add(1)
statsRetryCounters.Add(retryCategoryReadOnly, 1)
case errNo == "2002" || errNo == "2006":
wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", ti, err)
statsRetryCounters.Add("ConnectionError", 1)
wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", tabletString, err)
statsRetryCount.Add(1)
statsRetryCounters.Add(retryCategoryConnectionError, 1)
case errNo == "1062":
if !isRetry {
return ti, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", ti, err)
return false, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", tabletString, err)
}
wr.Logger().Infof("ExecuteFetch failed on %v with a duplicate entry error; marking this as a success, because of the likelihood that this query has already succeeded before being retried: %v", ti, err)
return ti, nil
wr.Logger().Infof("ExecuteFetch failed on %v with a duplicate entry error; marking this as a success, because of the likelihood that this query has already succeeded before being retried: %v", tabletString, err)
return true, nil
default:
// Unknown error
return ti, err
}
t := time.NewTimer(*executeFetchRetryTime)
// don't leak memory if the timer isn't triggered
defer t.Stop()
select {
case <-retryCtx.Done():
if retryCtx.Err() == context.DeadlineExceeded {
return ti, fmt.Errorf("failed to connect to destination tablet %v after retrying for %v", ti, retryDuration)
}
return ti, fmt.Errorf("interrupted while trying to run %v on tablet %v", command, ti)
case <-t.C:
// Re-resolve and retry 30s after the failure
err = r.ResolveDestinationMasters(ctx)
if err != nil {
return ti, fmt.Errorf("unable to re-resolve masters for ExecuteFetch, due to: %v", err)
}
ti, err = r.GetDestinationMaster(shard)
if err != nil {
// At this point, we probably don't have a valid tablet record to return
return nil, fmt.Errorf("unable to run ExecuteFetch due to: %v", err)
}
}
isRetry = true
// Unknown error.
return false, err
}
return false, nil
}
// fillStringTemplate returns the string template filled
@ -166,20 +175,14 @@ func fillStringTemplate(tmpl string, vars interface{}) (string, error) {
}
// runSQLCommands will send the sql commands to the remote tablet.
func runSQLCommands(ctx context.Context, wr *wrangler.Wrangler, r Resolver, shard string, commands []string) error {
ti, err := r.GetDestinationMaster(shard)
if err != nil {
return fmt.Errorf("runSQLCommands failed: %v", err)
}
func runSQLCommands(ctx context.Context, wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, keyspace, shard, dbName string, commands []string) error {
for _, command := range commands {
command, err := fillStringTemplate(command, map[string]string{"DatabaseName": ti.DbName()})
command, err := fillStringTemplate(command, map[string]string{"DatabaseName": dbName})
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)
}
ti, err = executeFetchWithRetries(ctx, wr, ti, r, shard, command)
if err != nil {
if err := executeFetchWithRetries(ctx, wr, healthCheck, keyspace, shard, command); err != nil {
return err
}
}
@ -359,11 +362,7 @@ func makeValueString(fields []*querypb.Field, rows [][]sqltypes.Value) string {
// executeFetchLoop loops over the provided insertChannel
// and sends the commands to the provided tablet.
func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, r Resolver, shard string, insertChannel chan string) error {
ti, err := r.GetDestinationMaster(shard)
if err != nil {
return fmt.Errorf("executeFetchLoop failed: %v", err)
}
func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, keyspace, shard, dbName string, insertChannel chan string) error {
for {
select {
case cmd, ok := <-insertChannel:
@ -371,9 +370,8 @@ func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, r Resolver, sh
// no more to read, we're done
return nil
}
cmd = "INSERT INTO `" + ti.DbName() + "`." + cmd
ti, err = executeFetchWithRetries(ctx, wr, ti, r, shard, cmd)
if err != nil {
cmd = "INSERT INTO `" + dbName + "`." + cmd
if err := executeFetchWithRetries(ctx, wr, healthCheck, keyspace, shard, cmd); err != nil {
return fmt.Errorf("ExecuteFetch failed: %v", err)
}
case <-ctx.Done():
@ -384,3 +382,21 @@ func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, r Resolver, sh
}
}
}
// endPointToTabletInfo converts an EndPointStats object from the discovery
// package into a TabletInfo object. The latter one is required by several
// TabletManagerClient API calls.
// Note that this is a best-effort conversion and won't result into the same
// result as a call to topo.GetTablet().
// Note: We assume that "eps" is immutable and we can reference its data.
func endPointToTabletInfo(eps *discovery.EndPointStats) *topo.TabletInfo {
return topo.NewTabletInfo(&topodatapb.Tablet{
Alias: eps.Alias(),
Hostname: eps.EndPoint.Host,
PortMap: eps.EndPoint.PortMap,
HealthMap: eps.EndPoint.HealthMap,
Keyspace: eps.Target.Keyspace,
Shard: eps.Target.Shard,
Type: eps.Target.TabletType,
}, -1 /* version */)
}

Просмотреть файл

@ -29,6 +29,9 @@ type FakePoolConnection struct {
mu sync.Mutex
expectedExecuteFetch []ExpectedExecuteFetch
expectedExecuteFetchIndex int
// Infinite is true when executed queries beyond our expectation list should
// respond with the last entry from the list.
infinite bool
}
// ExpectedExecuteFetch defines for an expected query the to be faked output.
@ -38,6 +41,9 @@ type ExpectedExecuteFetch struct {
WantFields bool
QueryResult *sqltypes.Result
Error error
// AfterFunc is a callback which is executed while the query is executed i.e.,
// before the fake responds to the client.
AfterFunc func()
}
// NewFakePoolConnectionQuery creates a new fake database.
@ -50,6 +56,13 @@ func (f *FakePoolConnection) addExpectedExecuteFetch(entry ExpectedExecuteFetch)
f.addExpectedExecuteFetchAtIndex(appendEntry, entry)
}
func (f *FakePoolConnection) enableInfinite() {
f.mu.Lock()
defer f.mu.Unlock()
f.infinite = true
}
// addExpectedExecuteFetchAtIndex inserts a new entry at index.
// index values start at 0.
func (f *FakePoolConnection) addExpectedExecuteFetchAtIndex(index int, entry ExpectedExecuteFetch) {
@ -87,6 +100,35 @@ func (f *FakePoolConnection) addExpectedQueryAtIndex(index int, query string, er
})
}
// getEntry returns the expected entry at "index". If index is out of bounds,
// the return value will be nil.
func (f *FakePoolConnection) getEntry(index int) *ExpectedExecuteFetch {
f.mu.Lock()
defer f.mu.Unlock()
if index < 0 || index >= len(f.expectedExecuteFetch) {
return nil
}
return &f.expectedExecuteFetch[index]
}
func (f *FakePoolConnection) deleteAllEntriesAfterIndex(index int) {
f.mu.Lock()
defer f.mu.Unlock()
if index < 0 || index >= len(f.expectedExecuteFetch) {
return
}
if index+1 < f.expectedExecuteFetchIndex {
// Don't delete entries which were already answered.
return
}
f.expectedExecuteFetch = f.expectedExecuteFetch[:index+1]
}
// verifyAllExecutedOrFail checks that all expected queries where actually
// received and executed. If not, it will let the test fail.
func (f *FakePoolConnection) verifyAllExecutedOrFail() {
@ -110,6 +152,11 @@ func (f *FakePoolConnection) ExecuteFetch(query string, maxrows int, wantfields
defer f.mu.Unlock()
index := f.expectedExecuteFetchIndex
if f.infinite && index == len(f.expectedExecuteFetch) {
// Although we already executed all queries, we'll continue to answer the
// last one in the infinite mode.
index--
}
if index >= len(f.expectedExecuteFetch) {
f.t.Errorf("%v: got unexpected out of bound fetch: %v >= %v", f.name, index, len(f.expectedExecuteFetch))
return nil, errors.New("unexpected out of bound fetch")
@ -117,6 +164,15 @@ func (f *FakePoolConnection) ExecuteFetch(query string, maxrows int, wantfields
entry := f.expectedExecuteFetch[index]
f.expectedExecuteFetchIndex++
// If the infinite mode is on, reverse the increment and keep the index at
// len(f.expectedExecuteFetch).
if f.infinite && f.expectedExecuteFetchIndex > len(f.expectedExecuteFetch) {
f.expectedExecuteFetchIndex--
}
if entry.AfterFunc != nil {
defer entry.AfterFunc()
}
expected := entry.Query
if strings.HasSuffix(expected, "*") {

Просмотреть файл

@ -17,6 +17,7 @@ import (
"github.com/youtube/vitess/go/event"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/binlog/binlogplayer"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
@ -55,6 +56,18 @@ type SplitCloneWorker struct {
// populated during WorkerStateFindTargets, read-only after that
sourceAliases []*topodatapb.TabletAlias
sourceTablets []*topo.TabletInfo
// healthCheck tracks the health of all MASTER and REPLICA tablets.
// It must be closed at the end of the command.
healthCheck discovery.HealthCheck
// destinationShardWatchers contains a TopologyWatcher for each destination
// shard. It updates the list of endpoints in the healthcheck if replicas are
// added/removed.
// Each watcher must be stopped at the end of the command.
destinationShardWatchers []*discovery.TopologyWatcher
// destinationDbNames stores for each destination keyspace/shard the MySQL
// database name.
// Example Map Entry: test_keyspace/-80 => vt_test_keyspace
destinationDbNames map[string]string
// populated during WorkerStateCopy
// tableStatusList holds the status for each table.
@ -65,13 +78,6 @@ type SplitCloneWorker struct {
reloadTablets []map[topodatapb.TabletAlias]*topo.TabletInfo
ev *events.SplitClone
// Mutex to protect fields that might change when (re)resolving topology.
// TODO(aaijazi): we might want to have a Mutex per shard. Having a single mutex
// could become a bottleneck, as it needs to be locked for every ExecuteFetch.
resolveMu sync.Mutex
destinationShardsToTablets map[string]*topo.TabletInfo
resolveTime time.Time
}
// NewSplitCloneWorker returns a new SplitCloneWorker object.
@ -95,6 +101,8 @@ func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ex
minHealthyRdonlyEndPoints: minHealthyRdonlyEndPoints,
cleaner: &wrangler.Cleaner{},
destinationDbNames: make(map[string]string),
ev: &events.SplitClone{
Cell: cell,
Keyspace: keyspace,
@ -169,9 +177,13 @@ func (scw *SplitCloneWorker) StatusAsText() string {
// Run implements the Worker interface
func (scw *SplitCloneWorker) Run(ctx context.Context) error {
resetVars()
// Run the command.
err := scw.run(ctx)
// Cleanup.
scw.setState(WorkerStateCleanUp)
// Reverse any changes e.g. setting the tablet type of a source RDONLY tablet.
cerr := scw.cleaner.CleanUp(scw.wr)
if cerr != nil {
if err != nil {
@ -180,6 +192,17 @@ func (scw *SplitCloneWorker) Run(ctx context.Context) error {
err = cerr
}
}
// Stop healthcheck.
for _, watcher := range scw.destinationShardWatchers {
watcher.Stop()
}
if scw.healthCheck != nil {
if err := scw.healthCheck.Close(); err != nil {
scw.wr.Logger().Errorf("HealthCheck.Close() failed: %v", err)
}
}
if err != nil {
scw.setErrorState(err)
return err
@ -322,49 +345,45 @@ func (scw *SplitCloneWorker) findTargets(ctx context.Context) error {
action.TabletType = topodatapb.TabletType_SPARE
}
return scw.ResolveDestinationMasters(ctx)
}
// ResolveDestinationMasters implements the Resolver interface.
// It will attempt to resolve all shards and update scw.destinationShardsToTablets;
// if it is unable to do so, it will not modify scw.destinationShardsToTablets at all.
func (scw *SplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error {
statsDestinationAttemptedResolves.Add(1)
destinationShardsToTablets := make(map[string]*topo.TabletInfo)
// Allow at most one resolution request at a time; if there are concurrent requests, only
// one of them will actually hit the topo server.
scw.resolveMu.Lock()
defer scw.resolveMu.Unlock()
// If the last resolution was fresh enough, return it.
if time.Now().Sub(scw.resolveTime) < *resolveTTL {
return nil
}
// Initialize healthcheck and add destination shards to it.
scw.healthCheck = discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */)
for _, si := range scw.destinationShards {
ti, err := resolveDestinationShardMaster(ctx, si.Keyspace(), si.ShardName(), scw.wr)
if err != nil {
return err
}
destinationShardsToTablets[si.ShardName()] = ti
}
scw.destinationShardsToTablets = destinationShardsToTablets
// save the time of the last successful resolution
scw.resolveTime = time.Now()
statsDestinationActualResolves.Add(1)
return nil
watcher := discovery.NewShardReplicationWatcher(scw.wr.TopoServer(), scw.healthCheck,
scw.cell, si.Keyspace(), si.ShardName(),
*healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
scw.destinationShardWatchers = append(scw.destinationShardWatchers, watcher)
}
// GetDestinationMaster implements the Resolver interface
func (scw *SplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error) {
scw.resolveMu.Lock()
ti, ok := scw.destinationShardsToTablets[shardName]
scw.resolveMu.Unlock()
if !ok {
return nil, fmt.Errorf("no tablet found for destination shard %v", shardName)
// Make sure we find a master for each destination shard and log it.
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
for _, si := range scw.destinationShards {
waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second)
defer waitCancel()
if err := discovery.WaitForEndPoints(waitCtx, scw.healthCheck,
scw.cell, si.Keyspace(), si.ShardName(), []topodatapb.TabletType{topodatapb.TabletType_MASTER}); err != nil {
return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v: %v", si.Keyspace(), si.ShardName(), err)
}
return ti, nil
masters := discovery.GetCurrentMaster(
scw.healthCheck.GetEndPointStatsFromTarget(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER))
if len(masters) == 0 {
return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v in HealthCheck: empty EndPointStats list", si.Keyspace(), si.ShardName())
}
master := masters[0]
// Get the MySQL database name of the tablet.
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, master.Alias())
cancel()
if err != nil {
return fmt.Errorf("cannot get the TabletInfo for destination master (%v) to find out its db name: %v", topoproto.TabletAliasString(master.Alias()), err)
}
keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())
scw.destinationDbNames[keyspaceAndShard] = ti.DbName()
scw.wr.Logger().Infof("Using tablet %v as destination master for %v/%v", topoproto.TabletAliasString(master.Alias()), si.Keyspace(), si.ShardName())
}
scw.wr.Logger().Infof("NOTE: The used master of a destination shard might change over the course of the copy e.g. due to a reparent. The HealthCheck module will track and log master changes and any error message will always refer the actually used master address.")
return nil
}
// Find all tablets on all destination shards. This should be done immediately before reloading
@ -435,17 +454,19 @@ func (scw *SplitCloneWorker) copy(ctx context.Context) error {
// destinationWriterCount go routines reading from it.
insertChannels[shardIndex] = make(chan string, scw.destinationWriterCount*2)
go func(shardName string, insertChannel chan string) {
go func(keyspace, shard string, insertChannel chan string) {
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(ctx, scw.wr, scw, shardName, insertChannel); err != nil {
keyspaceAndShard := topoproto.KeyspaceShardString(keyspace, shard)
if err := executeFetchLoop(ctx, scw.wr, scw.healthCheck, keyspace, shard, scw.destinationDbNames[keyspaceAndShard], insertChannel); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
}
}(si.ShardName(), insertChannels[shardIndex])
}(si.Keyspace(), si.ShardName(), insertChannels[shardIndex])
}
// read the vschema if needed
@ -564,13 +585,14 @@ func (scw *SplitCloneWorker) copy(ctx context.Context) error {
for _, si := range scw.destinationShards {
destinationWaitGroup.Add(1)
go func(shardName string) {
go func(keyspace, shard string) {
defer destinationWaitGroup.Done()
scw.wr.Logger().Infof("Making and populating blp_checkpoint table")
if err := runSQLCommands(ctx, scw.wr, scw, shardName, queries); err != nil {
keyspaceAndShard := topoproto.KeyspaceShardString(keyspace, shard)
if err := runSQLCommands(ctx, scw.wr, scw.healthCheck, keyspace, shard, scw.destinationDbNames[keyspaceAndShard], queries); err != nil {
processError("blp_checkpoint queries failed: %v", err)
}
}(si.ShardName())
}(si.Keyspace(), si.ShardName())
}
destinationWaitGroup.Wait()
if firstError != nil {

Просмотреть файл

@ -15,6 +15,7 @@ import (
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/mysqlctl/replication"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/topo"
@ -51,6 +52,11 @@ type splitCloneTestCase struct {
rightMasterFakeDb *FakePoolConnection
rightMasterQs *fakes.StreamHealthQueryService
// leftReplica is used by the reparent test.
leftReplica *testlib.FakeTablet
leftReplicaFakeDb *FakePoolConnection
leftReplicaQs *fakes.StreamHealthQueryService
// defaultWorkerArgs are the full default arguments to run SplitClone.
defaultWorkerArgs []string
}
@ -114,13 +120,17 @@ func (tc *splitCloneTestCase) setUp(v3 bool) {
topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(tc.t, "ks", "-40"))
leftRdonly := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 11,
topodatapb.TabletType_RDONLY, db, testlib.TabletKeyspaceShard(tc.t, "ks", "-40"))
// leftReplica is used by the reparent test.
leftReplica := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 12,
topodatapb.TabletType_REPLICA, db, testlib.TabletKeyspaceShard(tc.t, "ks", "-40"))
tc.leftReplica = leftReplica
rightMaster := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 20,
topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(tc.t, "ks", "40-80"))
rightRdonly := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 21,
topodatapb.TabletType_RDONLY, db, testlib.TabletKeyspaceShard(tc.t, "ks", "40-80"))
tc.tablets = []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, leftMaster, leftRdonly, rightMaster, rightRdonly}
tc.tablets = []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, leftMaster, leftRdonly, tc.leftReplica, rightMaster, rightRdonly}
for _, ft := range tc.tablets {
ft.StartActionLoop(tc.t, tc.wi.wr)
@ -176,6 +186,7 @@ func (tc *splitCloneTestCase) setUp(v3 bool) {
// containing half of the rows, i.e. 2 + 2 + 1 rows). So 3 * 10
// = 30 insert statements on each destination.
tc.leftMasterFakeDb = NewFakePoolConnectionQuery(tc.t, "leftMaster")
tc.leftReplicaFakeDb = NewFakePoolConnectionQuery(tc.t, "leftReplica")
tc.rightMasterFakeDb = NewFakePoolConnectionQuery(tc.t, "rightMaster")
for i := 1; i <= 30; i++ {
@ -187,8 +198,20 @@ func (tc *splitCloneTestCase) setUp(v3 bool) {
expectBlpCheckpointCreationQueries(tc.rightMasterFakeDb)
leftMaster.FakeMysqlDaemon.DbAppConnectionFactory = tc.leftMasterFakeDb.getFactory()
leftReplica.FakeMysqlDaemon.DbAppConnectionFactory = tc.leftReplicaFakeDb.getFactory()
rightMaster.FakeMysqlDaemon.DbAppConnectionFactory = tc.rightMasterFakeDb.getFactory()
// Fake stream health reponses because vtworker needs them to find the master.
tc.leftMasterQs = fakes.NewStreamHealthQueryService(leftMaster.Target())
tc.leftMasterQs.AddDefaultHealthResponse()
tc.leftReplicaQs = fakes.NewStreamHealthQueryService(leftReplica.Target())
tc.leftReplicaQs.AddDefaultHealthResponse()
tc.rightMasterQs = fakes.NewStreamHealthQueryService(rightMaster.Target())
tc.rightMasterQs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(leftMaster.RPCServer, tc.leftMasterQs)
grpcqueryservice.RegisterForTest(leftReplica.RPCServer, tc.leftReplicaQs)
grpcqueryservice.RegisterForTest(rightMaster.RPCServer, tc.rightMasterQs)
tc.defaultWorkerArgs = []string{
"SplitClone",
"-source_reader_count", "10",
@ -203,6 +226,7 @@ func (tc *splitCloneTestCase) tearDown() {
ft.StopActionLoop(tc.t)
}
tc.leftMasterFakeDb.verifyAllExecutedOrFail()
tc.leftReplicaFakeDb.verifyAllExecutedOrFail()
tc.rightMasterFakeDb.verifyAllExecutedOrFail()
}
@ -300,17 +324,138 @@ func TestSplitCloneV2_RetryDueToReadonly(t *testing.T) {
t.Fatal(err)
}
if statsDestinationAttemptedResolves.String() != "3" {
t.Errorf("Wrong statsDestinationAttemptedResolves: wanted %v, got %v", "3", statsDestinationAttemptedResolves.String())
wantRetryCount := int64(2)
if got := statsRetryCount.Get(); got != wantRetryCount {
t.Errorf("Wrong statsRetryCounter: got %v, wanted %v", got, wantRetryCount)
}
if statsDestinationActualResolves.String() != "1" {
t.Errorf("Wrong statsDestinationActualResolves: wanted %v, got %v", "1", statsDestinationActualResolves.String())
}
if statsRetryCounters.String() != "{\"ReadOnly\": 2}" {
t.Errorf("Wrong statsRetryCounters: wanted %v, got %v", "{\"ReadOnly\": 2}", statsRetryCounters.String())
wantRetryReadOnlyCount := int64(2)
if got := statsRetryCounters.Counts()[retryCategoryReadOnly]; got != wantRetryReadOnlyCount {
t.Errorf("Wrong statsRetryCounters: got %v, wanted %v", got, wantRetryReadOnlyCount)
}
}
// TestSplitCloneV2_RetryDueToReparent tests that vtworker correctly failovers
// during a reparent.
// NOTE: worker.py is an end-to-end test which tests this as well.
func TestSplitCloneV2_RetryDueToReparent(t *testing.T) {
tc := &splitCloneTestCase{t: t}
tc.setUp(false /* v3 */)
defer tc.tearDown()
// Provoke a reparent just before the copy finishes.
// leftReplica will take over for the last, 30th, insert and the BLP checkpoint.
tc.leftReplicaFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", nil)
expectBlpCheckpointCreationQueries(tc.leftReplicaFakeDb)
// Do not let leftMaster succeed the 30th write.
tc.leftMasterFakeDb.deleteAllEntriesAfterIndex(28)
tc.leftMasterFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", errReadOnly)
tc.leftMasterFakeDb.enableInfinite()
// When vtworker encounters the readonly error on leftMaster, do the reparent.
tc.leftMasterFakeDb.getEntry(29).AfterFunc = func() {
// Reparent from leftMaster to leftReplica.
// NOTE: This step is actually not necessary due to our fakes which bypass
// a lot of logic. Let's keep it for correctness though.
ti, err := tc.ts.GetTablet(context.Background(), tc.leftReplica.Tablet.Alias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
tmc := tmclient.NewTabletManagerClient()
if err := tmc.TabletExternallyReparented(context.Background(), ti, "wait id 1"); err != nil {
t.Fatalf("TabletExternallyReparented(replica) failed: %v", err)
}
// Update targets in fake query service and send out a new health response.
tc.leftMasterQs.UpdateType(topodatapb.TabletType_REPLICA)
tc.leftMasterQs.AddDefaultHealthResponse()
tc.leftReplicaQs.UpdateType(topodatapb.TabletType_MASTER)
tc.leftReplicaQs.AddDefaultHealthResponse()
// After this, vtworker will retry. The following situations can occur:
// 1. HealthCheck picked up leftReplica as new MASTER
// => retry will succeed.
// 2. HealthCheck picked up no changes (leftMaster remains MASTER)
// => retry will hit leftMaster which keeps responding with readonly err.
// 3. HealthCheck picked up leftMaster as REPLICA, but leftReplica is still
// a REPLICA.
// => vtworker has no MASTER to go to and will keep retrying.
}
// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond
// Run the vtworker command.
if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil {
t.Fatal(err)
}
wantRetryCountMin := int64(1)
if got := statsRetryCount.Get(); got < wantRetryCountMin {
t.Errorf("Wrong statsRetryCounter: got %v, wanted >= %v", got, wantRetryCountMin)
}
}
// TestSplitCloneV2_NoMasterAvailable tests that vtworker correctly retries
// even in a period where no MASTER tablet is available according to the
// HealthCheck instance.
func TestSplitCloneV2_NoMasterAvailable(t *testing.T) {
tc := &splitCloneTestCase{t: t}
tc.setUp(false /* v3 */)
defer tc.tearDown()
// leftReplica will take over for the last, 30th, insert and the BLP checkpoint.
tc.leftReplicaFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", nil)
expectBlpCheckpointCreationQueries(tc.leftReplicaFakeDb)
// During the 29th write, let the MASTER disappear.
tc.leftMasterFakeDb.getEntry(28).AfterFunc = func() {
tc.leftMasterQs.UpdateType(topodatapb.TabletType_REPLICA)
tc.leftMasterQs.AddDefaultHealthResponse()
}
// If the HealthCheck didn't pick up the change yet, the 30th write would
// succeed. To prevent this from happening, replace it with an error.
tc.leftMasterFakeDb.deleteAllEntriesAfterIndex(28)
tc.leftMasterFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", errReadOnly)
tc.leftMasterFakeDb.enableInfinite()
// vtworker may not retry on leftMaster again if HealthCheck picks up the
// change very fast. In that case, the error was never encountered.
// Delete it or verifyAllExecutedOrFail() will fail because it was not
// processed.
defer tc.leftMasterFakeDb.deleteAllEntriesAfterIndex(28)
// Wait for a retry due to NoMasterAvailable to happen, expect the 30th write
// on leftReplica and change leftReplica from REPLICA to MASTER.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for {
if statsRetryCounters.Counts()[retryCategoryNoMasterAvailable] >= 1 {
break
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for vtworker to retry due to NoMasterAvailable: %v", ctx.Err())
case <-time.After(10 * time.Millisecond):
// Poll constantly.
}
}
// Make leftReplica the new MASTER.
tc.leftReplicaQs.UpdateType(topodatapb.TabletType_MASTER)
tc.leftReplicaQs.AddDefaultHealthResponse()
}()
// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond
// Run the vtworker command.
if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil {
t.Fatal(err)
}
}
func TestSplitCloneV3(t *testing.T) {
tc := &splitCloneTestCase{t: t}

Просмотреть файл

@ -26,10 +26,6 @@ var (
// Therefore, the default for this variable must be higher
// than vttablet's -health_check_interval.
waitForHealthyEndPointsTimeout = flag.Duration("wait_for_healthy_rdonly_endpoints_timeout", 60*time.Second, "maximum time to wait if less than --min_healthy_rdonly_endpoints are available")
healthCheckTopologyRefresh = flag.Duration("worker_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
healthcheckRetryDelay = flag.Duration("worker_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck")
healthCheckTimeout = flag.Duration("worker_healthcheck_timeout", time.Minute, "the health check timeout period")
)
// FindHealthyRdonlyEndPoint returns a random healthy endpoint.
@ -43,7 +39,7 @@ func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell,
// create a discovery healthcheck, wait for it to have one rdonly
// endpoints at this point
healthCheck := discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */)
watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, 5 /*topoReadConcurrency*/)
watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
defer watcher.Stop()
defer healthCheck.Close()

Просмотреть файл

@ -19,6 +19,7 @@ import (
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/binlog/binlogplayer"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
@ -53,6 +54,18 @@ type VerticalSplitCloneWorker struct {
// populated during WorkerStateFindTargets, read-only after that
sourceAlias *topodatapb.TabletAlias
sourceTablet *topo.TabletInfo
// healthCheck tracks the health of all MASTER and REPLICA tablets.
// It must be closed at the end of the command.
healthCheck discovery.HealthCheck
// destinationShardWatchers contains a TopologyWatcher for each destination
// shard. It updates the list of endpoints in the healthcheck if replicas are
// added/removed.
// Each watcher must be stopped at the end of the command.
destinationShardWatchers []*discovery.TopologyWatcher
// destinationDbNames stores for each destination keyspace/shard the MySQL
// database name.
// Example Map Entry: test_keyspace/-80 => vt_test_keyspace
destinationDbNames map[string]string
// populated during WorkerStateCopy
// tableStatusList holds the status for each table.
@ -63,11 +76,6 @@ type VerticalSplitCloneWorker struct {
reloadTablets map[topodatapb.TabletAlias]*topo.TabletInfo
ev *events.VerticalSplitClone
// Mutex to protect fields that might change when (re)resolving topology.
resolveMu sync.Mutex
destinationShardsToTablets map[string]*topo.TabletInfo
resolveTime time.Time
}
// NewVerticalSplitCloneWorker returns a new VerticalSplitCloneWorker object.
@ -94,6 +102,8 @@ func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspac
minHealthyRdonlyEndPoints: minHealthyRdonlyEndPoints,
cleaner: &wrangler.Cleaner{},
destinationDbNames: make(map[string]string),
ev: &events.VerticalSplitClone{
Cell: cell,
Keyspace: destinationKeyspace,
@ -160,9 +170,13 @@ func (vscw *VerticalSplitCloneWorker) StatusAsText() string {
// Run implements the Worker interface
func (vscw *VerticalSplitCloneWorker) Run(ctx context.Context) error {
resetVars()
// Run the command.
err := vscw.run(ctx)
// Cleanup.
vscw.setState(WorkerStateCleanUp)
// Reverse any changes e.g. setting the tablet type of a source RDONLY tablet.
cerr := vscw.cleaner.CleanUp(vscw.wr)
if cerr != nil {
if err != nil {
@ -171,6 +185,17 @@ func (vscw *VerticalSplitCloneWorker) Run(ctx context.Context) error {
err = cerr
}
}
// Stop healthcheck.
for _, watcher := range vscw.destinationShardWatchers {
watcher.Stop()
}
if vscw.healthCheck != nil {
if err := vscw.healthCheck.Close(); err != nil {
vscw.wr.Logger().Errorf("HealthCheck.Close() failed: %v", err)
}
}
if err != nil {
vscw.setErrorState(err)
return err
@ -290,44 +315,41 @@ func (vscw *VerticalSplitCloneWorker) findTargets(ctx context.Context) error {
}
action.TabletType = topodatapb.TabletType_SPARE
return vscw.ResolveDestinationMasters(ctx)
// Initialize healthcheck and add destination shards to it.
vscw.healthCheck = discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */)
watcher := discovery.NewShardReplicationWatcher(vscw.wr.TopoServer(), vscw.healthCheck,
vscw.cell, vscw.destinationKeyspace, vscw.destinationShard,
*healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
vscw.destinationShardWatchers = append(vscw.destinationShardWatchers, watcher)
// Make sure we find a master for each destination shard and log it.
vscw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second)
defer waitCancel()
if err := discovery.WaitForEndPoints(waitCtx, vscw.healthCheck,
vscw.cell, vscw.destinationKeyspace, vscw.destinationShard, []topodatapb.TabletType{topodatapb.TabletType_MASTER}); err != nil {
return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v: %v", vscw.destinationKeyspace, vscw.destinationShard, err)
}
// ResolveDestinationMasters implements the Resolver interface.
// It will attempt to resolve all shards and update vscw.destinationShardsToTablets;
// if it is unable to do so, it will not modify vscw.destinationShardsToTablets at all.
func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error {
statsDestinationAttemptedResolves.Add(1)
// Allow at most one resolution request at a time; if there are concurrent requests, only
// one of them will actualy hit the topo server.
vscw.resolveMu.Lock()
defer vscw.resolveMu.Unlock()
// If the last resolution was fresh enough, return it.
if time.Now().Sub(vscw.resolveTime) < *resolveTTL {
return nil
masters := discovery.GetCurrentMaster(
vscw.healthCheck.GetEndPointStatsFromTarget(vscw.destinationKeyspace, vscw.destinationShard, topodatapb.TabletType_MASTER))
if len(masters) == 0 {
return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v in HealthCheck: empty EndPointStats list", vscw.destinationKeyspace, vscw.destinationShard)
}
master := masters[0]
ti, err := resolveDestinationShardMaster(ctx, vscw.destinationKeyspace, vscw.destinationShard, vscw.wr)
// Get the MySQL database name of the tablet.
shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
ti, err := vscw.wr.TopoServer().GetTablet(shortCtx, master.Alias())
cancel()
if err != nil {
return err
}
vscw.destinationShardsToTablets = map[string]*topo.TabletInfo{vscw.destinationShard: ti}
// save the time of the last successful resolution
vscw.resolveTime = time.Now()
statsDestinationActualResolves.Add(1)
return nil
return fmt.Errorf("cannot get the TabletInfo for destination master (%v) to find out its db name: %v", topoproto.TabletAliasString(master.Alias()), err)
}
keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard)
vscw.destinationDbNames[keyspaceAndShard] = ti.DbName()
// GetDestinationMaster implements the Resolver interface
func (vscw *VerticalSplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error) {
vscw.resolveMu.Lock()
defer vscw.resolveMu.Unlock()
ti, ok := vscw.destinationShardsToTablets[shardName]
if !ok {
return nil, fmt.Errorf("no tablet found for destination shard %v", shardName)
}
return ti, nil
vscw.wr.Logger().Infof("Using tablet %v as destination master for %v/%v", topoproto.TabletAliasString(master.Alias()), vscw.destinationKeyspace, vscw.destinationShard)
vscw.wr.Logger().Infof("NOTE: The used master of a destination shard might change over the course of the copy e.g. due to a reparent. The HealthCheck module will track and log master changes and any error message will always refer the actually used master address.")
return nil
}
// Find all tablets on the destination shard. This should be done immediately before reloading
@ -389,18 +411,17 @@ func (vscw *VerticalSplitCloneWorker) copy(ctx context.Context) error {
// destinationWriterCount go routines reading from it.
insertChannel := make(chan string, vscw.destinationWriterCount*2)
go func(shardName string, insertChannel chan string) {
for j := 0; j < vscw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
go func() {
defer destinationWaitGroup.Done()
if err := executeFetchLoop(ctx, vscw.wr, vscw, shardName, insertChannel); err != nil {
keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard)
if err := executeFetchLoop(ctx, vscw.wr, vscw.healthCheck, vscw.destinationKeyspace, vscw.destinationShard, vscw.destinationDbNames[keyspaceAndShard], insertChannel); err != nil {
processError("executeFetchLoop failed: %v", err)
}
}()
}
}(vscw.destinationShard, insertChannel)
// Now for each table, read data chunks and send them to insertChannel
sourceWaitGroup := sync.WaitGroup{}
@ -470,15 +491,11 @@ func (vscw *VerticalSplitCloneWorker) copy(ctx context.Context) error {
flags = binlogplayer.BlpFlagDontStart
}
queries = append(queries, binlogplayer.PopulateBlpCheckpoint(0, status.Position, time.Now().Unix(), flags))
destinationWaitGroup.Add(1)
go func(shardName string) {
defer destinationWaitGroup.Done()
vscw.wr.Logger().Infof("Making and populating blp_checkpoint table")
if err := runSQLCommands(ctx, vscw.wr, vscw, shardName, queries); err != nil {
keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard)
if err := runSQLCommands(ctx, vscw.wr, vscw.healthCheck, vscw.destinationKeyspace, vscw.destinationShard, vscw.destinationDbNames[keyspaceAndShard], queries); err != nil {
processError("blp_checkpoint queries failed: %v", err)
}
}(vscw.destinationShard)
destinationWaitGroup.Wait()
if firstError != nil {
return firstError
}

Просмотреть файл

@ -200,6 +200,10 @@ func TestVerticalSplitClone(t *testing.T) {
defer destMasterFakeDb.verifyAllExecutedOrFail()
destMaster.FakeMysqlDaemon.DbAppConnectionFactory = destMasterFakeDb.getFactory()
// Fake stream health reponses because vtworker needs them to find the master.
qs := fakes.NewStreamHealthQueryService(destMaster.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(destMaster.RPCServer, qs)
// Only wait 1 ms between retries, so that the test passes faster
*executeFetchRetryTime = (1 * time.Millisecond)
@ -217,13 +221,12 @@ func TestVerticalSplitClone(t *testing.T) {
t.Fatal(err)
}
if statsDestinationAttemptedResolves.String() != "2" {
t.Errorf("Wrong statsDestinationAttemptedResolves: wanted %v, got %v", "2", statsDestinationAttemptedResolves.String())
wantRetryCount := int64(1)
if got := statsRetryCount.Get(); got != wantRetryCount {
t.Errorf("Wrong statsRetryCounter: got %v, wanted %v", got, wantRetryCount)
}
if statsDestinationActualResolves.String() != "1" {
t.Errorf("Wrong statsDestinationActualResolves: wanted %v, got %v", "1", statsDestinationActualResolves.String())
}
if statsRetryCounters.String() != "{\"ReadOnly\": 1}" {
t.Errorf("Wrong statsRetryCounters: wanted %v, got %v", "{\"ReadOnly\": 1}", statsRetryCounters.String())
wantRetryReadOnlyCount := int64(1)
if got := statsRetryCounters.Counts()[retryCategoryReadOnly]; got != wantRetryReadOnlyCount {
t.Errorf("Wrong statsRetryCounters: got %v, wanted %v", got, wantRetryReadOnlyCount)
}
}

Просмотреть файл

@ -16,7 +16,6 @@ import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/topo"
)
// Worker is the base interface for all long running workers.
@ -36,38 +35,34 @@ type Worker interface {
Run(context.Context) error
}
// Resolver is an interface that should be implemented by any workers that need to
// resolve the topology.
type Resolver interface {
// ResolveDestinationMasters forces the worker to (re)resolve the topology and update
// the destination masters that it knows about.
ResolveDestinationMasters(ctx context.Context) error
// GetDestinationMaster returns the most recently resolved destination master for a particular shard.
GetDestinationMaster(shardName string) (*topo.TabletInfo, error)
}
var (
resolveTTL = flag.Duration("resolve_ttl", 15*time.Second, "Amount of time that a topo resolution can be cached for")
executeFetchRetryTime = flag.Duration("executefetch_retry_time", 30*time.Second, "Amount of time we should wait before retrying ExecuteFetch calls")
remoteActionsTimeout = flag.Duration("remote_actions_timeout", time.Minute, "Amount of time to wait for remote actions (like replication stop, ...)")
useV3ReshardingMode = flag.Bool("use_v3_resharding_mode", false, "True iff the workers should use V3-style resharding, which doesn't require a preset sharding key column.")
healthCheckTopologyRefresh = flag.Duration("worker_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
healthcheckRetryDelay = flag.Duration("worker_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck")
healthCheckTimeout = flag.Duration("worker_healthcheck_timeout", time.Minute, "the health check timeout period")
statsState = stats.NewString("WorkerState")
// the number of times that the worker attempst to reresolve the masters
statsDestinationAttemptedResolves = stats.NewInt("WorkerDestinationAttemptedResolves")
// the number of times that the worker actually hits the topo server, i.e., they don't
// use a cached topology
statsDestinationActualResolves = stats.NewInt("WorkerDestinationActualResolves")
statsRetryCounters = stats.NewCounters("WorkerRetryCount")
// statsRetryCount is the total number of times a query to vttablet had to be retried.
statsRetryCount = stats.NewInt("WorkerRetryCount")
// statsRetryCount groups the number of retries by category e.g. "TimeoutError" or "Readonly".
statsRetryCounters = stats.NewCounters("WorkerRetryCounters")
)
const (
retryCategoryReadOnly = "ReadOnly"
retryCategoryTimeoutError = "TimeoutError"
retryCategoryConnectionError = "ConnectionError"
retryCategoryNoMasterAvailable = "NoMasterAvailable"
)
// resetVars resets the debug variables that are meant to provide information on a
// per-run basis. This should be called at the beginning of each worker run.
func resetVars() {
statsState.Set("")
statsDestinationAttemptedResolves.Set(0)
statsDestinationActualResolves.Set(0)
statsRetryCount.Set(0)
statsRetryCounters.Reset()
}

Просмотреть файл

@ -485,7 +485,7 @@ func (wr *Wrangler) waitForDrainInCell(ctx context.Context, cell, keyspace, shar
retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout time.Duration) error {
hc := discovery.NewHealthCheck(healthCheckTimeout /* connectTimeout */, healthcheckRetryDelay, healthCheckTimeout, cell)
defer hc.Close()
watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, 5 /* topoReadConcurrency */)
watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
defer watcher.Stop()
if err := discovery.WaitForEndPoints(ctx, hc, cell, keyspace, shard, []topodatapb.TabletType{servedType}); err != nil {
@ -554,11 +554,7 @@ func formatEndpointStats(eps *discovery.EndPointStats) string {
if webPort, ok := eps.EndPoint.PortMap["vt"]; ok {
webURL = fmt.Sprintf("http://%v:%d/", eps.EndPoint.Host, webPort)
}
alias := &topodatapb.TabletAlias{
Cell: eps.Cell,
Uid: eps.EndPoint.Uid,
}
return fmt.Sprintf("%v: %v stats: %v", topoproto.TabletAliasString(alias), webURL, eps.Stats)
return fmt.Sprintf("%v: %v stats: %v", topoproto.TabletAliasString(eps.Alias()), webURL, eps.Stats)
}
// MigrateServedFrom is used during vertical splits to migrate a

Просмотреть файл

@ -86,7 +86,10 @@ def setUpModule():
# Create destination shard.
dst_master.init_tablet('master', 'test_keyspace', '-')
dst_replica.init_tablet('replica', 'test_keyspace', '-')
dst_master.start_vttablet(wait_for_state='NOT_SERVING')
# Start masters with enabled healthcheck (necessary for resolving the
# destination master).
dst_master.start_vttablet(wait_for_state='NOT_SERVING',
target_tablet_type='replica')
dst_replica.start_vttablet(wait_for_state='NOT_SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/-',

Просмотреть файл

@ -301,8 +301,12 @@ index by_msg (msg)
# start vttablet on the split shards (no db created,
# so they're all not serving)
for t in [shard_dest_master, shard_dest_replica, shard_dest_rdonly]:
for t in [shard_dest_replica, shard_dest_rdonly]:
t.start_vttablet(wait_for_state=None)
# Start masters with enabled healthcheck (necessary for resolving the
# destination master).
shard_dest_master.start_vttablet(wait_for_state=None,
target_tablet_type='replica')
for t in [shard_dest_master, shard_dest_replica, shard_dest_rdonly]:
t.wait_for_vttablet_state('NOT_SERVING')

Просмотреть файл

@ -459,9 +459,13 @@ primary key (name)
# start vttablet on the split shards (no db created,
# so they're all not serving)
# Start masters with enabled healthcheck (necessary for resolving the
# destination master).
shard_2_master.start_vttablet(wait_for_state=None,
target_tablet_type='replica')
shard_3_master.start_vttablet(wait_for_state=None,
target_tablet_type='replica')
for t in [shard_2_master, shard_2_replica1, shard_2_replica2,
for t in [shard_2_replica1, shard_2_replica2,
shard_3_replica, shard_3_rdonly1]:
t.start_vttablet(wait_for_state=None)
for t in [shard_2_master, shard_2_replica1, shard_2_replica2,
@ -483,6 +487,11 @@ primary key (name)
keyspace_id_type=keyspace_id_type,
sharding_column_name='custom_sharding_key')
# TODO(mberlin): Use a different approach for the same effect because this
# one doesn't work when the healthcheck is enabled on the
# tablet. In that case the healthcheck will race with the
# test and convert the SPARE tablet back to REPLICA the next
# time it runs.
# disable shard_1_slave2, so we're sure filtered replication will go
# from shard_1_slave1
utils.run_vtctl(['ChangeSlaveType', shard_1_slave2.tablet_alias, 'spare'])

Просмотреть файл

@ -798,11 +798,6 @@ def _get_vtworker_cmd(clargs, auto_log=False):
args = environment.binary_args('vtworker') + [
'-log_dir', environment.vtlogroot,
'-port', str(port),
# use a long resolve TTL because of potential race conditions with doing
# an EmergencyReparent and resolving the master (as EmergencyReparent
# will delete the old master before updating the shard record with the
# new master)
'-resolve_ttl', '10s',
'-executefetch_retry_time', '1s',
'-tablet_manager_protocol',
protocols_flavor().tablet_manager_protocol(),

Просмотреть файл

@ -448,16 +448,14 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
worker_rpc_port)
if mysql_down:
# If MySQL is down, we wait until resolving at least twice (to verify that
# we do reresolve and retry due to MySQL being down).
worker_vars = utils.poll_for_vars(
# If MySQL is down, we wait until vtworker retried at least once to make
# sure it reached the point where a write failed due to MySQL being down.
# There should be two retries at least, one for each destination shard.
utils.poll_for_vars(
'vtworker', worker_port,
'WorkerDestinationActualResolves >= 2',
condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 2)
self.assertNotEqual(
worker_vars['WorkerRetryCount'], {},
"expected vtworker to retry, but it didn't")
logging.debug('Worker has resolved at least twice, starting reparent now')
'WorkerRetryCount >= 2',
condition_fn=lambda v: v.get('WorkerRetryCount') >= 2)
logging.debug('Worker has retried at least twice, starting reparent now')
# Bring back masters. Since we test with semi-sync now, we need at least
# one replica for the new master. This test is already quite expensive,
@ -480,19 +478,20 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
# NOTE: There is a race condition around this:
# It's possible that the SplitClone vtworker command finishes before the
# PlannedReparentShard vtctl command, which we start below, succeeds.
# Then the test would fail because vtworker did not have to resolve the
# master tablet again (due to the missing reparent).
# Then the test would fail because vtworker did not have to retry.
#
# To workaround this, the test takes a parameter to increase the number of
# rows that the worker has to copy (with the idea being to slow the worker
# down).
# You should choose a value for num_insert_rows, such that this test
# passes for your environment (trial-and-error...)
# Make sure that vtworker got past the point where it picked a master
# for each destination shard ("finding targets" state).
utils.poll_for_vars(
'vtworker', worker_port,
'WorkerDestinationActualResolves >= 1',
condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 1)
logging.debug('Worker has resolved at least once, starting reparent now')
'WorkerState == copying the data',
condition_fn=lambda v: v.get('WorkerState') == 'copying the data')
logging.debug('Worker is in copy state, starting reparent now')
utils.run_vtctl(
['PlannedReparentShard', 'test_keyspace/-80',
@ -503,10 +502,10 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone):
utils.wait_procs([workerclient_proc])
# Verify that we were forced to reresolve and retry.
# Verify that we were forced to re-resolve and retry.
worker_vars = utils.get_vars(worker_port)
self.assertGreater(worker_vars['WorkerDestinationActualResolves'], 1)
self.assertGreater(worker_vars['WorkerDestinationAttemptedResolves'], 1)
# There should be two retries at least, one for each destination shard.
self.assertGreater(worker_vars['WorkerRetryCount'], 1)
self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
"expected vtworker to retry, but it didn't")
utils.kill_sub_process(worker_proc, soft=True)
@ -585,7 +584,7 @@ class TestVtworkerWebinterface(unittest.TestCase):
status = urllib2.urlopen(worker_base_url + '/status').read()
self.assertIn(
"Ping command was called with message: 'pong'", status,
'Command did not log output to /status')
'Command did not log output to /status: %s' % status)
# Reset the job.
urllib2.urlopen(worker_base_url + '/reset').read()