Merge branch 'master' into copyschemashard_diff

Michael Berlin 2016-01-06 15:30:09 +01:00
Parent ee2205e9f3 c7d55a3840
Commit 4b674a4fd4
18 changed files with 378 additions and 164 deletions

View file

@ -67,13 +67,13 @@ func (controller *PlainController) OnReadFail(ctx context.Context, err error) er
// OnValidationSuccess is called when schemamanager successfully validates all sql statements.
func (controller *PlainController) OnValidationSuccess(ctx context.Context) error {
log.Info("Successfully validate all sqls.")
log.Info("Successfully validated all SQL statements.")
return nil
}
// OnValidationFail is called when schemamanager fails to validate sql statements.
func (controller *PlainController) OnValidationFail(ctx context.Context, err error) error {
log.Errorf("Failed to validate sqls, error: %v\n", err)
log.Errorf("Failed to validate SQL statements, error: %v\n", err)
return err
}

View file

@ -76,7 +76,7 @@ type ShardResult struct {
Result *querypb.QueryResult
}
// Run schema changes on Vitess through VtGate
// Run applies schema changes on Vitess through VtGate.
func Run(ctx context.Context, controller Controller, executor Executor) error {
if err := controller.Open(ctx); err != nil {
log.Errorf("failed to open data sourcer: %v", err)

View file

@ -19,11 +19,12 @@ import (
// TabletExecutor applies schema changes to all tablets.
type TabletExecutor struct {
tmClient tmclient.TabletManagerClient
topoServer topo.Server
tabletInfos []*topo.TabletInfo
schemaDiffs []*tmutils.SchemaChangeResult
isClosed bool
tmClient tmclient.TabletManagerClient
topoServer topo.Server
tabletInfos []*topo.TabletInfo
schemaDiffs []*tmutils.SchemaChangeResult
isClosed bool
allowBigSchemaChange bool
}
// NewTabletExecutor creates a new TabletExecutor instance
@ -31,13 +32,26 @@ func NewTabletExecutor(
tmClient tmclient.TabletManagerClient,
topoServer topo.Server) *TabletExecutor {
return &TabletExecutor{
tmClient: tmClient,
topoServer: topoServer,
isClosed: true,
tmClient: tmClient,
topoServer: topoServer,
isClosed: true,
allowBigSchemaChange: false,
}
}
// Open opens a connection to the master for every shard
// AllowBigSchemaChange changes TabletExecutor such that big schema changes
// will no longer be rejected.
func (exec *TabletExecutor) AllowBigSchemaChange() {
exec.allowBigSchemaChange = true
}
// DisallowBigSchemaChange enables the check for big schema changes such that
// TabletExecutor will reject these.
func (exec *TabletExecutor) DisallowBigSchemaChange() {
exec.allowBigSchemaChange = false
}
// Open opens a connection to the master for every shard.
func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
if !exec.isClosed {
return nil
@ -73,7 +87,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
return nil
}
// Validate validates a list of sql statements
// Validate validates a list of sql statements.
func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
if exec.isClosed {
return fmt.Errorf("executor is closed")
@ -90,14 +104,19 @@ func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
}
parsedDDLs[i] = ddl
}
return exec.detectBigSchemaChanges(ctx, parsedDDLs)
bigSchemaChange, err := exec.detectBigSchemaChanges(ctx, parsedDDLs)
if bigSchemaChange && exec.allowBigSchemaChange {
log.Warning("Processing big schema change. This may cause visible MySQL downtime.")
return nil
}
return err
}
// a schema change that satisfies any following condition is considered
// to be a big schema change and will be rejected.
// 1. Alter more than 100,000 rows.
// 2. Change a table with more than 2,000,000 rows (Drops are fine).
func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDDLs []*sqlparser.DDL) error {
func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDDLs []*sqlparser.DDL) (bool, error) {
// exec.tabletInfos is guaranteed to have at least one element;
// Otherwise, Open should fail and executor should fail.
masterTabletInfo := exec.tabletInfos[0]
@ -105,7 +124,7 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD
dbSchema, err := exec.tmClient.GetSchema(
ctx, masterTabletInfo, []string{}, []string{}, false)
if err != nil {
return fmt.Errorf("unable to get database schema, error: %v", err)
return false, fmt.Errorf("unable to get database schema, error: %v", err)
}
tableWithCount := make(map[string]uint64, len(dbSchema.TableDefinitions))
for _, tableSchema := range dbSchema.TableDefinitions {
@ -118,16 +137,16 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD
tableName := string(ddl.Table)
if rowCount, ok := tableWithCount[tableName]; ok {
if rowCount > 100000 && ddl.Action == sqlparser.AlterStr {
return fmt.Errorf(
"big schema change, ddl: %v alters a table with more than 100 thousand rows", ddl)
return true, fmt.Errorf(
"big schema change detected. Disable check with -allow_long_unavailability. ddl: %v alters a table with more than 100 thousand rows", ddl)
}
if rowCount > 2000000 {
return fmt.Errorf(
"big schema change, ddl: %v changes a table with more than 2 million rows", ddl)
return true, fmt.Errorf(
"big schema change detected. Disable check with -allow_long_unavailability. ddl: %v changes a table with more than 2 million rows", ddl)
}
}
}
return nil
return false, nil
}
func (exec *TabletExecutor) preflightSchemaChanges(ctx context.Context, sqls []string) error {
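
As an aside, here is a minimal sketch (not part of this commit) of how a caller might combine the new AllowBigSchemaChange/DisallowBigSchemaChange toggles with Validate. The package name, helper function, and SQL statement are made up for illustration; it assumes the executor was already opened for the target keyspace.

package example

import (
	"golang.org/x/net/context"

	"github.com/youtube/vitess/go/vt/schemamanager"
)

// validateWithOptIn is a hypothetical helper. It validates the given SQL and,
// only if the caller has explicitly opted in, retries once with the
// big-schema-change check disabled.
func validateWithOptIn(ctx context.Context, exec *schemamanager.TabletExecutor, sqls []string, optIn bool) error {
	err := exec.Validate(ctx, sqls)
	if err == nil || !optIn {
		// Validation passed, or it failed (e.g. "big schema change detected ...")
		// and the caller did not accept a long period of unavailability.
		return err
	}
	// The caller accepts potentially long unavailability: disable the check,
	// re-validate, then restore the default behavior.
	exec.AllowBigSchemaChange()
	defer exec.DisallowBigSchemaChange()
	return exec.Validate(ctx, sqls)
}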

View file

@ -115,6 +115,21 @@ func TestTabletExecutorValidate(t *testing.T) {
}); err != nil {
t.Fatalf("executor.Validate should succeed, drop a table with more than 2,000,000 rows is allowed")
}
executor.AllowBigSchemaChange()
// alter a table with more than 100,000 rows
if err := executor.Validate(ctx, []string{
"ALTER TABLE test_table_03 ADD COLUMN new_id bigint(20)",
}); err != nil {
t.Fatalf("executor.Validate should succeed, big schema change is disabled")
}
executor.DisallowBigSchemaChange()
if err := executor.Validate(ctx, []string{
"ALTER TABLE test_table_03 ADD COLUMN new_id bigint(20)",
}); err == nil {
t.Fatalf("executor.Validate should fail, alter a table more than 100,000 rows")
}
}
func TestTabletExecutorExecute(t *testing.T) {

View file

@ -36,10 +36,9 @@ import (
)
var (
healthcheckTopologyRefresh = flag.Duration("binlog_player_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology when filtered replication is running")
retryDelay = flag.Duration("binlog_player_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck or a failed binlog connection")
healthCheckTimeout = flag.Duration("binlog_player_healthcheck_timeout", time.Minute, "the health check timeout period")
healthCheckTopologyRefresh = flag.Duration("binlog_player_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology when filtered replication is running")
retryDelay = flag.Duration("binlog_player_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck or a failed binlog connection")
healthCheckTimeout = flag.Duration("binlog_player_healthcheck_timeout", time.Minute, "the health check timeout period")
)
func init() {
@ -106,7 +105,7 @@ func newBinlogPlayerController(ts topo.Server, vtClientFactory func() binlogplay
binlogPlayerStats: binlogplayer.NewStats(),
healthCheck: discovery.NewHealthCheck(*binlogplayer.BinlogPlayerConnTimeout, *retryDelay, *healthCheckTimeout),
}
blc.shardReplicationWatcher = discovery.NewShardReplicationWatcher(ts, blc.healthCheck, cell, sourceShard.Keyspace, sourceShard.Shard, *healthcheckTopologyRefresh, 5)
blc.shardReplicationWatcher = discovery.NewShardReplicationWatcher(ts, blc.healthCheck, cell, sourceShard.Keyspace, sourceShard.Shard, *healthCheckTopologyRefresh, 5)
return blc
}
@ -442,6 +441,7 @@ func (blm *BinlogPlayerMap) StopAllPlayersAndReset() {
if blm.state == BpmStateRunning {
bpc.Stop()
}
bpc.healthCheck.Close()
hadPlayers = true
}
blm.players = make(map[uint32]*BinlogPlayerController)
@ -482,6 +482,7 @@ func (blm *BinlogPlayerMap) RefreshMap(ctx context.Context, tablet *topodatapb.T
// remove all entries from toRemove
for source := range toRemove {
blm.players[source].Stop()
blm.players[source].healthCheck.Close()
delete(blm.players, source)
}

View file

@ -308,8 +308,8 @@ var commands = []commandGroup{
"[-exclude_tables=''] [-include-views] <keyspace name>",
"Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."},
{"ApplySchema", commandApplySchema,
"[-force] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
"Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to slaves via replication. If the force flag is set, then numerous checks will be ignored, so that option should be used very cautiously."},
"[-allow_long_unavailability] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
"Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to slaves via replication. If -allow_long_unavailability is set, schema changes affecting a large number of rows (and possibly incurring a longer period of unavailability) will not be rejected."},
{"CopySchemaShard", commandCopySchemaShard,
"[-tables=<table1>,<table2>,...] [-exclude_tables=<table1>,<table2>,...] [-include-views] {<source keyspace/shard> || <source tablet alias>} <destination keyspace/shard>",
"Copies the schema from a source shard's master (or a specific tablet) to a destination shard. The schema is applied directly on the master of the destination shard, and it is propagated to the replicas through binlogs."},
@ -1831,7 +1831,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s
}
func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
force := subFlags.Bool("force", false, "Applies the schema even if the preflight schema doesn't match")
allowLongUnavailability := subFlags.Bool("allow_long_unavailability", false, "Allow large schema changes which incur a longer unavailability of the database.")
sql := subFlags.String("sql", "", "A list of semicolon-delimited SQL commands")
sqlFile := subFlags.String("sql-file", "", "Identifies the file that contains the SQL commands")
waitSlaveTimeout := subFlags.Duration("wait_slave_timeout", 30*time.Second, "The amount of time to wait for slaves to catch up during reparenting. The default value is 30 seconds.")
@ -1847,11 +1847,10 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
if err != nil {
return err
}
scr, err := wr.ApplySchemaKeyspace(ctx, keyspace, change, *force, *waitSlaveTimeout)
if err != nil {
if err := wr.ApplySchemaKeyspace(ctx, keyspace, change, *allowLongUnavailability, *waitSlaveTimeout); err != nil {
return err
}
return printJSON(wr, scr)
return nil
}
func commandCopySchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
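
For illustration only (the keyspace name and SQL are placeholders), the new flag would be passed on the vtctl command line like this; without it, a change touching a table over the row thresholds is rejected:

vtctl ApplySchema -allow_long_unavailability -sql "ALTER TABLE table1 ADD COLUMN new_id bigint(20)" test_keyspace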

View file

@ -32,7 +32,9 @@ import (
// testQueryService is a local QueryService implementation to support the tests
type testQueryService struct {
queryservice.ErrorQueryService
t *testing.T
t *testing.T
keyspace string
shard string
}
func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -92,6 +94,21 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.T
return nil
}
func (sq *testQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
type ExpectedExecuteFetch struct {
Query string
MaxRows int
@ -323,7 +340,11 @@ func testSplitClone(t *testing.T, strategy string) {
"STOP SLAVE",
"START SLAVE",
}
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &testQueryService{t: t})
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &testQueryService{
t: t,
keyspace: sourceRdonly.Tablet.Keyspace,
shard: sourceRdonly.Tablet.Shard,
})
}
// We read 100 source rows. sourceReaderCount is set to 10, so

View file

@ -34,6 +34,8 @@ type destinationTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
excludedTable string
keyspace string
shard string
}
func (sq *destinationTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -85,11 +87,28 @@ func (sq *destinationTabletServer) StreamExecute(ctx context.Context, target *qu
return nil
}
func (sq *destinationTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// sourceTabletServer is a local QueryService implementation to support the tests
type sourceTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
excludedTable string
keyspace string
shard string
}
func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -144,6 +163,21 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *querypb
return nil
}
func (sq *sourceTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// TODO(aaijazi): Create a test in which source and destination data does not match
func TestSplitDiff(t *testing.T) {
@ -233,10 +267,30 @@ func TestSplitDiff(t *testing.T) {
}
}
grpcqueryservice.RegisterForTest(leftRdonly1.RPCServer, &destinationTabletServer{t: t, excludedTable: excludedTable})
grpcqueryservice.RegisterForTest(leftRdonly2.RPCServer, &destinationTabletServer{t: t, excludedTable: excludedTable})
grpcqueryservice.RegisterForTest(sourceRdonly1.RPCServer, &sourceTabletServer{t: t, excludedTable: excludedTable})
grpcqueryservice.RegisterForTest(sourceRdonly2.RPCServer, &sourceTabletServer{t: t, excludedTable: excludedTable})
grpcqueryservice.RegisterForTest(leftRdonly1.RPCServer, &destinationTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: leftRdonly1.Tablet.Keyspace,
shard: leftRdonly1.Tablet.Shard,
})
grpcqueryservice.RegisterForTest(leftRdonly2.RPCServer, &destinationTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: leftRdonly2.Tablet.Keyspace,
shard: leftRdonly2.Tablet.Shard,
})
grpcqueryservice.RegisterForTest(sourceRdonly1.RPCServer, &sourceTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: sourceRdonly1.Tablet.Keyspace,
shard: sourceRdonly1.Tablet.Shard,
})
grpcqueryservice.RegisterForTest(sourceRdonly2.RPCServer, &sourceTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: sourceRdonly2.Tablet.Keyspace,
shard: sourceRdonly2.Tablet.Shard,
})
err = wrk.Run(ctx)
status := wrk.StatusAsText()

View file

@ -10,8 +10,8 @@ import (
"math/rand"
"time"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
@ -29,6 +29,10 @@ var (
// than -health_check_interval.
// (it is public for tests to override it)
WaitForHealthyEndPointsTimeout = flag.Duration("wait_for_healthy_rdonly_endpoints_timeout", 60*time.Second, "maximum time to wait if less than --min_healthy_rdonly_endpoints are available")
healthCheckTopologyRefresh = flag.Duration("worker_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology when filtered replication is running")
retryDelay = flag.Duration("worker_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck or a failed binlog connection")
healthCheckTimeout = flag.Duration("worker_healthcheck_timeout", time.Minute, "the health check timeout period")
)
// FindHealthyRdonlyEndPoint returns a random healthy endpoint.
@ -39,6 +43,15 @@ func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell,
busywaitCtx, busywaitCancel := context.WithTimeout(ctx, *WaitForHealthyEndPointsTimeout)
defer busywaitCancel()
// create a discovery healthcheck, wait for it to have one rdonly
// endpoints at this point
healthCheck := discovery.NewHealthCheck(*remoteActionsTimeout, *retryDelay, *healthCheckTimeout)
discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, 5 /*topoReadConcurrency*/)
defer healthCheck.Close()
if err := discovery.WaitForEndPoints(healthCheck, cell, keyspace, shard, []topodatapb.TabletType{topodatapb.TabletType_RDONLY}); err != nil {
return nil, fmt.Errorf("error waiting for rdonly endpoints for %v %v %v: %v", cell, keyspace, shard, err)
}
var healthyEndpoints []*topodatapb.EndPoint
for {
select {
@ -47,22 +60,19 @@ func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell,
default:
}
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
endPoints, _, err := wr.TopoServer().GetEndPoints(shortCtx, cell, keyspace, shard, topodatapb.TabletType_RDONLY)
cancel()
if err != nil {
if err == topo.ErrNoNode {
// If the node doesn't exist, count that as 0 available rdonly instances.
endPoints = &topodatapb.EndPoints{}
} else {
return nil, fmt.Errorf("GetEndPoints(%v,%v,%v,rdonly) failed: %v", cell, keyspace, shard, err)
}
}
healthyEndpoints = make([]*topodatapb.EndPoint, 0, len(endPoints.Entries))
for _, entry := range endPoints.Entries {
if len(entry.HealthMap) == 0 {
healthyEndpoints = append(healthyEndpoints, entry)
addrs := healthCheck.GetEndPointStatsFromTarget(keyspace, shard, topodatapb.TabletType_RDONLY)
healthyEndpoints = make([]*topodatapb.EndPoint, 0, len(addrs))
for _, addr := range addrs {
// Note we do not check the 'Serving' flag here.
// This is mainly to avoid the case where we run a
// Diff between a source and destination, and the source
// is not serving (disabled by TabletControl).
// When we switch the tablet to 'worker', it will
// go back to serving state.
if addr.Stats == nil || addr.Stats.HealthError != "" || addr.Stats.SecondsBehindMaster > 30 {
continue
}
healthyEndpoints = append(healthyEndpoints, addr.EndPoint)
}
if len(healthyEndpoints) < *minHealthyEndPoints {
deadlineForLog, _ := busywaitCtx.Deadline()

View file

@ -32,7 +32,9 @@ import (
// verticalTabletServer is a local QueryService implementation to support the tests
type verticalTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
t *testing.T
keyspace string
shard string
}
func (sq *verticalTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -85,6 +87,21 @@ func (sq *verticalTabletServer) StreamExecute(ctx context.Context, target *query
return nil
}
func (sq *verticalTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// VerticalFakePoolConnection implements dbconnpool.PoolConnection
type VerticalFakePoolConnection struct {
t *testing.T
@ -317,7 +334,11 @@ func testVerticalSplitClone(t *testing.T, strategy string) {
"STOP SLAVE",
"START SLAVE",
}
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &verticalTabletServer{t: t})
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &verticalTabletServer{
t: t,
keyspace: sourceRdonly.Tablet.Keyspace,
shard: sourceRdonly.Tablet.Shard,
})
}
// We read 100 source rows. sourceReaderCount is set to 10, so

View file

@ -34,6 +34,8 @@ type verticalDiffTabletServer struct {
queryservice.ErrorQueryService
t *testing.T
excludedTable string
keyspace string
shard string
}
func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
@ -79,6 +81,21 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *q
return nil
}
func (sq *verticalDiffTabletServer) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}
// TODO(aaijazi): Create a test in which source and destination data does not match
func TestVerticalSplitDiff(t *testing.T) {
@ -173,7 +190,12 @@ func TestVerticalSplitDiff(t *testing.T) {
},
},
}
grpcqueryservice.RegisterForTest(rdonly.RPCServer, &verticalDiffTabletServer{t: t, excludedTable: excludedTable})
grpcqueryservice.RegisterForTest(rdonly.RPCServer, &verticalDiffTabletServer{
t: t,
excludedTable: excludedTable,
keyspace: rdonly.Tablet.Keyspace,
shard: rdonly.Tablet.Shard,
})
}
err = wrk.Run(ctx)

View file

@ -320,20 +320,23 @@ func (wr *Wrangler) applySchemaShard(ctx context.Context, shardInfo *topo.ShardI
// take a keyspace lock to do this.
// first we will validate the Preflight works the same on all shard masters
// and fail if not (unless force is specified)
func (wr *Wrangler) ApplySchemaKeyspace(ctx context.Context, keyspace string, change string, force bool, waitSlaveTimeout time.Duration) (*tmutils.SchemaChangeResult, error) {
func (wr *Wrangler) ApplySchemaKeyspace(ctx context.Context, keyspace, change string, allowLongUnavailability bool, waitSlaveTimeout time.Duration) error {
actionNode := actionnode.ApplySchemaKeyspace(change)
lockPath, err := wr.lockKeyspace(ctx, keyspace, actionNode)
if err != nil {
return nil, err
return err
}
executor := schemamanager.NewTabletExecutor(wr.tmc, wr.ts)
if allowLongUnavailability {
executor.AllowBigSchemaChange()
}
err = schemamanager.Run(
ctx,
schemamanager.NewPlainController(change, keyspace),
schemamanager.NewTabletExecutor(wr.tmc, wr.ts),
executor,
)
return nil, wr.unlockKeyspace(ctx, keyspace, actionNode, lockPath, err)
return wr.unlockKeyspace(ctx, keyspace, actionNode, lockPath, err)
}
// CopySchemaShardFromShard copies the schema from a source shard to the specified destination shard.
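
A minimal sketch (not from this commit) of driving the updated ApplySchemaKeyspace signature from code; the wrapper function, keyspace name, and SQL below are assumed placeholders, while the signature itself matches the change above.

package example

import (
	"time"

	"golang.org/x/net/context"

	"github.com/youtube/vitess/go/vt/wrangler"
)

// applyChange is a hypothetical caller of the new ApplySchemaKeyspace
// signature, which now returns only an error (no SchemaChangeResult).
func applyChange(ctx context.Context, wr *wrangler.Wrangler) error {
	change := "ALTER TABLE table1 ADD COLUMN new_id bigint(20)"
	// The boolean opts in to big schema changes, mirroring the
	// -allow_long_unavailability vtctl flag; 30s is the slave wait timeout.
	return wr.ApplySchemaKeyspace(ctx, "test_keyspace", change, true, 30*time.Second)
}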

View file

@ -0,0 +1,105 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testlib
import (
"strings"
"testing"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/zktopo"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// TestApplySchema_AllowLongUnavailability is an integration test for the
// -allow_long_unavailability flag of vtctl ApplySchema.
// Only if the flag is specified, potentially long running schema changes are
// allowed.
func TestApplySchema_AllowLongUnavailability(t *testing.T) {
cells := []string{"cell1"}
db := fakesqldb.Register()
ts := zktopo.NewTestServer(t, cells)
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()
if err := ts.CreateKeyspace(context.Background(), "ks", &topodatapb.Keyspace{
ShardingColumnName: "keyspace_id",
ShardingColumnType: topodatapb.KeyspaceIdType_UINT64,
}); err != nil {
t.Fatalf("CreateKeyspace failed: %v", err)
}
beforeSchema := &tabletmanagerdatapb.SchemaDefinition{
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "table1",
Schema: "CREATE TABLE `table1` (\n `id` bigint(20) NOT NULL AUTO_INCREMENT,\n `msg` varchar(64) DEFAULT NULL,\n `keyspace_id` bigint(20) unsigned NOT NULL,\n PRIMARY KEY (`id`),\n KEY `by_msg` (`msg`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
Type: tmutils.TableBaseTable,
RowCount: 3000000,
},
},
}
afterSchema := &tabletmanagerdatapb.SchemaDefinition{
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "table1",
Schema: "CREATE TABLE `table1` (\n `id` bigint(20) NOT NULL AUTO_INCREMENT,\n `msg` varchar(64) DEFAULT NULL,\n `keyspace_id` bigint(20) unsigned NOT NULL,\n `id` bigint(20),\n PRIMARY KEY (`id`),\n KEY `by_msg` (`msg`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
Type: tmutils.TableBaseTable,
RowCount: 3000000,
},
},
}
preflightSchemaChange := &tmutils.SchemaChangeResult{
BeforeSchema: beforeSchema,
AfterSchema: afterSchema,
}
tShard1 := NewFakeTablet(t, wr, cells[0], 0,
topodatapb.TabletType_MASTER, db, TabletKeyspaceShard(t, "ks", "-80"))
tShard2 := NewFakeTablet(t, wr, cells[0], 1,
topodatapb.TabletType_MASTER, db, TabletKeyspaceShard(t, "ks", "80-"))
for _, ft := range []*FakeTablet{tShard1, tShard2} {
ft.StartActionLoop(t, wr)
defer ft.StopActionLoop(t)
ft.FakeMysqlDaemon.Schema = beforeSchema
ft.FakeMysqlDaemon.PreflightSchemaChangeResult = preflightSchemaChange
}
changeToDb := "USE vt_ks"
addColumn := "ALTER TABLE table1 ADD COLUMN new_id bigint(20)"
db.AddQuery(changeToDb, &sqltypes.Result{})
db.AddQuery(addColumn, &sqltypes.Result{})
// First ApplySchema fails because the table is very big and -allow_long_unavailability is missing.
if err := vp.Run([]string{"ApplySchema", "-sql", addColumn, "ks"}); err == nil {
t.Fatal("ApplySchema should have failed but did not.")
} else if !strings.Contains(err.Error(), "big schema change detected") {
t.Fatalf("ApplySchema failed with wrong error. got: %v", err)
}
// Second ApplySchema succeeds because -allow_long_unavailability is set.
if err := vp.Run([]string{"ApplySchema", "-allow_long_unavailability", "-sql", addColumn, "ks"}); err != nil {
t.Fatalf("ApplySchema failed: %v", err)
}
if count := db.GetQueryCalledNum(changeToDb); count != 2 {
t.Fatalf("ApplySchema: unexpected call count. Query: %v got: %v want: %v", changeToDb, count, 2)
}
if count := db.GetQueryCalledNum(addColumn); count != 2 {
t.Fatalf("ApplySchema: unexpected call count. Query: %v got: %v want: %v", addColumn, count, 2)
}
}

View file

@ -5,7 +5,6 @@
package testlib
import (
"fmt"
"testing"
"golang.org/x/net/context"
@ -23,75 +22,6 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
type ExpectedExecuteFetch struct {
Query string
MaxRows int
WantFields bool
QueryResult *sqltypes.Result
Error error
}
// FakePoolConnection implements dbconnpool.PoolConnection
type FakePoolConnection struct {
t *testing.T
Closed bool
ExpectedExecuteFetch []ExpectedExecuteFetch
ExpectedExecuteFetchIndex int
}
func NewFakePoolConnectionQuery(t *testing.T, query string) *FakePoolConnection {
return &FakePoolConnection{
t: t,
ExpectedExecuteFetch: []ExpectedExecuteFetch{
{
Query: query,
QueryResult: &sqltypes.Result{},
},
},
}
}
func (fpc *FakePoolConnection) ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
if fpc.ExpectedExecuteFetchIndex >= len(fpc.ExpectedExecuteFetch) {
fpc.t.Errorf("got unexpected out of bound fetch: %v >= %v", fpc.ExpectedExecuteFetchIndex, len(fpc.ExpectedExecuteFetch))
return nil, fmt.Errorf("unexpected out of bound fetch")
}
expected := fpc.ExpectedExecuteFetch[fpc.ExpectedExecuteFetchIndex].Query
if query != expected {
fpc.t.Errorf("got unexpected query: %v != %v", query, expected)
return nil, fmt.Errorf("unexpected query")
}
fpc.t.Logf("ExecuteFetch: %v", query)
defer func() {
fpc.ExpectedExecuteFetchIndex++
}()
return fpc.ExpectedExecuteFetch[fpc.ExpectedExecuteFetchIndex].QueryResult, nil
}
func (fpc *FakePoolConnection) ExecuteStreamFetch(query string, callback func(*sqltypes.Result) error, streamBufferSize int) error {
return nil
}
func (fpc *FakePoolConnection) ID() int64 {
return 1
}
func (fpc *FakePoolConnection) Close() {
fpc.Closed = true
}
func (fpc *FakePoolConnection) IsClosed() bool {
return fpc.Closed
}
func (fpc *FakePoolConnection) Recycle() {
}
func (fpc *FakePoolConnection) Reconnect() error {
return nil
}
func TestCopySchemaShard_UseTabletAsSource(t *testing.T) {
copySchema(t, false /* useShardAsSource */)
}
@ -132,7 +62,7 @@ func copySchema(t *testing.T, useShardAsSource bool) {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "table1",
Schema: "CREATE TABLE `resharding1` (\n `id` bigint(20) NOT NULL AUTO_INCREMENT,\n `msg` varchar(64) DEFAULT NULL,\n `keyspace_id` bigint(20) unsigned NOT NULL,\n PRIMARY KEY (`id`),\n KEY `by_msg` (`msg`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
Schema: "CREATE TABLE `table1` (\n `id` bigint(20) NOT NULL AUTO_INCREMENT,\n `msg` varchar(64) DEFAULT NULL,\n `keyspace_id` bigint(20) unsigned NOT NULL,\n PRIMARY KEY (`id`),\n KEY `by_msg` (`msg`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
Type: tmutils.TableBaseTable,
},
{
@ -145,8 +75,9 @@ func copySchema(t *testing.T, useShardAsSource bool) {
sourceMaster.FakeMysqlDaemon.Schema = schema
sourceRdonly.FakeMysqlDaemon.Schema = schema
changeToDb := "USE vt_ks"
createDb := "CREATE DATABASE `vt_ks` /*!40100 DEFAULT CHARACTER SET utf8 */"
createTable := "CREATE TABLE `vt_ks`.`resharding1` (\n" +
createTable := "CREATE TABLE `vt_ks`.`table1` (\n" +
" `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" +
" `msg` varchar(64) DEFAULT NULL,\n" +
" `keyspace_id` bigint(20) unsigned NOT NULL,\n" +
@ -160,7 +91,7 @@ func copySchema(t *testing.T, useShardAsSource bool) {
" PRIMARY KEY (`id`),\n" +
" KEY `by_msg` (`msg`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8"
db.AddQuery("USE vt_ks", &sqltypes.Result{})
db.AddQuery(changeToDb, &sqltypes.Result{})
db.AddQuery(createDb, &sqltypes.Result{})
db.AddQuery(createTable, &sqltypes.Result{})
db.AddQuery(createTableView, &sqltypes.Result{})
@ -172,6 +103,9 @@ func copySchema(t *testing.T, useShardAsSource bool) {
if err := vp.Run([]string{"CopySchemaShard", "-include-views", source, "ks/-40"}); err != nil {
t.Fatalf("CopySchemaShard failed: %v", err)
}
if count := db.GetQueryCalledNum(changeToDb); count != 3 {
t.Fatalf("CopySchemaShard did not change to the db exactly 3 times. Query count: %v", count)
}
if count := db.GetQueryCalledNum(createDb); count != 1 {
t.Fatalf("CopySchemaShard did not create the db exactly once. Query count: %v", count)
}

View file

@ -22,8 +22,7 @@ from mysql_flavor import mysql_flavor
src_master = tablet.Tablet()
src_replica = tablet.Tablet()
src_rdonly1 = tablet.Tablet()
src_rdonly2 = tablet.Tablet()
src_rdonly = tablet.Tablet()
dst_master = tablet.Tablet()
dst_replica = tablet.Tablet()
@ -35,8 +34,7 @@ def setUpModule():
setup_procs = [
src_master.init_mysql(),
src_replica.init_mysql(),
src_rdonly1.init_mysql(),
src_rdonly2.init_mysql(),
src_rdonly.init_mysql(),
dst_master.init_mysql(),
dst_replica.init_mysql(),
]
@ -51,17 +49,16 @@ def setUpModule():
src_master.init_tablet('master', 'test_keyspace', '0')
src_replica.init_tablet('replica', 'test_keyspace', '0')
src_rdonly1.init_tablet('rdonly', 'test_keyspace', '0')
src_rdonly2.init_tablet('rdonly', 'test_keyspace', '0')
src_rdonly.init_tablet('rdonly', 'test_keyspace', '0')
utils.run_vtctl(['RebuildShardGraph', 'test_keyspace/0'])
utils.validate_topology()
for t in [src_master, src_replica, src_rdonly1, src_rdonly2]:
for t in [src_master, src_replica, src_rdonly]:
t.create_db('vt_test_keyspace')
t.start_vttablet(wait_for_state=None)
for t in [src_master, src_replica, src_rdonly1, src_rdonly2]:
for t in [src_master, src_replica, src_rdonly]:
t.wait_for_vttablet_state('SERVING')
utils.run_vtctl(['InitShardMaster', 'test_keyspace/0',
@ -82,7 +79,9 @@ def setUpModule():
'test_keyspace'], auto_log=True)
# run a health check on source replica so it responds to discovery
# (for binlog players) and on the source rdonlys (for workers)
utils.run_vtctl(['RunHealthCheck', src_replica.tablet_alias, 'replica'])
utils.run_vtctl(['RunHealthCheck', src_rdonly.tablet_alias, 'rdonly'])
# Create destination shard.
dst_master.init_tablet('master', 'test_keyspace', '-')
@ -120,14 +119,13 @@ def tearDownModule():
if utils.options.skip_teardown:
return
tablet.kill_tablets([src_master, src_replica, src_rdonly1, src_rdonly2,
tablet.kill_tablets([src_master, src_replica, src_rdonly,
dst_master, dst_replica])
teardown_procs = [
src_master.teardown_mysql(),
src_replica.teardown_mysql(),
src_rdonly1.teardown_mysql(),
src_rdonly2.teardown_mysql(),
src_rdonly.teardown_mysql(),
dst_master.teardown_mysql(),
dst_replica.teardown_mysql(),
]
@ -139,8 +137,7 @@ def tearDownModule():
src_master.remove_tree()
src_replica.remove_tree()
src_rdonly1.remove_tree()
src_rdonly2.remove_tree()
src_rdonly.remove_tree()
dst_master.remove_tree()
dst_replica.remove_tree()

View file

@ -384,6 +384,7 @@ index by_msg (msg)
shard_rdonly1.tablet_alias,
keyspace_shard],
auto_log=True)
utils.run_vtctl(['RunHealthCheck', shard_rdonly1.tablet_alias, 'rdonly'])
utils.run_vtworker(['--cell', 'test_nj',
'--command_display_interval', '10ms',
@ -423,6 +424,8 @@ index by_msg (msg)
# use vtworker to compare the data
logging.debug('Running vtworker SplitDiff for -80')
for t in [shard_0_rdonly1, shard_1_rdonly1]:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias, 'rdonly'])
utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff', 'test_keyspace/-80'],
auto_log=True)
utils.run_vtctl(['ChangeSlaveType', shard_rdonly1.tablet_alias, 'rdonly'],

View file

@ -512,8 +512,11 @@ primary key (name)
self._test_keyrange_constraints()
# run a health check on source replicas so they respond to discovery
utils.run_vtctl(['RunHealthCheck', shard_0_replica.tablet_alias, 'replica'])
utils.run_vtctl(['RunHealthCheck', shard_1_slave1.tablet_alias, 'replica'])
# (for binlog players) and on the source rdonlys (for workers)
for t in [shard_0_replica, shard_1_slave1]:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias, 'replica'])
for t in [shard_0_ny_rdonly, shard_1_ny_rdonly, shard_1_rdonly1]:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias, 'rdonly'])
# create the split shards
shard_2_master.init_tablet('master', 'test_keyspace', '80-c0')
@ -608,7 +611,9 @@ primary key (name)
self._check_binlog_player_vars(shard_2_master, seconds_behind_master_max=30)
self._check_binlog_player_vars(shard_3_master, seconds_behind_master_max=30)
# use vtworker to compare the data
# use vtworker to compare the data (after health-checking the destination
# rdonly tablets so discovery works)
utils.run_vtctl(['RunHealthCheck', shard_3_rdonly1.tablet_alias, 'rdonly'])
logging.debug('Running vtworker SplitDiff')
utils.run_vtworker(['-cell', 'test_nj', 'SplitDiff', '--exclude_tables',
'unrelated', 'test_keyspace/c0-'],

View file

@ -134,11 +134,11 @@ index by_msg (msg)
conn.close()
return result
def _check_values(self, tablet, dbname, table, first, count):
def _check_values(self, t, dbname, table, first, count):
logging.debug(
'Checking %d values from %s/%s starting at %d', count, dbname,
table, first)
rows = tablet.mquery(
rows = t.mquery(
dbname, 'select id, msg from %s where id>=%d order by id limit %d' %
(table, first, count))
self.assertEqual(count, len(rows), 'got wrong number of rows: %d != %d' %
@ -150,11 +150,11 @@ index by_msg (msg)
"invalid msg[%d]: 'value %d' != '%s'" %
(i, first + i, rows[i][1]))
def _check_values_timeout(self, tablet, dbname, table, first, count,
def _check_values_timeout(self, t, dbname, table, first, count,
timeout=30):
while True:
try:
self._check_values(tablet, dbname, table, first, count)
self._check_values(t, dbname, table, first, count)
return
except:
timeout -= 1
@ -191,31 +191,31 @@ index by_msg (msg)
'Got a sharding_column_type in SrvKeyspace: %s' %
str(ks))
def _check_blacklisted_tables(self, tablet, expected):
status = tablet.get_status()
def _check_blacklisted_tables(self, t, expected):
status = t.get_status()
if expected:
self.assertIn('BlacklistedTables: %s' % ' '.join(expected), status)
else:
self.assertNotIn('BlacklistedTables', status)
# check we can or cannot access the tables
for t in ['moving1', 'moving2']:
for table in ['moving1', 'moving2']:
if expected and 'moving.*' in expected:
# table is blacklisted, should get the error
_, stderr = utils.run_vtctl(['VtTabletExecute',
'-keyspace', tablet.keyspace,
'-shard', tablet.shard,
tablet.tablet_alias,
'select count(1) from %s' % t],
'-keyspace', t.keyspace,
'-shard', t.shard,
t.tablet_alias,
'select count(1) from %s' % table],
expect_fail=True)
self.assertIn(
'retry: Query disallowed due to rule: enforce blacklisted tables',
stderr)
else:
# table is not blacklisted, should just work
qr = tablet.execute('select count(1) from %s' % t)
qr = t.execute('select count(1) from %s' % table)
logging.debug('Got %s rows from table %s on tablet %s',
qr['Rows'][0][0], t, tablet.tablet_alias)
qr['Rows'][0][0], table, t.tablet_alias)
def _check_client_conn_redirection(
self, destination_ks, servedfrom_db_types,
@ -336,7 +336,10 @@ index by_msg (msg)
moving1_first, 100)
# run a health check on source replica so it responds to discovery
# (for binlog players) and on the source rdonlys (for workers)
utils.run_vtctl(['RunHealthCheck', source_replica.tablet_alias, 'replica'])
for t in [source_rdonly1, source_rdonly2]:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias, 'rdonly'])
# the worker will do everything. We test with source_reader_count=10
# (down from default=20) as connection pool is not big enough for 20.
@ -381,6 +384,8 @@ index by_msg (msg)
'moving2', moving2_first_add1, 100)
# use vtworker to compare the data
for t in [destination_rdonly1, destination_rdonly2]:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias, 'rdonly'])
logging.debug('Running vtworker VerticalSplitDiff')
utils.run_vtworker(['-cell', 'test_nj', 'VerticalSplitDiff',
'destination_keyspace/0'], auto_log=True)