зеркало из https://github.com/github/vitess-gh.git
vtctl: ApplySchema: Renamed flag -force to -allow_long_unavailability.
schemamanager: Log warning that the flag is set only if it's actually a big schema. Added integration test using VtctlPipe. The test is based on copy_schema_shard_test.go.
This commit is contained in:
Родитель
ac28e233e8
Коммит
2473510229
|
@ -39,19 +39,19 @@ func NewTabletExecutor(
|
|||
}
|
||||
}
|
||||
|
||||
// AllowBigSchemaChange skips big schema changes check and tablet executor will
|
||||
// send all schema changes to all VTTablets even for big changes.
|
||||
// AllowBigSchemaChange changes TabletExecutor such that big schema changes
|
||||
// will no longer be rejected.
|
||||
func (exec *TabletExecutor) AllowBigSchemaChange() {
|
||||
exec.allowBigSchemaChange = true
|
||||
}
|
||||
|
||||
// DisallowBigSchemaChange enforce big schema change check and will reject
|
||||
// certain schema changes.
|
||||
// DisallowBigSchemaChange enables the check for big schema changes such that
|
||||
// TabletExecutor will reject these.
|
||||
func (exec *TabletExecutor) DisallowBigSchemaChange() {
|
||||
exec.allowBigSchemaChange = false
|
||||
}
|
||||
|
||||
// Open opens a connection to the master for every shard
|
||||
// Open opens a connection to the master for every shard.
|
||||
func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
|
||||
if !exec.isClosed {
|
||||
return nil
|
||||
|
@ -87,7 +87,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Validate validates a list of sql statements
|
||||
// Validate validates a list of sql statements.
|
||||
func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
|
||||
if exec.isClosed {
|
||||
return fmt.Errorf("executor is closed")
|
||||
|
@ -104,18 +104,19 @@ func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
|
|||
}
|
||||
parsedDDLs[i] = ddl
|
||||
}
|
||||
if exec.allowBigSchemaChange {
|
||||
log.Warningf("skipped big schema check, this may cause visible MySQL downtime")
|
||||
bigSchemaChange, err := exec.detectBigSchemaChanges(ctx, parsedDDLs)
|
||||
if bigSchemaChange && exec.allowBigSchemaChange {
|
||||
log.Warning("Processing big schema change. This may cause visible MySQL downtime.")
|
||||
return nil
|
||||
}
|
||||
return exec.detectBigSchemaChanges(ctx, parsedDDLs)
|
||||
return err
|
||||
}
|
||||
|
||||
// a schema change that satisfies any following condition is considered
|
||||
// to be a big schema change and will be rejected.
|
||||
// 1. Alter more than 100,000 rows.
|
||||
// 2. Change a table with more than 2,000,000 rows (Drops are fine).
|
||||
func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDDLs []*sqlparser.DDL) error {
|
||||
func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDDLs []*sqlparser.DDL) (bool, error) {
|
||||
// exec.tabletInfos is guaranteed to have at least one element;
|
||||
// Otherwise, Open should fail and executor should fail.
|
||||
masterTabletInfo := exec.tabletInfos[0]
|
||||
|
@ -123,7 +124,7 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD
|
|||
dbSchema, err := exec.tmClient.GetSchema(
|
||||
ctx, masterTabletInfo, []string{}, []string{}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get database schema, error: %v", err)
|
||||
return false, fmt.Errorf("unable to get database schema, error: %v", err)
|
||||
}
|
||||
tableWithCount := make(map[string]uint64, len(dbSchema.TableDefinitions))
|
||||
for _, tableSchema := range dbSchema.TableDefinitions {
|
||||
|
@ -136,16 +137,16 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD
|
|||
tableName := string(ddl.Table)
|
||||
if rowCount, ok := tableWithCount[tableName]; ok {
|
||||
if rowCount > 100000 && ddl.Action == sqlparser.AlterStr {
|
||||
return fmt.Errorf(
|
||||
"big schema change, ddl: %v alters a table with more than 100 thousand rows", ddl)
|
||||
return true, fmt.Errorf(
|
||||
"big schema change detected. Disable check with -allow_long_unavailability. ddl: %v alters a table with more than 100 thousand rows", ddl)
|
||||
}
|
||||
if rowCount > 2000000 {
|
||||
return fmt.Errorf(
|
||||
"big schema change, ddl: %v changes a table with more than 2 million rows", ddl)
|
||||
return true, fmt.Errorf(
|
||||
"big schema change detected. Disable check with -allow_long_unavailability. ddl: %v changes a table with more than 2 million rows", ddl)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (exec *TabletExecutor) preflightSchemaChanges(ctx context.Context, sqls []string) error {
|
||||
|
|
|
@ -308,8 +308,8 @@ var commands = []commandGroup{
|
|||
"[-exclude_tables=''] [-include-views] <keyspace name>",
|
||||
"Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."},
|
||||
{"ApplySchema", commandApplySchema,
|
||||
"[-force] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
|
||||
"Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to slaves via replication. If the force flag is set, then numerous checks will be ignored, so that option should be used very cautiously."},
|
||||
"[-allow_long_unavailability] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
|
||||
"Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to slaves via replication. If -allow_long_unavailability is set, schema changes affecting a large number of rows (and possibly incurring a longer period of unavailability) will not be rejected."},
|
||||
{"CopySchemaShard", commandCopySchemaShard,
|
||||
"[-tables=<table1>,<table2>,...] [-exclude_tables=<table1>,<table2>,...] [-include-views] {<source keyspace/shard> || <source tablet alias>} <destination keyspace/shard>",
|
||||
"Copies the schema from a source shard's master (or a specific tablet) to a destination shard. The schema is applied directly on the master of the destination shard, and it is propagated to the replicas through binlogs."},
|
||||
|
@ -1831,7 +1831,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s
|
|||
}
|
||||
|
||||
func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
force := subFlags.Bool("force", false, "Applies the schema even if it is big (alter too many rows)")
|
||||
allowLongUnavailability := subFlags.Bool("allow_long_unavailability", false, "Allow large schema changes which incur a longer unavailability of the database.")
|
||||
sql := subFlags.String("sql", "", "A list of semicolon-delimited SQL commands")
|
||||
sqlFile := subFlags.String("sql-file", "", "Identifies the file that contains the SQL commands")
|
||||
waitSlaveTimeout := subFlags.Duration("wait_slave_timeout", 30*time.Second, "The amount of time to wait for slaves to catch up during reparenting. The default value is 30 seconds.")
|
||||
|
@ -1847,7 +1847,7 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
scr, err := wr.ApplySchemaKeyspace(ctx, keyspace, change, *force, *waitSlaveTimeout)
|
||||
scr, err := wr.ApplySchemaKeyspace(ctx, keyspace, change, *allowLongUnavailability, *waitSlaveTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -320,14 +320,14 @@ func (wr *Wrangler) applySchemaShard(ctx context.Context, shardInfo *topo.ShardI
|
|||
// take a keyspace lock to do this.
|
||||
// first we will validate the Preflight works the same on all shard masters
|
||||
// and fail if not (unless force is specified)
|
||||
func (wr *Wrangler) ApplySchemaKeyspace(ctx context.Context, keyspace string, change string, force bool, waitSlaveTimeout time.Duration) (*tmutils.SchemaChangeResult, error) {
|
||||
func (wr *Wrangler) ApplySchemaKeyspace(ctx context.Context, keyspace, change string, allowLongUnavailability bool, waitSlaveTimeout time.Duration) (*tmutils.SchemaChangeResult, error) {
|
||||
actionNode := actionnode.ApplySchemaKeyspace(change)
|
||||
lockPath, err := wr.lockKeyspace(ctx, keyspace, actionNode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
executor := schemamanager.NewTabletExecutor(wr.tmc, wr.ts)
|
||||
if force {
|
||||
if allowLongUnavailability {
|
||||
executor.AllowBigSchemaChange()
|
||||
}
|
||||
err = schemamanager.Run(
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
// Copyright 2016, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package testlib
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/logutil"
|
||||
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"github.com/youtube/vitess/go/vt/zktopo"
|
||||
|
||||
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// TestApplySchema_AllowLongUnavailability is an integration test for the
|
||||
// -allow_long_unavailability flag of vtctl ApplySchema.
|
||||
// Only if the flag is specified, potentially long running schema changes are
|
||||
// allowed.
|
||||
func TestApplySchema_AllowLongUnavailability(t *testing.T) {
|
||||
cells := []string{"cell1"}
|
||||
db := fakesqldb.Register()
|
||||
ts := zktopo.NewTestServer(t, cells)
|
||||
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
|
||||
vp := NewVtctlPipe(t, ts)
|
||||
defer vp.Close()
|
||||
|
||||
if err := ts.CreateKeyspace(context.Background(), "ks", &topodatapb.Keyspace{
|
||||
ShardingColumnName: "keyspace_id",
|
||||
ShardingColumnType: topodatapb.KeyspaceIdType_UINT64,
|
||||
}); err != nil {
|
||||
t.Fatalf("CreateKeyspace failed: %v", err)
|
||||
}
|
||||
|
||||
beforeSchema := &tabletmanagerdatapb.SchemaDefinition{
|
||||
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
|
||||
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
|
||||
{
|
||||
Name: "table1",
|
||||
Schema: "CREATE TABLE `table1` (\n `id` bigint(20) NOT NULL AUTO_INCREMENT,\n `msg` varchar(64) DEFAULT NULL,\n `keyspace_id` bigint(20) unsigned NOT NULL,\n PRIMARY KEY (`id`),\n KEY `by_msg` (`msg`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
|
||||
Type: tmutils.TableBaseTable,
|
||||
RowCount: 3000000,
|
||||
},
|
||||
},
|
||||
}
|
||||
afterSchema := &tabletmanagerdatapb.SchemaDefinition{
|
||||
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
|
||||
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
|
||||
{
|
||||
Name: "table1",
|
||||
Schema: "CREATE TABLE `table1` (\n `id` bigint(20) NOT NULL AUTO_INCREMENT,\n `msg` varchar(64) DEFAULT NULL,\n `keyspace_id` bigint(20) unsigned NOT NULL,\n `id` bigint(20),\n PRIMARY KEY (`id`),\n KEY `by_msg` (`msg`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
|
||||
Type: tmutils.TableBaseTable,
|
||||
RowCount: 3000000,
|
||||
},
|
||||
},
|
||||
}
|
||||
preflightSchemaChange := &tmutils.SchemaChangeResult{
|
||||
BeforeSchema: beforeSchema,
|
||||
AfterSchema: afterSchema,
|
||||
}
|
||||
|
||||
tShard1 := NewFakeTablet(t, wr, cells[0], 0,
|
||||
topodatapb.TabletType_MASTER, db, TabletKeyspaceShard(t, "ks", "-80"))
|
||||
tShard2 := NewFakeTablet(t, wr, cells[0], 1,
|
||||
topodatapb.TabletType_MASTER, db, TabletKeyspaceShard(t, "ks", "80-"))
|
||||
for _, ft := range []*FakeTablet{tShard1, tShard2} {
|
||||
ft.StartActionLoop(t, wr)
|
||||
defer ft.StopActionLoop(t)
|
||||
|
||||
ft.FakeMysqlDaemon.Schema = beforeSchema
|
||||
ft.FakeMysqlDaemon.PreflightSchemaChangeResult = preflightSchemaChange
|
||||
}
|
||||
|
||||
changeToDb := "USE vt_ks"
|
||||
addColumn := "ALTER TABLE table1 ADD COLUMN new_id bigint(20)"
|
||||
db.AddQuery(changeToDb, &sqltypes.Result{})
|
||||
db.AddQuery(addColumn, &sqltypes.Result{})
|
||||
|
||||
// First ApplySchema fails because the table is very big and -allow_long_unavailability is missing.
|
||||
if err := vp.Run([]string{"ApplySchema", "-sql", addColumn, "ks"}); err == nil {
|
||||
t.Fatal("ApplySchema should have failed but did not.")
|
||||
} else if !strings.Contains(err.Error(), "big schema change detected") {
|
||||
t.Fatalf("ApplySchema failed with wrong error. got: %v", err)
|
||||
}
|
||||
|
||||
// Second ApplySchema succeeds because -allow_long_unavailability is set.
|
||||
if err := vp.Run([]string{"ApplySchema", "-allow_long_unavailability", "-sql", addColumn, "ks"}); err != nil {
|
||||
t.Fatalf("ApplySchema failed: %v", err)
|
||||
}
|
||||
if count := db.GetQueryCalledNum(changeToDb); count != 2 {
|
||||
t.Fatalf("ApplySchema: unexpected call count. Query: %v got: %v want: %v", changeToDb, count, 2)
|
||||
}
|
||||
if count := db.GetQueryCalledNum(addColumn); count != 2 {
|
||||
t.Fatalf("ApplySchema: unexpected call count. Query: %v got: %v want: %v", addColumn, count, 2)
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче