From a2a0d900caa96305b3a7bbc6e2316a5ae195e725 Mon Sep 17 00:00:00 2001 From: Michael Berlin Date: Fri, 29 Apr 2016 00:46:07 -0700 Subject: [PATCH] worker: Resolve destination master using the discovery module. - Removed previous Resolver interface and respective implementations. - Removed respective stat vars and flags (--resolve_ttl). - Added unit test for case where vtworker fails over to a different replica. - Added unit test for case when healthcheck retries because currently no master is available. - Extended FakePoolConnection to support these tests: - can define callback (AfterFunc) when expected query was received - infinite mode where the last request may be received over and over again - discovery: Introduce DefaultTopoReadConcurrency to avoid duplication. - discovery: Added EndPointStats.Alias() to avoid duplication. - discovery: Added EndPointStats.String() to have pretty printed arrays. Adapted end-to-end tests: - binlog.py, merge_sharding.py, resharding.py: Enabled healthcheck for master tablets. - worker.py: Updated stat vars check --- go/vt/discovery/healthcheck.go | 20 +++ go/vt/worker/clone_utils.go | 206 ++++++++++++---------- go/vt/worker/fake_pool_connection_test.go | 56 ++++++ go/vt/worker/split_clone.go | 126 +++++++------ go/vt/worker/split_clone_test.go | 165 +++++++++++++++-- go/vt/worker/topo_utils.go | 6 +- go/vt/worker/vertical_split_clone.go | 131 ++++++++------ go/vt/worker/vertical_split_clone_test.go | 17 +- go/vt/worker/worker.go | 37 ++-- go/vt/wrangler/keyspace.go | 8 +- test/binlog.py | 5 +- test/merge_sharding.py | 6 +- test/resharding.py | 11 +- test/utils.py | 5 - test/worker.py | 35 ++-- 15 files changed, 555 insertions(+), 279 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 4a2aaa7294..f72f61c595 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -50,6 +50,11 @@ var ( hcErrorCounters *stats.MultiCounters ) +const ( + // DefaultTopoReadConcurrency can be used as default value for the topoReadConcurrency parameter of a TopologyWatcher. + DefaultTopoReadConcurrency int = 5 +) + func init() { hcErrorCounters = stats.NewMultiCounters("HealthcheckErrors", []string{"keyspace", "shardname", "tablettype"}) } @@ -72,6 +77,20 @@ type EndPointStats struct { LastError error } +// Alias returns the alias of the tablet. +// The return value can be used e.g. to generate the input for the topo API. +func (e *EndPointStats) Alias() *topodatapb.TabletAlias { + return &topodatapb.TabletAlias{ + Cell: e.Cell, + Uid: e.EndPoint.Uid, + } +} + +// String is defined because we want to print a []*EndPointStats array nicely. +func (e *EndPointStats) String() string { + return fmt.Sprint(*e) +} + // HealthCheck defines the interface of health checking module. type HealthCheck interface { // SetListener sets the listener for healthcheck updates. It should not block. @@ -83,6 +102,7 @@ type HealthCheck interface { // GetEndPointStatsFromKeyspaceShard returns all EndPointStats for the given keyspace/shard. GetEndPointStatsFromKeyspaceShard(keyspace, shard string) []*EndPointStats // GetEndPointStatsFromTarget returns all EndPointStats for the given target. + // You can exclude unhealthy entries using the helper in utils.go. GetEndPointStatsFromTarget(keyspace, shard string, tabletType topodatapb.TabletType) []*EndPointStats // GetConnection returns the TabletConn of the given endpoint. 
GetConnection(endPoint *topodatapb.EndPoint) tabletconn.TabletConn diff --git a/go/vt/worker/clone_utils.go b/go/vt/worker/clone_utils.go index a590f5fdb6..cd7a54edcf 100644 --- a/go/vt/worker/clone_utils.go +++ b/go/vt/worker/clone_utils.go @@ -16,6 +16,7 @@ import ( "golang.org/x/net/context" "github.com/youtube/vitess/go/sqltypes" + "github.com/youtube/vitess/go/vt/discovery" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/topoproto" "github.com/youtube/vitess/go/vt/wrangler" @@ -29,32 +30,6 @@ import ( // This file contains utility functions for clone workers. // -// Does a topo lookup for a single shard, and returns the tablet record of the master tablet. -func resolveDestinationShardMaster(ctx context.Context, keyspace, shard string, wr *wrangler.Wrangler) (*topo.TabletInfo, error) { - var ti *topo.TabletInfo - shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) - cancel() - if err != nil { - return ti, fmt.Errorf("unable to resolve destination shard %v/%v", keyspace, shard) - } - - if !si.HasMaster() { - return ti, fmt.Errorf("no master in destination shard %v/%v", keyspace, shard) - } - - wr.Logger().Infof("Found target master alias %v in shard %v/%v", topoproto.TabletAliasString(si.MasterAlias), keyspace, shard) - - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - ti, err = wr.TopoServer().GetTablet(shortCtx, si.MasterAlias) - cancel() - if err != nil { - return ti, fmt.Errorf("unable to get master tablet from alias %v in shard %v/%v: %v", - topoproto.TabletAliasString(si.MasterAlias), keyspace, shard, err) - } - return ti, nil -} - // Does a topo lookup for a single shard, and returns: // 1. Slice of all tablet aliases for the shard. // 2. Map of tablet alias : tablet record for all tablets. @@ -84,77 +59,111 @@ var errExtract = regexp.MustCompile(`\(errno (\d+)\)`) // If will keep retrying the ExecuteFetch (for a finite but longer duration) if it fails due to a timeout or a // retriable application error. // -// executeFetchWithRetries will also re-resolve the topology after errors, to be resistant to a reparent. -// It takes in a tablet record that it will initially attempt to write to, and will return the final tablet -// record that it used. -func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, ti *topo.TabletInfo, r Resolver, shard string, command string) (*topo.TabletInfo, error) { +// executeFetchWithRetries will always get the current MASTER tablet from the +// healthcheck instance. If no MASTER is available, it will keep retrying. +func executeFetchWithRetries(ctx context.Context, wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, keyspace, shard, command string) error { retryDuration := 2 * time.Hour - // We should keep retrying up until the retryCtx runs out + // We should keep retrying up until the retryCtx runs out. retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration) defer retryCancel() // Is this current attempt a retry of a previous attempt? isRetry := false for { - tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute) - _, err := wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, ti, command, 0) - cancel() - if err == nil { - // success! - return ti, nil + var master *discovery.EndPointStats + var err error + + // Get the current master from the HealthCheck. 
+ masters := discovery.GetCurrentMaster( + healthCheck.GetEndPointStatsFromTarget(keyspace, shard, topodatapb.TabletType_MASTER)) + if len(masters) == 0 { + wr.Logger().Warningf("ExecuteFetch failed for keyspace/shard %v/%v because no MASTER is available; will retry until there is MASTER again", keyspace, shard) + statsRetryCount.Add(1) + statsRetryCounters.Add(retryCategoryNoMasterAvailable, 1) + goto retry } - // If the ExecuteFetch call failed because of an application error, we will try to figure out why. - // We need to extract the MySQL error number, and will attempt to retry if we think the error is recoverable. - match := errExtract.FindStringSubmatch(err.Error()) - var errNo string - if len(match) == 2 { - errNo = match[1] - } - switch { - case wr.TabletManagerClient().IsTimeoutError(err): - wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error: %v", ti, err) - statsRetryCounters.Add("TimeoutError", 1) - case errNo == "1290": - wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", ti, err) - statsRetryCounters.Add("ReadOnly", 1) - case errNo == "2002" || errNo == "2006": - wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", ti, err) - statsRetryCounters.Add("ConnectionError", 1) - case errNo == "1062": - if !isRetry { - return ti, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", ti, err) + master = masters[0] + + // Run the command (in a block since goto above does not allow to introduce + // new variables until the label is reached.) + { + tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute) + _, err = wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, endPointToTabletInfo(master), command, 0) + cancel() + + if err == nil { + // success! + return nil + } + + succeeded, finalErr := checkError(wr, err, isRetry, master, keyspace, shard) + if succeeded { + // We can ignore the error and don't have to retry. + return nil + } + if finalErr != nil { + // Non-retryable error. 
+ return finalErr } - wr.Logger().Infof("ExecuteFetch failed on %v with a duplicate entry error; marking this as a success, because of the likelihood that this query has already succeeded before being retried: %v", ti, err) - return ti, nil - default: - // Unknown error - return ti, err } - t := time.NewTimer(*executeFetchRetryTime) - // don't leak memory if the timer isn't triggered - defer t.Stop() + + retry: + masterAlias := "no-master-was-available" + if master != nil { + masterAlias = topoproto.TabletAliasString(master.Alias()) + } + tabletString := fmt.Sprintf("%v (%v/%v)", masterAlias, keyspace, shard) select { case <-retryCtx.Done(): if retryCtx.Err() == context.DeadlineExceeded { - return ti, fmt.Errorf("failed to connect to destination tablet %v after retrying for %v", ti, retryDuration) - } - return ti, fmt.Errorf("interrupted while trying to run %v on tablet %v", command, ti) - case <-t.C: - // Re-resolve and retry 30s after the failure - err = r.ResolveDestinationMasters(ctx) - if err != nil { - return ti, fmt.Errorf("unable to re-resolve masters for ExecuteFetch, due to: %v", err) - } - ti, err = r.GetDestinationMaster(shard) - if err != nil { - // At this point, we probably don't have a valid tablet record to return - return nil, fmt.Errorf("unable to run ExecuteFetch due to: %v", err) + return fmt.Errorf("failed to connect to destination tablet %v after retrying for %v", tabletString, retryDuration) } + return fmt.Errorf("interrupted while trying to run %v on tablet %v", command, tabletString) + case <-time.After(*executeFetchRetryTime): + // Retry 30s after the failure using the current master seen by the HealthCheck. } isRetry = true } } +// checkError returns true if the error can be ignored and the command +// succeeded, false if the error is retryable and a non-nil error if the +// command must not be retried. +func checkError(wr *wrangler.Wrangler, err error, isRetry bool, master *discovery.EndPointStats, keyspace, shard string) (bool, error) { + tabletString := fmt.Sprintf("%v (%v/%v)", topoproto.TabletAliasString(master.Alias()), keyspace, shard) + // If the ExecuteFetch call failed because of an application error, we will try to figure out why. + // We need to extract the MySQL error number, and will attempt to retry if we think the error is recoverable. 
+ match := errExtract.FindStringSubmatch(err.Error()) + var errNo string + if len(match) == 2 { + errNo = match[1] + } + switch { + case wr.TabletManagerClient().IsTimeoutError(err): + wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error: %v", tabletString, err) + statsRetryCount.Add(1) + statsRetryCounters.Add(retryCategoryTimeoutError, 1) + case errNo == "1290": + wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", tabletString, err) + statsRetryCount.Add(1) + statsRetryCounters.Add(retryCategoryReadOnly, 1) + case errNo == "2002" || errNo == "2006": + wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", tabletString, err) + statsRetryCount.Add(1) + statsRetryCounters.Add(retryCategoryConnectionError, 1) + case errNo == "1062": + if !isRetry { + return false, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", tabletString, err) + } + wr.Logger().Infof("ExecuteFetch failed on %v with a duplicate entry error; marking this as a success, because of the likelihood that this query has already succeeded before being retried: %v", tabletString, err) + return true, nil + default: + // Unknown error. + return false, err + } + return false, nil +} + // fillStringTemplate returns the string template filled func fillStringTemplate(tmpl string, vars interface{}) (string, error) { myTemplate := template.Must(template.New("").Parse(tmpl)) @@ -166,20 +175,14 @@ func fillStringTemplate(tmpl string, vars interface{}) (string, error) { } // runSQLCommands will send the sql commands to the remote tablet. -func runSQLCommands(ctx context.Context, wr *wrangler.Wrangler, r Resolver, shard string, commands []string) error { - ti, err := r.GetDestinationMaster(shard) - if err != nil { - return fmt.Errorf("runSQLCommands failed: %v", err) - } - +func runSQLCommands(ctx context.Context, wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, keyspace, shard, dbName string, commands []string) error { for _, command := range commands { - command, err := fillStringTemplate(command, map[string]string{"DatabaseName": ti.DbName()}) + command, err := fillStringTemplate(command, map[string]string{"DatabaseName": dbName}) if err != nil { return fmt.Errorf("fillStringTemplate failed: %v", err) } - ti, err = executeFetchWithRetries(ctx, wr, ti, r, shard, command) - if err != nil { + if err := executeFetchWithRetries(ctx, wr, healthCheck, keyspace, shard, command); err != nil { return err } } @@ -359,11 +362,7 @@ func makeValueString(fields []*querypb.Field, rows [][]sqltypes.Value) string { // executeFetchLoop loops over the provided insertChannel // and sends the commands to the provided tablet. -func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, r Resolver, shard string, insertChannel chan string) error { - ti, err := r.GetDestinationMaster(shard) - if err != nil { - return fmt.Errorf("executeFetchLoop failed: %v", err) - } +func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, keyspace, shard, dbName string, insertChannel chan string) error { for { select { case cmd, ok := <-insertChannel: @@ -371,9 +370,8 @@ func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, r Resolver, sh // no more to read, we're done return nil } - cmd = "INSERT INTO `" + ti.DbName() + "`." 
+ cmd - ti, err = executeFetchWithRetries(ctx, wr, ti, r, shard, cmd) - if err != nil { + cmd = "INSERT INTO `" + dbName + "`." + cmd + if err := executeFetchWithRetries(ctx, wr, healthCheck, keyspace, shard, cmd); err != nil { return fmt.Errorf("ExecuteFetch failed: %v", err) } case <-ctx.Done(): @@ -384,3 +382,21 @@ func executeFetchLoop(ctx context.Context, wr *wrangler.Wrangler, r Resolver, sh } } } + +// endPointToTabletInfo converts an EndPointStats object from the discovery +// package into a TabletInfo object. The latter one is required by several +// TabletManagerClient API calls. +// Note that this is a best-effort conversion and won't result into the same +// result as a call to topo.GetTablet(). +// Note: We assume that "eps" is immutable and we can reference its data. +func endPointToTabletInfo(eps *discovery.EndPointStats) *topo.TabletInfo { + return topo.NewTabletInfo(&topodatapb.Tablet{ + Alias: eps.Alias(), + Hostname: eps.EndPoint.Host, + PortMap: eps.EndPoint.PortMap, + HealthMap: eps.EndPoint.HealthMap, + Keyspace: eps.Target.Keyspace, + Shard: eps.Target.Shard, + Type: eps.Target.TabletType, + }, -1 /* version */) +} diff --git a/go/vt/worker/fake_pool_connection_test.go b/go/vt/worker/fake_pool_connection_test.go index 21095986ef..ad526a0aa3 100644 --- a/go/vt/worker/fake_pool_connection_test.go +++ b/go/vt/worker/fake_pool_connection_test.go @@ -29,6 +29,9 @@ type FakePoolConnection struct { mu sync.Mutex expectedExecuteFetch []ExpectedExecuteFetch expectedExecuteFetchIndex int + // Infinite is true when executed queries beyond our expectation list should + // respond with the last entry from the list. + infinite bool } // ExpectedExecuteFetch defines for an expected query the to be faked output. @@ -38,6 +41,9 @@ type ExpectedExecuteFetch struct { WantFields bool QueryResult *sqltypes.Result Error error + // AfterFunc is a callback which is executed while the query is executed i.e., + // before the fake responds to the client. + AfterFunc func() } // NewFakePoolConnectionQuery creates a new fake database. @@ -50,6 +56,13 @@ func (f *FakePoolConnection) addExpectedExecuteFetch(entry ExpectedExecuteFetch) f.addExpectedExecuteFetchAtIndex(appendEntry, entry) } +func (f *FakePoolConnection) enableInfinite() { + f.mu.Lock() + defer f.mu.Unlock() + + f.infinite = true +} + // addExpectedExecuteFetchAtIndex inserts a new entry at index. // index values start at 0. func (f *FakePoolConnection) addExpectedExecuteFetchAtIndex(index int, entry ExpectedExecuteFetch) { @@ -87,6 +100,35 @@ func (f *FakePoolConnection) addExpectedQueryAtIndex(index int, query string, er }) } +// getEntry returns the expected entry at "index". If index is out of bounds, +// the return value will be nil. +func (f *FakePoolConnection) getEntry(index int) *ExpectedExecuteFetch { + f.mu.Lock() + defer f.mu.Unlock() + + if index < 0 || index >= len(f.expectedExecuteFetch) { + return nil + } + + return &f.expectedExecuteFetch[index] +} + +func (f *FakePoolConnection) deleteAllEntriesAfterIndex(index int) { + f.mu.Lock() + defer f.mu.Unlock() + + if index < 0 || index >= len(f.expectedExecuteFetch) { + return + } + + if index+1 < f.expectedExecuteFetchIndex { + // Don't delete entries which were already answered. + return + } + + f.expectedExecuteFetch = f.expectedExecuteFetch[:index+1] +} + // verifyAllExecutedOrFail checks that all expected queries where actually // received and executed. If not, it will let the test fail. 
func (f *FakePoolConnection) verifyAllExecutedOrFail() { @@ -110,6 +152,11 @@ func (f *FakePoolConnection) ExecuteFetch(query string, maxrows int, wantfields defer f.mu.Unlock() index := f.expectedExecuteFetchIndex + if f.infinite && index == len(f.expectedExecuteFetch) { + // Although we already executed all queries, we'll continue to answer the + // last one in the infinite mode. + index-- + } if index >= len(f.expectedExecuteFetch) { f.t.Errorf("%v: got unexpected out of bound fetch: %v >= %v", f.name, index, len(f.expectedExecuteFetch)) return nil, errors.New("unexpected out of bound fetch") @@ -117,6 +164,15 @@ func (f *FakePoolConnection) ExecuteFetch(query string, maxrows int, wantfields entry := f.expectedExecuteFetch[index] f.expectedExecuteFetchIndex++ + // If the infinite mode is on, reverse the increment and keep the index at + // len(f.expectedExecuteFetch). + if f.infinite && f.expectedExecuteFetchIndex > len(f.expectedExecuteFetch) { + f.expectedExecuteFetchIndex-- + } + + if entry.AfterFunc != nil { + defer entry.AfterFunc() + } expected := entry.Query if strings.HasSuffix(expected, "*") { diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index e8dd20362d..3cfee11821 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -17,6 +17,7 @@ import ( "github.com/youtube/vitess/go/event" "github.com/youtube/vitess/go/sync2" "github.com/youtube/vitess/go/vt/binlog/binlogplayer" + "github.com/youtube/vitess/go/vt/discovery" "github.com/youtube/vitess/go/vt/mysqlctl/tmutils" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/topoproto" @@ -55,6 +56,18 @@ type SplitCloneWorker struct { // populated during WorkerStateFindTargets, read-only after that sourceAliases []*topodatapb.TabletAlias sourceTablets []*topo.TabletInfo + // healthCheck tracks the health of all MASTER and REPLICA tablets. + // It must be closed at the end of the command. + healthCheck discovery.HealthCheck + // destinationShardWatchers contains a TopologyWatcher for each destination + // shard. It updates the list of endpoints in the healthcheck if replicas are + // added/removed. + // Each watcher must be stopped at the end of the command. + destinationShardWatchers []*discovery.TopologyWatcher + // destinationDbNames stores for each destination keyspace/shard the MySQL + // database name. + // Example Map Entry: test_keyspace/-80 => vt_test_keyspace + destinationDbNames map[string]string // populated during WorkerStateCopy // tableStatusList holds the status for each table. @@ -65,13 +78,6 @@ type SplitCloneWorker struct { reloadTablets []map[topodatapb.TabletAlias]*topo.TabletInfo ev *events.SplitClone - - // Mutex to protect fields that might change when (re)resolving topology. - // TODO(aaijazi): we might want to have a Mutex per shard. Having a single mutex - // could become a bottleneck, as it needs to be locked for every ExecuteFetch. - resolveMu sync.Mutex - destinationShardsToTablets map[string]*topo.TabletInfo - resolveTime time.Time } // NewSplitCloneWorker returns a new SplitCloneWorker object. 
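Note (illustrative sketch, not part of this patch): the findTargets() hunks below wire the worker to the discovery module: start a HealthCheck and a ShardReplicationWatcher per destination shard, wait until a MASTER is seen, and cache its MySQL db name. Condensed, the flow looks roughly like this; import paths are the ones used elsewhere in the tree, timeouts are passed as parameters instead of the worker's --remote_actions_timeout / --worker_healthcheck_* flags, and unlike the real code the healthcheck and watcher are closed on return:

package worker

import (
	"fmt"
	"time"

	"golang.org/x/net/context"

	"github.com/youtube/vitess/go/vt/discovery"
	"github.com/youtube/vitess/go/vt/topo/topoproto"
	"github.com/youtube/vitess/go/vt/wrangler"

	topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)

// resolveDestinationMasterSketch returns the MySQL db name of the current
// destination master, found via the discovery module instead of a direct
// topo lookup of the shard record.
func resolveDestinationMasterSketch(ctx context.Context, wr *wrangler.Wrangler, cell, keyspace, shard string, connTimeout, retryDelay, hcTimeout, topoRefresh time.Duration) (string, error) {
	hc := discovery.NewHealthCheck(connTimeout, retryDelay, hcTimeout, "" /* statsSuffix */)
	watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard,
		topoRefresh, discovery.DefaultTopoReadConcurrency)
	defer watcher.Stop()
	defer hc.Close()

	// Block until the healthcheck has seen a MASTER for the destination shard.
	waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second)
	defer waitCancel()
	if err := discovery.WaitForEndPoints(waitCtx, hc, cell, keyspace, shard,
		[]topodatapb.TabletType{topodatapb.TabletType_MASTER}); err != nil {
		return "", fmt.Errorf("cannot find MASTER tablet for %v/%v: %v", keyspace, shard, err)
	}

	// The master may change at any time (e.g. due to a reparent), so writers
	// re-read it from the healthcheck before every ExecuteFetch instead of
	// caching a tablet record.
	masters := discovery.GetCurrentMaster(
		hc.GetEndPointStatsFromTarget(keyspace, shard, topodatapb.TabletType_MASTER))
	if len(masters) == 0 {
		return "", fmt.Errorf("no MASTER for %v/%v in HealthCheck", keyspace, shard)
	}
	master := masters[0]

	// The db name does not change across reparents, so it is read once from
	// the topology and cached (destinationDbNames in the workers below).
	ti, err := wr.TopoServer().GetTablet(ctx, master.Alias())
	if err != nil {
		return "", fmt.Errorf("cannot get TabletInfo for %v: %v", topoproto.TabletAliasString(master.Alias()), err)
	}
	return ti.DbName(), nil
}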
@@ -95,6 +101,8 @@ func NewSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, ex minHealthyRdonlyEndPoints: minHealthyRdonlyEndPoints, cleaner: &wrangler.Cleaner{}, + destinationDbNames: make(map[string]string), + ev: &events.SplitClone{ Cell: cell, Keyspace: keyspace, @@ -169,9 +177,13 @@ func (scw *SplitCloneWorker) StatusAsText() string { // Run implements the Worker interface func (scw *SplitCloneWorker) Run(ctx context.Context) error { resetVars() + + // Run the command. err := scw.run(ctx) + // Cleanup. scw.setState(WorkerStateCleanUp) + // Reverse any changes e.g. setting the tablet type of a source RDONLY tablet. cerr := scw.cleaner.CleanUp(scw.wr) if cerr != nil { if err != nil { @@ -180,6 +192,17 @@ func (scw *SplitCloneWorker) Run(ctx context.Context) error { err = cerr } } + + // Stop healthcheck. + for _, watcher := range scw.destinationShardWatchers { + watcher.Stop() + } + if scw.healthCheck != nil { + if err := scw.healthCheck.Close(); err != nil { + scw.wr.Logger().Errorf("HealthCheck.Close() failed: %v", err) + } + } + if err != nil { scw.setErrorState(err) return err @@ -322,49 +345,45 @@ func (scw *SplitCloneWorker) findTargets(ctx context.Context) error { action.TabletType = topodatapb.TabletType_SPARE } - return scw.ResolveDestinationMasters(ctx) -} - -// ResolveDestinationMasters implements the Resolver interface. -// It will attempt to resolve all shards and update scw.destinationShardsToTablets; -// if it is unable to do so, it will not modify scw.destinationShardsToTablets at all. -func (scw *SplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error { - statsDestinationAttemptedResolves.Add(1) - destinationShardsToTablets := make(map[string]*topo.TabletInfo) - - // Allow at most one resolution request at a time; if there are concurrent requests, only - // one of them will actually hit the topo server. - scw.resolveMu.Lock() - defer scw.resolveMu.Unlock() - - // If the last resolution was fresh enough, return it. - if time.Now().Sub(scw.resolveTime) < *resolveTTL { - return nil - } - + // Initialize healthcheck and add destination shards to it. + scw.healthCheck = discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */) for _, si := range scw.destinationShards { - ti, err := resolveDestinationShardMaster(ctx, si.Keyspace(), si.ShardName(), scw.wr) - if err != nil { - return err - } - destinationShardsToTablets[si.ShardName()] = ti + watcher := discovery.NewShardReplicationWatcher(scw.wr.TopoServer(), scw.healthCheck, + scw.cell, si.Keyspace(), si.ShardName(), + *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) + scw.destinationShardWatchers = append(scw.destinationShardWatchers, watcher) } - scw.destinationShardsToTablets = destinationShardsToTablets - // save the time of the last successful resolution - scw.resolveTime = time.Now() - statsDestinationActualResolves.Add(1) - return nil -} -// GetDestinationMaster implements the Resolver interface -func (scw *SplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error) { - scw.resolveMu.Lock() - ti, ok := scw.destinationShardsToTablets[shardName] - scw.resolveMu.Unlock() - if !ok { - return nil, fmt.Errorf("no tablet found for destination shard %v", shardName) + // Make sure we find a master for each destination shard and log it. 
+ scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") + for _, si := range scw.destinationShards { + waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second) + defer waitCancel() + if err := discovery.WaitForEndPoints(waitCtx, scw.healthCheck, + scw.cell, si.Keyspace(), si.ShardName(), []topodatapb.TabletType{topodatapb.TabletType_MASTER}); err != nil { + return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v: %v", si.Keyspace(), si.ShardName(), err) + } + masters := discovery.GetCurrentMaster( + scw.healthCheck.GetEndPointStatsFromTarget(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)) + if len(masters) == 0 { + return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v in HealthCheck: empty EndPointStats list", si.Keyspace(), si.ShardName()) + } + master := masters[0] + + // Get the MySQL database name of the tablet. + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + ti, err := scw.wr.TopoServer().GetTablet(shortCtx, master.Alias()) + cancel() + if err != nil { + return fmt.Errorf("cannot get the TabletInfo for destination master (%v) to find out its db name: %v", topoproto.TabletAliasString(master.Alias()), err) + } + keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) + scw.destinationDbNames[keyspaceAndShard] = ti.DbName() + + scw.wr.Logger().Infof("Using tablet %v as destination master for %v/%v", topoproto.TabletAliasString(master.Alias()), si.Keyspace(), si.ShardName()) } - return ti, nil + scw.wr.Logger().Infof("NOTE: The used master of a destination shard might change over the course of the copy e.g. due to a reparent. The HealthCheck module will track and log master changes and any error message will always refer the actually used master address.") + return nil } // Find all tablets on all destination shards. This should be done immediately before reloading @@ -435,17 +454,19 @@ func (scw *SplitCloneWorker) copy(ctx context.Context) error { // destinationWriterCount go routines reading from it. 
insertChannels[shardIndex] = make(chan string, scw.destinationWriterCount*2) - go func(shardName string, insertChannel chan string) { + go func(keyspace, shard string, insertChannel chan string) { for j := 0; j < scw.destinationWriterCount; j++ { destinationWaitGroup.Add(1) go func() { defer destinationWaitGroup.Done() - if err := executeFetchLoop(ctx, scw.wr, scw, shardName, insertChannel); err != nil { + + keyspaceAndShard := topoproto.KeyspaceShardString(keyspace, shard) + if err := executeFetchLoop(ctx, scw.wr, scw.healthCheck, keyspace, shard, scw.destinationDbNames[keyspaceAndShard], insertChannel); err != nil { processError("executeFetchLoop failed: %v", err) } }() } - }(si.ShardName(), insertChannels[shardIndex]) + }(si.Keyspace(), si.ShardName(), insertChannels[shardIndex]) } // read the vschema if needed @@ -564,13 +585,14 @@ func (scw *SplitCloneWorker) copy(ctx context.Context) error { for _, si := range scw.destinationShards { destinationWaitGroup.Add(1) - go func(shardName string) { + go func(keyspace, shard string) { defer destinationWaitGroup.Done() scw.wr.Logger().Infof("Making and populating blp_checkpoint table") - if err := runSQLCommands(ctx, scw.wr, scw, shardName, queries); err != nil { + keyspaceAndShard := topoproto.KeyspaceShardString(keyspace, shard) + if err := runSQLCommands(ctx, scw.wr, scw.healthCheck, keyspace, shard, scw.destinationDbNames[keyspaceAndShard], queries); err != nil { processError("blp_checkpoint queries failed: %v", err) } - }(si.ShardName()) + }(si.Keyspace(), si.ShardName()) } destinationWaitGroup.Wait() if firstError != nil { diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index ba02a584ee..28e4a6c3b4 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -15,6 +15,7 @@ import ( "github.com/youtube/vitess/go/sqltypes" "github.com/youtube/vitess/go/vt/mysqlctl/replication" "github.com/youtube/vitess/go/vt/mysqlctl/tmutils" + "github.com/youtube/vitess/go/vt/tabletmanager/tmclient" "github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice" "github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes" "github.com/youtube/vitess/go/vt/topo" @@ -51,6 +52,11 @@ type splitCloneTestCase struct { rightMasterFakeDb *FakePoolConnection rightMasterQs *fakes.StreamHealthQueryService + // leftReplica is used by the reparent test. + leftReplica *testlib.FakeTablet + leftReplicaFakeDb *FakePoolConnection + leftReplicaQs *fakes.StreamHealthQueryService + // defaultWorkerArgs are the full default arguments to run SplitClone. defaultWorkerArgs []string } @@ -114,13 +120,17 @@ func (tc *splitCloneTestCase) setUp(v3 bool) { topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(tc.t, "ks", "-40")) leftRdonly := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 11, topodatapb.TabletType_RDONLY, db, testlib.TabletKeyspaceShard(tc.t, "ks", "-40")) + // leftReplica is used by the reparent test. 
+ leftReplica := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 12, + topodatapb.TabletType_REPLICA, db, testlib.TabletKeyspaceShard(tc.t, "ks", "-40")) + tc.leftReplica = leftReplica rightMaster := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 20, topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(tc.t, "ks", "40-80")) rightRdonly := testlib.NewFakeTablet(tc.t, tc.wi.wr, "cell1", 21, topodatapb.TabletType_RDONLY, db, testlib.TabletKeyspaceShard(tc.t, "ks", "40-80")) - tc.tablets = []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, leftMaster, leftRdonly, rightMaster, rightRdonly} + tc.tablets = []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, leftMaster, leftRdonly, tc.leftReplica, rightMaster, rightRdonly} for _, ft := range tc.tablets { ft.StartActionLoop(tc.t, tc.wi.wr) @@ -176,6 +186,7 @@ func (tc *splitCloneTestCase) setUp(v3 bool) { // containing half of the rows, i.e. 2 + 2 + 1 rows). So 3 * 10 // = 30 insert statements on each destination. tc.leftMasterFakeDb = NewFakePoolConnectionQuery(tc.t, "leftMaster") + tc.leftReplicaFakeDb = NewFakePoolConnectionQuery(tc.t, "leftReplica") tc.rightMasterFakeDb = NewFakePoolConnectionQuery(tc.t, "rightMaster") for i := 1; i <= 30; i++ { @@ -187,8 +198,20 @@ func (tc *splitCloneTestCase) setUp(v3 bool) { expectBlpCheckpointCreationQueries(tc.rightMasterFakeDb) leftMaster.FakeMysqlDaemon.DbAppConnectionFactory = tc.leftMasterFakeDb.getFactory() + leftReplica.FakeMysqlDaemon.DbAppConnectionFactory = tc.leftReplicaFakeDb.getFactory() rightMaster.FakeMysqlDaemon.DbAppConnectionFactory = tc.rightMasterFakeDb.getFactory() + // Fake stream health reponses because vtworker needs them to find the master. + tc.leftMasterQs = fakes.NewStreamHealthQueryService(leftMaster.Target()) + tc.leftMasterQs.AddDefaultHealthResponse() + tc.leftReplicaQs = fakes.NewStreamHealthQueryService(leftReplica.Target()) + tc.leftReplicaQs.AddDefaultHealthResponse() + tc.rightMasterQs = fakes.NewStreamHealthQueryService(rightMaster.Target()) + tc.rightMasterQs.AddDefaultHealthResponse() + grpcqueryservice.RegisterForTest(leftMaster.RPCServer, tc.leftMasterQs) + grpcqueryservice.RegisterForTest(leftReplica.RPCServer, tc.leftReplicaQs) + grpcqueryservice.RegisterForTest(rightMaster.RPCServer, tc.rightMasterQs) + tc.defaultWorkerArgs = []string{ "SplitClone", "-source_reader_count", "10", @@ -203,6 +226,7 @@ func (tc *splitCloneTestCase) tearDown() { ft.StopActionLoop(tc.t) } tc.leftMasterFakeDb.verifyAllExecutedOrFail() + tc.leftReplicaFakeDb.verifyAllExecutedOrFail() tc.rightMasterFakeDb.verifyAllExecutedOrFail() } @@ -300,17 +324,138 @@ func TestSplitCloneV2_RetryDueToReadonly(t *testing.T) { t.Fatal(err) } - if statsDestinationAttemptedResolves.String() != "3" { - t.Errorf("Wrong statsDestinationAttemptedResolves: wanted %v, got %v", "3", statsDestinationAttemptedResolves.String()) - } - if statsDestinationActualResolves.String() != "1" { - t.Errorf("Wrong statsDestinationActualResolves: wanted %v, got %v", "1", statsDestinationActualResolves.String()) - } - if statsRetryCounters.String() != "{\"ReadOnly\": 2}" { - t.Errorf("Wrong statsRetryCounters: wanted %v, got %v", "{\"ReadOnly\": 2}", statsRetryCounters.String()) - } + wantRetryCount := int64(2) + if got := statsRetryCount.Get(); got != wantRetryCount { + t.Errorf("Wrong statsRetryCounter: got %v, wanted %v", got, wantRetryCount) + } + wantRetryReadOnlyCount := int64(2) + if got := statsRetryCounters.Counts()[retryCategoryReadOnly]; got != wantRetryReadOnlyCount { + 
t.Errorf("Wrong statsRetryCounters: got %v, wanted %v", got, wantRetryReadOnlyCount) + } } +// TestSplitCloneV2_RetryDueToReparent tests that vtworker correctly failovers +// during a reparent. +// NOTE: worker.py is an end-to-end test which tests this as well. +func TestSplitCloneV2_RetryDueToReparent(t *testing.T) { + tc := &splitCloneTestCase{t: t} + tc.setUp(false /* v3 */) + defer tc.tearDown() + + // Provoke a reparent just before the copy finishes. + // leftReplica will take over for the last, 30th, insert and the BLP checkpoint. + tc.leftReplicaFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", nil) + expectBlpCheckpointCreationQueries(tc.leftReplicaFakeDb) + + // Do not let leftMaster succeed the 30th write. + tc.leftMasterFakeDb.deleteAllEntriesAfterIndex(28) + tc.leftMasterFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", errReadOnly) + tc.leftMasterFakeDb.enableInfinite() + // When vtworker encounters the readonly error on leftMaster, do the reparent. + tc.leftMasterFakeDb.getEntry(29).AfterFunc = func() { + // Reparent from leftMaster to leftReplica. + // NOTE: This step is actually not necessary due to our fakes which bypass + // a lot of logic. Let's keep it for correctness though. + ti, err := tc.ts.GetTablet(context.Background(), tc.leftReplica.Tablet.Alias) + if err != nil { + t.Fatalf("GetTablet failed: %v", err) + } + tmc := tmclient.NewTabletManagerClient() + if err := tmc.TabletExternallyReparented(context.Background(), ti, "wait id 1"); err != nil { + t.Fatalf("TabletExternallyReparented(replica) failed: %v", err) + } + + // Update targets in fake query service and send out a new health response. + tc.leftMasterQs.UpdateType(topodatapb.TabletType_REPLICA) + tc.leftMasterQs.AddDefaultHealthResponse() + tc.leftReplicaQs.UpdateType(topodatapb.TabletType_MASTER) + tc.leftReplicaQs.AddDefaultHealthResponse() + + // After this, vtworker will retry. The following situations can occur: + // 1. HealthCheck picked up leftReplica as new MASTER + // => retry will succeed. + // 2. HealthCheck picked up no changes (leftMaster remains MASTER) + // => retry will hit leftMaster which keeps responding with readonly err. + // 3. HealthCheck picked up leftMaster as REPLICA, but leftReplica is still + // a REPLICA. + // => vtworker has no MASTER to go to and will keep retrying. + } + + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + + // Run the vtworker command. + if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil { + t.Fatal(err) + } + + wantRetryCountMin := int64(1) + if got := statsRetryCount.Get(); got < wantRetryCountMin { + t.Errorf("Wrong statsRetryCounter: got %v, wanted >= %v", got, wantRetryCountMin) + } +} + +// TestSplitCloneV2_NoMasterAvailable tests that vtworker correctly retries +// even in a period where no MASTER tablet is available according to the +// HealthCheck instance. +func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { + tc := &splitCloneTestCase{t: t} + tc.setUp(false /* v3 */) + defer tc.tearDown() + + // leftReplica will take over for the last, 30th, insert and the BLP checkpoint. + tc.leftReplicaFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", nil) + expectBlpCheckpointCreationQueries(tc.leftReplicaFakeDb) + + // During the 29th write, let the MASTER disappear. 
+ tc.leftMasterFakeDb.getEntry(28).AfterFunc = func() { + tc.leftMasterQs.UpdateType(topodatapb.TabletType_REPLICA) + tc.leftMasterQs.AddDefaultHealthResponse() + } + + // If the HealthCheck didn't pick up the change yet, the 30th write would + // succeed. To prevent this from happening, replace it with an error. + tc.leftMasterFakeDb.deleteAllEntriesAfterIndex(28) + tc.leftMasterFakeDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", errReadOnly) + tc.leftMasterFakeDb.enableInfinite() + // vtworker may not retry on leftMaster again if HealthCheck picks up the + // change very fast. In that case, the error was never encountered. + // Delete it or verifyAllExecutedOrFail() will fail because it was not + // processed. + defer tc.leftMasterFakeDb.deleteAllEntriesAfterIndex(28) + + // Wait for a retry due to NoMasterAvailable to happen, expect the 30th write + // on leftReplica and change leftReplica from REPLICA to MASTER. + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + for { + if statsRetryCounters.Counts()[retryCategoryNoMasterAvailable] >= 1 { + break + } + + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for vtworker to retry due to NoMasterAvailable: %v", ctx.Err()) + case <-time.After(10 * time.Millisecond): + // Poll constantly. + } + } + + // Make leftReplica the new MASTER. + tc.leftReplicaQs.UpdateType(topodatapb.TabletType_MASTER) + tc.leftReplicaQs.AddDefaultHealthResponse() + }() + + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + + // Run the vtworker command. + if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil { + t.Fatal(err) + } +} func TestSplitCloneV3(t *testing.T) { tc := &splitCloneTestCase{t: t} diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go index 7b1b2f69d7..9d5b6a5f7a 100644 --- a/go/vt/worker/topo_utils.go +++ b/go/vt/worker/topo_utils.go @@ -26,10 +26,6 @@ var ( // Therefore, the default for this variable must be higher // than vttablet's -health_check_interval. waitForHealthyEndPointsTimeout = flag.Duration("wait_for_healthy_rdonly_endpoints_timeout", 60*time.Second, "maximum time to wait if less than --min_healthy_rdonly_endpoints are available") - - healthCheckTopologyRefresh = flag.Duration("worker_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology") - healthcheckRetryDelay = flag.Duration("worker_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck") - healthCheckTimeout = flag.Duration("worker_healthcheck_timeout", time.Minute, "the health check timeout period") ) // FindHealthyRdonlyEndPoint returns a random healthy endpoint. 
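Note (illustrative sketch, not part of this patch): the two new unit tests above, TestSplitCloneV2_RetryDueToReparent and TestSplitCloneV2_NoMasterAvailable, rely on the same wiring of the fakes. Roughly, assuming a helper living next to the fixtures in the worker test package, where oldMasterDb/newMasterDb and oldMasterQs/newMasterQs stand for the leftMaster/leftReplica fixtures of splitCloneTestCase and errReadOnly is the existing test error:

// simulateReparentSketch shows the pattern: the old master keeps failing the
// last insert, and the fake stream health responses are flipped so that the
// worker's HealthCheck reports the new master and the retry goes there.
func simulateReparentSketch(
	oldMasterDb, newMasterDb *FakePoolConnection,
	oldMasterQs, newMasterQs *fakes.StreamHealthQueryService) {

	// Drop the writes the old master must no longer answer and let it fail
	// the next insert with a read-only error, over and over again.
	oldMasterDb.deleteAllEntriesAfterIndex(28)
	oldMasterDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", errReadOnly)
	oldMasterDb.enableInfinite()

	// The new master answers the remaining insert (the tests also expect the
	// blp_checkpoint queries there).
	newMasterDb.addExpectedQuery("INSERT INTO `vt_ks`.table1(id, msg, keyspace_id) VALUES (*", nil)

	// While the failing insert is executed, flip the fake health responses.
	// Once the HealthCheck has processed them, executeFetchWithRetries() will
	// pick the new master on its next retry.
	oldMasterDb.getEntry(29).AfterFunc = func() {
		oldMasterQs.UpdateType(topodatapb.TabletType_REPLICA)
		oldMasterQs.AddDefaultHealthResponse()
		newMasterQs.UpdateType(topodatapb.TabletType_MASTER)
		newMasterQs.AddDefaultHealthResponse()
	}
}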
@@ -43,7 +39,7 @@ func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell, // create a discovery healthcheck, wait for it to have one rdonly // endpoints at this point healthCheck := discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */) - watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, 5 /*topoReadConcurrency*/) + watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) defer watcher.Stop() defer healthCheck.Close() diff --git a/go/vt/worker/vertical_split_clone.go b/go/vt/worker/vertical_split_clone.go index 17f5e69200..575d9ed4e4 100644 --- a/go/vt/worker/vertical_split_clone.go +++ b/go/vt/worker/vertical_split_clone.go @@ -19,6 +19,7 @@ import ( "github.com/youtube/vitess/go/sqltypes" "github.com/youtube/vitess/go/sync2" "github.com/youtube/vitess/go/vt/binlog/binlogplayer" + "github.com/youtube/vitess/go/vt/discovery" "github.com/youtube/vitess/go/vt/mysqlctl/tmutils" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/topoproto" @@ -53,6 +54,18 @@ type VerticalSplitCloneWorker struct { // populated during WorkerStateFindTargets, read-only after that sourceAlias *topodatapb.TabletAlias sourceTablet *topo.TabletInfo + // healthCheck tracks the health of all MASTER and REPLICA tablets. + // It must be closed at the end of the command. + healthCheck discovery.HealthCheck + // destinationShardWatchers contains a TopologyWatcher for each destination + // shard. It updates the list of endpoints in the healthcheck if replicas are + // added/removed. + // Each watcher must be stopped at the end of the command. + destinationShardWatchers []*discovery.TopologyWatcher + // destinationDbNames stores for each destination keyspace/shard the MySQL + // database name. + // Example Map Entry: test_keyspace/-80 => vt_test_keyspace + destinationDbNames map[string]string // populated during WorkerStateCopy // tableStatusList holds the status for each table. @@ -63,11 +76,6 @@ type VerticalSplitCloneWorker struct { reloadTablets map[topodatapb.TabletAlias]*topo.TabletInfo ev *events.VerticalSplitClone - - // Mutex to protect fields that might change when (re)resolving topology. - resolveMu sync.Mutex - destinationShardsToTablets map[string]*topo.TabletInfo - resolveTime time.Time } // NewVerticalSplitCloneWorker returns a new VerticalSplitCloneWorker object. @@ -94,6 +102,8 @@ func NewVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, destinationKeyspac minHealthyRdonlyEndPoints: minHealthyRdonlyEndPoints, cleaner: &wrangler.Cleaner{}, + destinationDbNames: make(map[string]string), + ev: &events.VerticalSplitClone{ Cell: cell, Keyspace: destinationKeyspace, @@ -160,9 +170,13 @@ func (vscw *VerticalSplitCloneWorker) StatusAsText() string { // Run implements the Worker interface func (vscw *VerticalSplitCloneWorker) Run(ctx context.Context) error { resetVars() + + // Run the command. err := vscw.run(ctx) + // Cleanup. vscw.setState(WorkerStateCleanUp) + // Reverse any changes e.g. setting the tablet type of a source RDONLY tablet. cerr := vscw.cleaner.CleanUp(vscw.wr) if cerr != nil { if err != nil { @@ -171,6 +185,17 @@ func (vscw *VerticalSplitCloneWorker) Run(ctx context.Context) error { err = cerr } } + + // Stop healthcheck. 
+ for _, watcher := range vscw.destinationShardWatchers { + watcher.Stop() + } + if vscw.healthCheck != nil { + if err := vscw.healthCheck.Close(); err != nil { + vscw.wr.Logger().Errorf("HealthCheck.Close() failed: %v", err) + } + } + if err != nil { vscw.setErrorState(err) return err @@ -290,44 +315,41 @@ func (vscw *VerticalSplitCloneWorker) findTargets(ctx context.Context) error { } action.TabletType = topodatapb.TabletType_SPARE - return vscw.ResolveDestinationMasters(ctx) -} + // Initialize healthcheck and add destination shards to it. + vscw.healthCheck = discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */) + watcher := discovery.NewShardReplicationWatcher(vscw.wr.TopoServer(), vscw.healthCheck, + vscw.cell, vscw.destinationKeyspace, vscw.destinationShard, + *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) + vscw.destinationShardWatchers = append(vscw.destinationShardWatchers, watcher) -// ResolveDestinationMasters implements the Resolver interface. -// It will attempt to resolve all shards and update vscw.destinationShardsToTablets; -// if it is unable to do so, it will not modify vscw.destinationShardsToTablets at all. -func (vscw *VerticalSplitCloneWorker) ResolveDestinationMasters(ctx context.Context) error { - statsDestinationAttemptedResolves.Add(1) - // Allow at most one resolution request at a time; if there are concurrent requests, only - // one of them will actualy hit the topo server. - vscw.resolveMu.Lock() - defer vscw.resolveMu.Unlock() - - // If the last resolution was fresh enough, return it. - if time.Now().Sub(vscw.resolveTime) < *resolveTTL { - return nil + // Make sure we find a master for each destination shard and log it. + vscw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") + waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second) + defer waitCancel() + if err := discovery.WaitForEndPoints(waitCtx, vscw.healthCheck, + vscw.cell, vscw.destinationKeyspace, vscw.destinationShard, []topodatapb.TabletType{topodatapb.TabletType_MASTER}); err != nil { + return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v: %v", vscw.destinationKeyspace, vscw.destinationShard, err) } + masters := discovery.GetCurrentMaster( + vscw.healthCheck.GetEndPointStatsFromTarget(vscw.destinationKeyspace, vscw.destinationShard, topodatapb.TabletType_MASTER)) + if len(masters) == 0 { + return fmt.Errorf("cannot find MASTER tablet for destination shard for %v/%v in HealthCheck: empty EndPointStats list", vscw.destinationKeyspace, vscw.destinationShard) + } + master := masters[0] - ti, err := resolveDestinationShardMaster(ctx, vscw.destinationKeyspace, vscw.destinationShard, vscw.wr) + // Get the MySQL database name of the tablet. 
+ shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + ti, err := vscw.wr.TopoServer().GetTablet(shortCtx, master.Alias()) + cancel() if err != nil { - return err + return fmt.Errorf("cannot get the TabletInfo for destination master (%v) to find out its db name: %v", topoproto.TabletAliasString(master.Alias()), err) } - vscw.destinationShardsToTablets = map[string]*topo.TabletInfo{vscw.destinationShard: ti} - // save the time of the last successful resolution - vscw.resolveTime = time.Now() - statsDestinationActualResolves.Add(1) - return nil -} + keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard) + vscw.destinationDbNames[keyspaceAndShard] = ti.DbName() -// GetDestinationMaster implements the Resolver interface -func (vscw *VerticalSplitCloneWorker) GetDestinationMaster(shardName string) (*topo.TabletInfo, error) { - vscw.resolveMu.Lock() - defer vscw.resolveMu.Unlock() - ti, ok := vscw.destinationShardsToTablets[shardName] - if !ok { - return nil, fmt.Errorf("no tablet found for destination shard %v", shardName) - } - return ti, nil + vscw.wr.Logger().Infof("Using tablet %v as destination master for %v/%v", topoproto.TabletAliasString(master.Alias()), vscw.destinationKeyspace, vscw.destinationShard) + vscw.wr.Logger().Infof("NOTE: The used master of a destination shard might change over the course of the copy e.g. due to a reparent. The HealthCheck module will track and log master changes and any error message will always refer the actually used master address.") + return nil } // Find all tablets on the destination shard. This should be done immediately before reloading @@ -389,18 +411,17 @@ func (vscw *VerticalSplitCloneWorker) copy(ctx context.Context) error { // destinationWriterCount go routines reading from it. 
insertChannel := make(chan string, vscw.destinationWriterCount*2) - go func(shardName string, insertChannel chan string) { - for j := 0; j < vscw.destinationWriterCount; j++ { - destinationWaitGroup.Add(1) - go func() { - defer destinationWaitGroup.Done() + for j := 0; j < vscw.destinationWriterCount; j++ { + destinationWaitGroup.Add(1) + go func() { + defer destinationWaitGroup.Done() - if err := executeFetchLoop(ctx, vscw.wr, vscw, shardName, insertChannel); err != nil { - processError("executeFetchLoop failed: %v", err) - } - }() - } - }(vscw.destinationShard, insertChannel) + keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard) + if err := executeFetchLoop(ctx, vscw.wr, vscw.healthCheck, vscw.destinationKeyspace, vscw.destinationShard, vscw.destinationDbNames[keyspaceAndShard], insertChannel); err != nil { + processError("executeFetchLoop failed: %v", err) + } + }() + } // Now for each table, read data chunks and send them to insertChannel sourceWaitGroup := sync.WaitGroup{} @@ -470,15 +491,11 @@ func (vscw *VerticalSplitCloneWorker) copy(ctx context.Context) error { flags = binlogplayer.BlpFlagDontStart } queries = append(queries, binlogplayer.PopulateBlpCheckpoint(0, status.Position, time.Now().Unix(), flags)) - destinationWaitGroup.Add(1) - go func(shardName string) { - defer destinationWaitGroup.Done() - vscw.wr.Logger().Infof("Making and populating blp_checkpoint table") - if err := runSQLCommands(ctx, vscw.wr, vscw, shardName, queries); err != nil { - processError("blp_checkpoint queries failed: %v", err) - } - }(vscw.destinationShard) - destinationWaitGroup.Wait() + vscw.wr.Logger().Infof("Making and populating blp_checkpoint table") + keyspaceAndShard := topoproto.KeyspaceShardString(vscw.destinationKeyspace, vscw.destinationShard) + if err := runSQLCommands(ctx, vscw.wr, vscw.healthCheck, vscw.destinationKeyspace, vscw.destinationShard, vscw.destinationDbNames[keyspaceAndShard], queries); err != nil { + processError("blp_checkpoint queries failed: %v", err) + } if firstError != nil { return firstError } diff --git a/go/vt/worker/vertical_split_clone_test.go b/go/vt/worker/vertical_split_clone_test.go index 91ababa129..f0f425b0cb 100644 --- a/go/vt/worker/vertical_split_clone_test.go +++ b/go/vt/worker/vertical_split_clone_test.go @@ -200,6 +200,10 @@ func TestVerticalSplitClone(t *testing.T) { defer destMasterFakeDb.verifyAllExecutedOrFail() destMaster.FakeMysqlDaemon.DbAppConnectionFactory = destMasterFakeDb.getFactory() + // Fake stream health reponses because vtworker needs them to find the master. 
+ qs := fakes.NewStreamHealthQueryService(destMaster.Target()) + qs.AddDefaultHealthResponse() + grpcqueryservice.RegisterForTest(destMaster.RPCServer, qs) // Only wait 1 ms between retries, so that the test passes faster *executeFetchRetryTime = (1 * time.Millisecond) @@ -217,13 +221,12 @@ func TestVerticalSplitClone(t *testing.T) { t.Fatal(err) } - if statsDestinationAttemptedResolves.String() != "2" { - t.Errorf("Wrong statsDestinationAttemptedResolves: wanted %v, got %v", "2", statsDestinationAttemptedResolves.String()) + wantRetryCount := int64(1) + if got := statsRetryCount.Get(); got != wantRetryCount { + t.Errorf("Wrong statsRetryCounter: got %v, wanted %v", got, wantRetryCount) } - if statsDestinationActualResolves.String() != "1" { - t.Errorf("Wrong statsDestinationActualResolves: wanted %v, got %v", "1", statsDestinationActualResolves.String()) - } - if statsRetryCounters.String() != "{\"ReadOnly\": 1}" { - t.Errorf("Wrong statsRetryCounters: wanted %v, got %v", "{\"ReadOnly\": 1}", statsRetryCounters.String()) + wantRetryReadOnlyCount := int64(1) + if got := statsRetryCounters.Counts()[retryCategoryReadOnly]; got != wantRetryReadOnlyCount { + t.Errorf("Wrong statsRetryCounters: got %v, wanted %v", got, wantRetryReadOnlyCount) } } diff --git a/go/vt/worker/worker.go b/go/vt/worker/worker.go index 67a240c98a..4c69daf739 100644 --- a/go/vt/worker/worker.go +++ b/go/vt/worker/worker.go @@ -16,7 +16,6 @@ import ( "golang.org/x/net/context" "github.com/youtube/vitess/go/stats" - "github.com/youtube/vitess/go/vt/topo" ) // Worker is the base interface for all long running workers. @@ -36,38 +35,34 @@ type Worker interface { Run(context.Context) error } -// Resolver is an interface that should be implemented by any workers that need to -// resolve the topology. -type Resolver interface { - // ResolveDestinationMasters forces the worker to (re)resolve the topology and update - // the destination masters that it knows about. - ResolveDestinationMasters(ctx context.Context) error - - // GetDestinationMaster returns the most recently resolved destination master for a particular shard. 
- GetDestinationMaster(shardName string) (*topo.TabletInfo, error) -} - var ( - resolveTTL = flag.Duration("resolve_ttl", 15*time.Second, "Amount of time that a topo resolution can be cached for") executeFetchRetryTime = flag.Duration("executefetch_retry_time", 30*time.Second, "Amount of time we should wait before retrying ExecuteFetch calls") remoteActionsTimeout = flag.Duration("remote_actions_timeout", time.Minute, "Amount of time to wait for remote actions (like replication stop, ...)") useV3ReshardingMode = flag.Bool("use_v3_resharding_mode", false, "True iff the workers should use V3-style resharding, which doesn't require a preset sharding key column.") + healthCheckTopologyRefresh = flag.Duration("worker_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology") + healthcheckRetryDelay = flag.Duration("worker_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck") + healthCheckTimeout = flag.Duration("worker_healthcheck_timeout", time.Minute, "the health check timeout period") + statsState = stats.NewString("WorkerState") - // the number of times that the worker attempst to reresolve the masters - statsDestinationAttemptedResolves = stats.NewInt("WorkerDestinationAttemptedResolves") - // the number of times that the worker actually hits the topo server, i.e., they don't - // use a cached topology - statsDestinationActualResolves = stats.NewInt("WorkerDestinationActualResolves") - statsRetryCounters = stats.NewCounters("WorkerRetryCount") + // statsRetryCount is the total number of times a query to vttablet had to be retried. + statsRetryCount = stats.NewInt("WorkerRetryCount") + // statsRetryCount groups the number of retries by category e.g. "TimeoutError" or "Readonly". + statsRetryCounters = stats.NewCounters("WorkerRetryCounters") +) + +const ( + retryCategoryReadOnly = "ReadOnly" + retryCategoryTimeoutError = "TimeoutError" + retryCategoryConnectionError = "ConnectionError" + retryCategoryNoMasterAvailable = "NoMasterAvailable" ) // resetVars resets the debug variables that are meant to provide information on a // per-run basis. This should be called at the beginning of each worker run. 
func resetVars() { statsState.Set("") - statsDestinationAttemptedResolves.Set(0) - statsDestinationActualResolves.Set(0) + statsRetryCount.Set(0) statsRetryCounters.Reset() } diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index af7e85da9c..2b14063003 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -485,7 +485,7 @@ func (wr *Wrangler) waitForDrainInCell(ctx context.Context, cell, keyspace, shar retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout time.Duration) error { hc := discovery.NewHealthCheck(healthCheckTimeout /* connectTimeout */, healthcheckRetryDelay, healthCheckTimeout, cell) defer hc.Close() - watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, 5 /* topoReadConcurrency */) + watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) defer watcher.Stop() if err := discovery.WaitForEndPoints(ctx, hc, cell, keyspace, shard, []topodatapb.TabletType{servedType}); err != nil { @@ -554,11 +554,7 @@ func formatEndpointStats(eps *discovery.EndPointStats) string { if webPort, ok := eps.EndPoint.PortMap["vt"]; ok { webURL = fmt.Sprintf("http://%v:%d/", eps.EndPoint.Host, webPort) } - alias := &topodatapb.TabletAlias{ - Cell: eps.Cell, - Uid: eps.EndPoint.Uid, - } - return fmt.Sprintf("%v: %v stats: %v", topoproto.TabletAliasString(alias), webURL, eps.Stats) + return fmt.Sprintf("%v: %v stats: %v", topoproto.TabletAliasString(eps.Alias()), webURL, eps.Stats) } // MigrateServedFrom is used during vertical splits to migrate a diff --git a/test/binlog.py b/test/binlog.py index 4a5df67098..e35e9946ce 100755 --- a/test/binlog.py +++ b/test/binlog.py @@ -86,7 +86,10 @@ def setUpModule(): # Create destination shard. dst_master.init_tablet('master', 'test_keyspace', '-') dst_replica.init_tablet('replica', 'test_keyspace', '-') - dst_master.start_vttablet(wait_for_state='NOT_SERVING') + # Start masters with enabled healthcheck (necessary for resolving the + # destination master). + dst_master.start_vttablet(wait_for_state='NOT_SERVING', + target_tablet_type='replica') dst_replica.start_vttablet(wait_for_state='NOT_SERVING') utils.run_vtctl(['InitShardMaster', 'test_keyspace/-', diff --git a/test/merge_sharding.py b/test/merge_sharding.py index e11dc36472..b2d232f118 100755 --- a/test/merge_sharding.py +++ b/test/merge_sharding.py @@ -301,8 +301,12 @@ index by_msg (msg) # start vttablet on the split shards (no db created, # so they're all not serving) - for t in [shard_dest_master, shard_dest_replica, shard_dest_rdonly]: + for t in [shard_dest_replica, shard_dest_rdonly]: t.start_vttablet(wait_for_state=None) + # Start masters with enabled healthcheck (necessary for resolving the + # destination master). + shard_dest_master.start_vttablet(wait_for_state=None, + target_tablet_type='replica') for t in [shard_dest_master, shard_dest_replica, shard_dest_rdonly]: t.wait_for_vttablet_state('NOT_SERVING') diff --git a/test/resharding.py b/test/resharding.py index cc5791f84e..b0d32b85d9 100755 --- a/test/resharding.py +++ b/test/resharding.py @@ -459,9 +459,13 @@ primary key (name) # start vttablet on the split shards (no db created, # so they're all not serving) + # Start masters with enabled healthcheck (necessary for resolving the + # destination master). 
+ shard_2_master.start_vttablet(wait_for_state=None, + target_tablet_type='replica') shard_3_master.start_vttablet(wait_for_state=None, target_tablet_type='replica') - for t in [shard_2_master, shard_2_replica1, shard_2_replica2, + for t in [shard_2_replica1, shard_2_replica2, shard_3_replica, shard_3_rdonly1]: t.start_vttablet(wait_for_state=None) for t in [shard_2_master, shard_2_replica1, shard_2_replica2, @@ -483,6 +487,11 @@ primary key (name) keyspace_id_type=keyspace_id_type, sharding_column_name='custom_sharding_key') + # TODO(mberlin): Use a different approach for the same effect because this + # one doesn't work when the healthcheck is enabled on the + # tablet. In that case the healthcheck will race with the + # test and convert the SPARE tablet back to REPLICA the next + # time it runs. # disable shard_1_slave2, so we're sure filtered replication will go # from shard_1_slave1 utils.run_vtctl(['ChangeSlaveType', shard_1_slave2.tablet_alias, 'spare']) diff --git a/test/utils.py b/test/utils.py index c281f3abac..33a6140fd4 100644 --- a/test/utils.py +++ b/test/utils.py @@ -798,11 +798,6 @@ def _get_vtworker_cmd(clargs, auto_log=False): args = environment.binary_args('vtworker') + [ '-log_dir', environment.vtlogroot, '-port', str(port), - # use a long resolve TTL because of potential race conditions with doing - # an EmergencyReparent and resolving the master (as EmergencyReparent - # will delete the old master before updating the shard record with the - # new master) - '-resolve_ttl', '10s', '-executefetch_retry_time', '1s', '-tablet_manager_protocol', protocols_flavor().tablet_manager_protocol(), diff --git a/test/worker.py b/test/worker.py index 206bf725da..da477471cc 100755 --- a/test/worker.py +++ b/test/worker.py @@ -448,16 +448,14 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone): worker_rpc_port) if mysql_down: - # If MySQL is down, we wait until resolving at least twice (to verify that - # we do reresolve and retry due to MySQL being down). - worker_vars = utils.poll_for_vars( + # If MySQL is down, we wait until vtworker retried at least once to make + # sure it reached the point where a write failed due to MySQL being down. + # There should be two retries at least, one for each destination shard. + utils.poll_for_vars( 'vtworker', worker_port, - 'WorkerDestinationActualResolves >= 2', - condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 2) - self.assertNotEqual( - worker_vars['WorkerRetryCount'], {}, - "expected vtworker to retry, but it didn't") - logging.debug('Worker has resolved at least twice, starting reparent now') + 'WorkerRetryCount >= 2', + condition_fn=lambda v: v.get('WorkerRetryCount') >= 2) + logging.debug('Worker has retried at least twice, starting reparent now') # Bring back masters. Since we test with semi-sync now, we need at least # one replica for the new master. This test is already quite expensive, @@ -480,19 +478,20 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone): # NOTE: There is a race condition around this: # It's possible that the SplitClone vtworker command finishes before the # PlannedReparentShard vtctl command, which we start below, succeeds. - # Then the test would fail because vtworker did not have to resolve the - # master tablet again (due to the missing reparent). + # Then the test would fail because vtworker did not have to retry. # # To workaround this, the test takes a parameter to increase the number of # rows that the worker has to copy (with the idea being to slow the worker # down). 
# You should choose a value for num_insert_rows, such that this test # passes for your environment (trial-and-error...) + # Make sure that vtworker got past the point where it picked a master + # for each destination shard ("finding targets" state). utils.poll_for_vars( 'vtworker', worker_port, - 'WorkerDestinationActualResolves >= 1', - condition_fn=lambda v: v.get('WorkerDestinationActualResolves') >= 1) - logging.debug('Worker has resolved at least once, starting reparent now') + 'WorkerState == copying the data', + condition_fn=lambda v: v.get('WorkerState') == 'copying the data') + logging.debug('Worker is in copy state, starting reparent now') utils.run_vtctl( ['PlannedReparentShard', 'test_keyspace/-80', @@ -503,10 +502,10 @@ class TestBaseSplitCloneResiliency(TestBaseSplitClone): utils.wait_procs([workerclient_proc]) - # Verify that we were forced to reresolve and retry. + # Verify that we were forced to re-resolve and retry. worker_vars = utils.get_vars(worker_port) - self.assertGreater(worker_vars['WorkerDestinationActualResolves'], 1) - self.assertGreater(worker_vars['WorkerDestinationAttemptedResolves'], 1) + # There should be two retries at least, one for each destination shard. + self.assertGreater(worker_vars['WorkerRetryCount'], 1) self.assertNotEqual(worker_vars['WorkerRetryCount'], {}, "expected vtworker to retry, but it didn't") utils.kill_sub_process(worker_proc, soft=True) @@ -585,7 +584,7 @@ class TestVtworkerWebinterface(unittest.TestCase): status = urllib2.urlopen(worker_base_url + '/status').read() self.assertIn( "Ping command was called with message: 'pong'", status, - 'Command did not log output to /status') + 'Command did not log output to /status: %s' % status) # Reset the job. urllib2.urlopen(worker_base_url + '/reset').read()
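Note (illustrative sketch, not part of this patch): the stat vars which worker.py now polls replace the removed resolver counters. In Go terms, the new counters are used roughly like this (condensed from checkError() and the unit tests above):

// retryStatsSketch shows how a retry is recorded and read back. The exported
// names are "WorkerRetryCount" (total) and "WorkerRetryCounters" (grouped by
// category such as "ReadOnly" or "NoMasterAvailable"), which is what the
// updated worker.py polls via the vtworker debug vars.
func retryStatsSketch() (total, readOnly int64) {
	// Every retryable failure increments both the total and its category.
	statsRetryCount.Add(1)
	statsRetryCounters.Add(retryCategoryReadOnly, 1)

	// Unit tests read the values back directly; end-to-end tests poll the
	// same numbers over HTTP.
	return statsRetryCount.Get(), statsRetryCounters.Counts()[retryCategoryReadOnly]
}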