зеркало из https://github.com/github/vitess-gh.git
change wrangler.ApplySchemaKeyspace to use schemamanager
This commit is contained in:
Родитель
6a2257ed51
Коммит
8d5f43bb1c
|
@ -18,6 +18,7 @@ import (
|
|||
log "github.com/golang/glog"
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/schemamanager"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topotools/events"
|
||||
|
@ -407,106 +408,13 @@ func (wr *Wrangler) ApplySchemaKeyspace(ctx context.Context, keyspace string, ch
|
|||
return nil, err
|
||||
}
|
||||
|
||||
scr, err := wr.applySchemaKeyspace(ctx, keyspace, change, simple, force, waitSlaveTimeout)
|
||||
return scr, wr.unlockKeyspace(ctx, keyspace, actionNode, lockPath, err)
|
||||
}
|
||||
err = schemamanager.Run(
|
||||
schemamanager.NewSimpleDataSourcer(change),
|
||||
schemamanager.NewTabletExecutor(wr.tmc, wr.ts, keyspace),
|
||||
schemamanager.NewConsoleEventHandler(),
|
||||
)
|
||||
|
||||
func (wr *Wrangler) applySchemaKeyspace(ctx context.Context, keyspace string, change string, simple, force bool, waitSlaveTimeout time.Duration) (*myproto.SchemaChangeResult, error) {
|
||||
shards, err := wr.ts.GetShardNames(keyspace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// corner cases
|
||||
if len(shards) == 0 {
|
||||
return nil, fmt.Errorf("No shards in keyspace %v", keyspace)
|
||||
}
|
||||
if len(shards) == 1 {
|
||||
log.Infof("Only one shard in keyspace %v, using ApplySchemaShard", keyspace)
|
||||
return wr.ApplySchemaShard(ctx, keyspace, shards[0], change, topo.TabletAlias{}, simple, force, waitSlaveTimeout)
|
||||
}
|
||||
|
||||
// Get schema on all shard masters in parallel
|
||||
log.Infof("Getting schema on all shards")
|
||||
beforeSchemas := make([]*myproto.SchemaDefinition, len(shards))
|
||||
shardInfos := make([]*topo.ShardInfo, len(shards))
|
||||
wg := sync.WaitGroup{}
|
||||
mu := sync.Mutex{}
|
||||
getErrs := make([]string, 0, 5)
|
||||
for i, shard := range shards {
|
||||
wg.Add(1)
|
||||
go func(i int, shard string) {
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
getErrs = append(getErrs, err.Error())
|
||||
mu.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
shardInfos[i], err = wr.ts.GetShard(keyspace, shard)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
beforeSchemas[i], err = wr.GetSchema(ctx, shardInfos[i].MasterAlias, nil, nil, false)
|
||||
}(i, shard)
|
||||
}
|
||||
wg.Wait()
|
||||
if len(getErrs) > 0 {
|
||||
return nil, fmt.Errorf("Error(s) getting schema: %v", strings.Join(getErrs, ", "))
|
||||
}
|
||||
|
||||
// check they all match, or use the force flag
|
||||
log.Infof("Checking starting schemas match on all shards")
|
||||
for i, beforeSchema := range beforeSchemas {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
diffs := myproto.DiffSchemaToArray("shard 0", beforeSchemas[0], fmt.Sprintf("shard %v", i), beforeSchema)
|
||||
if len(diffs) > 0 {
|
||||
if force {
|
||||
log.Warningf("Shard %v has inconsistent schema, ignoring: %v", i, strings.Join(diffs, "\n"))
|
||||
} else {
|
||||
return nil, fmt.Errorf("Shard %v has inconsistent schema: %v", i, strings.Join(diffs, "\n"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// preflight on shard 0 master, to get baseline
|
||||
// this assumes shard 0 master doesn't have the schema upgrade applied
|
||||
// if it does, we'll have to fix the slaves and other shards manually.
|
||||
log.Infof("Running Preflight on Shard 0 Master")
|
||||
preflight, err := wr.PreflightSchema(ctx, shardInfos[0].MasterAlias, change)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// for each shard, apply the change
|
||||
log.Infof("Applying change on all shards")
|
||||
var applyErr error
|
||||
for i, shard := range shards {
|
||||
wg.Add(1)
|
||||
go func(i int, shard string) {
|
||||
defer wg.Done()
|
||||
|
||||
_, err := wr.lockAndApplySchemaShard(ctx, shardInfos[i], preflight, keyspace, shard, shardInfos[i].MasterAlias, change, topo.TabletAlias{}, simple, force, waitSlaveTimeout)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
applyErr = err
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
}(i, shard)
|
||||
}
|
||||
wg.Wait()
|
||||
if applyErr != nil {
|
||||
return nil, applyErr
|
||||
}
|
||||
|
||||
return &myproto.SchemaChangeResult{BeforeSchema: preflight.BeforeSchema, AfterSchema: preflight.AfterSchema}, nil
|
||||
return nil, wr.unlockKeyspace(ctx, keyspace, actionNode, lockPath, err)
|
||||
}
|
||||
|
||||
// CopySchemaShard copies the schema from a source tablet to the
|
||||
|
|
Загрузка…
Ссылка в новой задаче