зеркало из https://github.com/github/vitess-gh.git
Support big schema change in schema manager.
There is currently no big schema change support in open source; therefore, it is better to allow schema manager to do big schema changes without enfocing 100,000 rows limit. Add flag "-force" for vtctl command "ApplySchema" so it accepts big schema changes.
This commit is contained in:
Родитель
fd4514bd2b
Коммит
ac28e233e8
|
@ -19,11 +19,12 @@ import (
|
|||
|
||||
// TabletExecutor applies schema changes to all tablets.
|
||||
type TabletExecutor struct {
|
||||
tmClient tmclient.TabletManagerClient
|
||||
topoServer topo.Server
|
||||
tabletInfos []*topo.TabletInfo
|
||||
schemaDiffs []*tmutils.SchemaChangeResult
|
||||
isClosed bool
|
||||
tmClient tmclient.TabletManagerClient
|
||||
topoServer topo.Server
|
||||
tabletInfos []*topo.TabletInfo
|
||||
schemaDiffs []*tmutils.SchemaChangeResult
|
||||
isClosed bool
|
||||
allowBigSchemaChange bool
|
||||
}
|
||||
|
||||
// NewTabletExecutor creates a new TabletExecutor instance
|
||||
|
@ -31,12 +32,25 @@ func NewTabletExecutor(
|
|||
tmClient tmclient.TabletManagerClient,
|
||||
topoServer topo.Server) *TabletExecutor {
|
||||
return &TabletExecutor{
|
||||
tmClient: tmClient,
|
||||
topoServer: topoServer,
|
||||
isClosed: true,
|
||||
tmClient: tmClient,
|
||||
topoServer: topoServer,
|
||||
isClosed: true,
|
||||
allowBigSchemaChange: false,
|
||||
}
|
||||
}
|
||||
|
||||
// AllowBigSchemaChange skips big schema changes check and tablet executor will
|
||||
// send all schema changes to all VTTablets even for big changes.
|
||||
func (exec *TabletExecutor) AllowBigSchemaChange() {
|
||||
exec.allowBigSchemaChange = true
|
||||
}
|
||||
|
||||
// DisallowBigSchemaChange enforce big schema change check and will reject
|
||||
// certain schema changes.
|
||||
func (exec *TabletExecutor) DisallowBigSchemaChange() {
|
||||
exec.allowBigSchemaChange = false
|
||||
}
|
||||
|
||||
// Open opens a connection to the master for every shard
|
||||
func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
|
||||
if !exec.isClosed {
|
||||
|
@ -90,6 +104,10 @@ func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
|
|||
}
|
||||
parsedDDLs[i] = ddl
|
||||
}
|
||||
if exec.allowBigSchemaChange {
|
||||
log.Warningf("skipped big schema check, this may cause visible MySQL downtime")
|
||||
return nil
|
||||
}
|
||||
return exec.detectBigSchemaChanges(ctx, parsedDDLs)
|
||||
}
|
||||
|
||||
|
|
|
@ -115,6 +115,21 @@ func TestTabletExecutorValidate(t *testing.T) {
|
|||
}); err != nil {
|
||||
t.Fatalf("executor.Validate should succeed, drop a table with more than 2,000,000 rows is allowed")
|
||||
}
|
||||
|
||||
executor.AllowBigSchemaChange()
|
||||
// alter a table with more than 100,000 rows
|
||||
if err := executor.Validate(ctx, []string{
|
||||
"ALTER TABLE test_table_03 ADD COLUMN new_id bigint(20)",
|
||||
}); err != nil {
|
||||
t.Fatalf("executor.Validate should succeed, big schema change is disabled")
|
||||
}
|
||||
|
||||
executor.DisallowBigSchemaChange()
|
||||
if err := executor.Validate(ctx, []string{
|
||||
"ALTER TABLE test_table_03 ADD COLUMN new_id bigint(20)",
|
||||
}); err == nil {
|
||||
t.Fatalf("executor.Validate should fail, alter a table more than 100,000 rows")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTabletExecutorExecute(t *testing.T) {
|
||||
|
|
|
@ -1831,7 +1831,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s
|
|||
}
|
||||
|
||||
func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
force := subFlags.Bool("force", false, "Applies the schema even if the preflight schema doesn't match")
|
||||
force := subFlags.Bool("force", false, "Applies the schema even if it is big (alter too many rows)")
|
||||
sql := subFlags.String("sql", "", "A list of semicolon-delimited SQL commands")
|
||||
sqlFile := subFlags.String("sql-file", "", "Identifies the file that contains the SQL commands")
|
||||
waitSlaveTimeout := subFlags.Duration("wait_slave_timeout", 30*time.Second, "The amount of time to wait for slaves to catch up during reparenting. The default value is 30 seconds.")
|
||||
|
|
|
@ -326,11 +326,14 @@ func (wr *Wrangler) ApplySchemaKeyspace(ctx context.Context, keyspace string, ch
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
executor := schemamanager.NewTabletExecutor(wr.tmc, wr.ts)
|
||||
if force {
|
||||
executor.AllowBigSchemaChange()
|
||||
}
|
||||
err = schemamanager.Run(
|
||||
ctx,
|
||||
schemamanager.NewPlainController(change, keyspace),
|
||||
schemamanager.NewTabletExecutor(wr.tmc, wr.ts),
|
||||
executor,
|
||||
)
|
||||
|
||||
return nil, wr.unlockKeyspace(ctx, keyspace, actionNode, lockPath, err)
|
||||
|
|
Загрузка…
Ссылка в новой задаче