зеркало из https://github.com/github/vitess-gh.git
Add ExternalSource to BinlogSource. Conditional code in MoveTables for Migrate. Migrate vtctl command.
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Родитель
f2980c36fe
Коммит
706a372178
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -333,3 +333,20 @@ func (ts *Server) clearCellAliasesCache() {
|
|||
defer cellsAliases.mu.Unlock()
|
||||
cellsAliases.cellsToAliases = make(map[string]string)
|
||||
}
|
||||
|
||||
// OpenExternalVitessClusterServer returns the topo server of the external cluster
|
||||
func (ts *Server) OpenExternalVitessClusterServer(ctx context.Context, clusterName string) (*Server, error) {
|
||||
vc, err := ts.GetVitessCluster(ctx, clusterName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var externalTopo *Server
|
||||
externalTopo, err = OpenServer(vc.TopoConfig.TopoType, vc.TopoConfig.Server, vc.TopoConfig.Root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if externalTopo == nil {
|
||||
return nil, fmt.Errorf("unable to open external topo for config %s", clusterName)
|
||||
}
|
||||
return externalTopo, nil
|
||||
}
|
||||
|
|
|
@ -308,6 +308,9 @@ var commands = []commandGroup{
|
|||
{"MoveTables", commandMoveTables,
|
||||
"[-cells=<cells>] [-tablet_types=<source_tablet_types>] -workflow=<workflow> <source_keyspace> <target_keyspace> <table_specs>",
|
||||
`Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{"column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{"column": "id2", "name": "hash"}]}}'. In the case of an unsharded target keyspace the vschema for each table may be empty. Example: '{"t1":{}, "t2":{}}'.`},
|
||||
{"Migrate", commandMigrate,
|
||||
"[-cells=<cells>] [-tablet_types=<source_tablet_types>] -workflow=<workflow> <source_keyspace> <target_keyspace> <table_specs>",
|
||||
`Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{"column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{"column": "id2", "name": "hash"}]}}'. In the case of an unsharded target keyspace the vschema for each table may be empty. Example: '{"t1":{}, "t2":{}}'.`},
|
||||
{"DropSources", commandDropSources,
|
||||
"[-dry_run] [-rename_tables] <keyspace.workflow>",
|
||||
"After a MoveTables or Resharding workflow cleanup unused artifacts like source tables, source shards and blacklists"},
|
||||
|
@ -1945,7 +1948,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
|
|||
target := subFlags.Arg(1)
|
||||
tableSpecs := subFlags.Arg(2)
|
||||
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables,
|
||||
*excludes, *autoStart, *stopAfterCopy)
|
||||
*excludes, *autoStart, *stopAfterCopy, "")
|
||||
}
|
||||
|
||||
// VReplicationWorkflowAction defines subcommands passed to vtctl for movetables or reshard
|
||||
|
@ -1962,6 +1965,12 @@ const (
|
|||
vReplicationWorkflowActionGetState = "getstate"
|
||||
)
|
||||
|
||||
func commandMigrate(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
return commandVRWorkflow(ctx, wr, subFlags, args, wrangler.MigrateWorkflow)
|
||||
}
|
||||
|
||||
// commandVRWorkflow is the common entry point for MoveTables/Reshard/Migrate workflows
|
||||
// FIXME: this needs a refactor. Also validations for params need to be done per workflow type
|
||||
func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string,
|
||||
workflowType wrangler.VReplicationWorkflowType) error {
|
||||
|
||||
|
@ -1975,14 +1984,16 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
|
|||
autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
|
||||
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")
|
||||
|
||||
// MoveTables-only params
|
||||
sourceKeyspace := subFlags.String("source", "", "Source keyspace")
|
||||
// MoveTables and Migrate params
|
||||
tables := subFlags.String("tables", "", "A table spec or a list of tables")
|
||||
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
|
||||
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")
|
||||
sourceKeyspace := subFlags.String("source", "", "Source keyspace")
|
||||
|
||||
// MoveTables-only params
|
||||
renameTables := subFlags.Bool("rename_tables", false, "Rename tables instead of dropping them")
|
||||
|
||||
// Reshard-only params
|
||||
// Reshard params
|
||||
sourceShards := subFlags.String("source_shards", "", "Source shards")
|
||||
targetShards := subFlags.String("target_shards", "", "Target shards")
|
||||
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to target shards")
|
||||
|
@ -2057,14 +2068,40 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
|
|||
//TODO: check if invalid parameters were passed in that do not apply to this action
|
||||
originalAction := action
|
||||
action = strings.ToLower(action) // allow users to input action in a case-insensitive manner
|
||||
if workflowType == wrangler.MigrateWorkflow {
|
||||
switch action {
|
||||
case vReplicationWorkflowActionCreate, vReplicationWorkflowActionCancel, vReplicationWorkflowActionComplete:
|
||||
default:
|
||||
return fmt.Errorf("invalid action for Migrate: %s", action)
|
||||
}
|
||||
}
|
||||
|
||||
switch action {
|
||||
case vReplicationWorkflowActionCreate:
|
||||
switch workflowType {
|
||||
case wrangler.MoveTablesWorkflow:
|
||||
case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow:
|
||||
var sourceTopo *topo.Server
|
||||
var externalClusterName string
|
||||
|
||||
sourceTopo = wr.TopoServer()
|
||||
if *sourceKeyspace == "" {
|
||||
return fmt.Errorf("source keyspace is not specified")
|
||||
}
|
||||
_, err := wr.TopoServer().GetKeyspace(ctx, *sourceKeyspace)
|
||||
if workflowType == wrangler.MigrateWorkflow {
|
||||
splits := strings.Split(*sourceKeyspace, ".")
|
||||
if len(splits) != 2 {
|
||||
return fmt.Errorf("invalid format for external source cluster: %s", *sourceKeyspace)
|
||||
}
|
||||
externalClusterName = splits[0]
|
||||
*sourceKeyspace = splits[1]
|
||||
|
||||
sourceTopo, err = sourceTopo.OpenExternalVitessClusterServer(ctx, externalClusterName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := sourceTopo.GetKeyspace(ctx, *sourceKeyspace)
|
||||
if err != nil {
|
||||
wr.Logger().Errorf("keyspace %s not found", *sourceKeyspace)
|
||||
return err
|
||||
|
@ -2077,7 +2114,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
|
|||
vrwp.AllTables = *allTables
|
||||
vrwp.ExcludeTables = *excludes
|
||||
vrwp.Timeout = *timeout
|
||||
workflowType = wrangler.MoveTablesWorkflow
|
||||
vrwp.ExternalCluster = externalClusterName
|
||||
case wrangler.ReshardWorkflow:
|
||||
if *sourceShards == "" || *targetShards == "" {
|
||||
return fmt.Errorf("source and target shards are not specified")
|
||||
|
@ -2086,8 +2123,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
|
|||
vrwp.TargetShards = strings.Split(*targetShards, ",")
|
||||
vrwp.SkipSchemaCopy = *skipSchemaCopy
|
||||
vrwp.SourceKeyspace = target
|
||||
workflowType = wrangler.ReshardWorkflow
|
||||
log.Infof("params are %s, %s, %+v", *sourceShards, *targetShards, vrwp)
|
||||
default:
|
||||
return fmt.Errorf("unknown workflow type passed: %v", workflowType)
|
||||
}
|
||||
|
@ -2108,12 +2143,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
|
|||
case wrangler.MoveTablesWorkflow:
|
||||
vrwp.RenameTables = *renameTables
|
||||
case wrangler.ReshardWorkflow:
|
||||
case wrangler.MigrateWorkflow:
|
||||
default:
|
||||
return fmt.Errorf("unknown workflow type passed: %v", workflowType)
|
||||
}
|
||||
vrwp.KeepData = *keepData
|
||||
}
|
||||
|
||||
vrwp.WorkflowType = workflowType
|
||||
wf, err := wr.NewVReplicationWorkflow(ctx, workflowType, vrwp)
|
||||
if err != nil {
|
||||
log.Warningf("NewVReplicationWorkflow returned error %+v", wf)
|
||||
|
|
|
@ -118,7 +118,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
|
|||
}
|
||||
log.Infof("creating tablet picker for source keyspace/shard %v/%v with cell: %v and tabletTypes: %v", ct.source.Keyspace, ct.source.Shard, cell, tabletTypesStr)
|
||||
cells := strings.Split(cell, ",")
|
||||
tp, err := discovery.NewTabletPicker(ts, cells, ct.source.Keyspace, ct.source.Shard, tabletTypesStr)
|
||||
|
||||
sourceTopo := ts
|
||||
if ct.source.ExternalCluster != "" {
|
||||
sourceTopo, err = sourceTopo.OpenExternalVitessClusterServer(ctx, ct.source.ExternalCluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
tp, err := discovery.NewTabletPicker(sourceTopo, cells, ct.source.Keyspace, ct.source.Shard, tabletTypesStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -64,11 +64,21 @@ const (
|
|||
|
||||
// MoveTables initiates moving table(s) over to another keyspace
|
||||
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
|
||||
cell, tabletTypes string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool) error {
|
||||
cell, tabletTypes string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool,
|
||||
externalCluster string) error {
|
||||
//FIXME validate tableSpecs, allTables, excludeTables
|
||||
var tables []string
|
||||
var externalTopo *topo.Server
|
||||
var err error
|
||||
|
||||
if externalCluster != "" {
|
||||
externalTopo, err = wr.ts.OpenExternalVitessClusterServer(ctx, externalCluster)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wr.sourceTs = externalTopo
|
||||
log.Infof("Successfully opened external topo: %+v", externalTopo)
|
||||
}
|
||||
var vschema *vschemapb.Keyspace
|
||||
vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace)
|
||||
if err != nil {
|
||||
|
@ -97,7 +107,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
|
|||
if len(strings.TrimSpace(tableSpecs)) > 0 {
|
||||
tables = strings.Split(tableSpecs, ",")
|
||||
}
|
||||
ksTables, err := wr.getKeyspaceTables(ctx, sourceKeyspace)
|
||||
ksTables, err := wr.getKeyspaceTables(ctx, sourceKeyspace, wr.sourceTs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -148,33 +158,34 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save routing rules before vschema. If we save vschema first, and routing rules
|
||||
// fails to save, we may generate duplicate table errors.
|
||||
rules, err := wr.getRoutingRules(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, table := range tables {
|
||||
toSource := []string{sourceKeyspace + "." + table}
|
||||
rules[table] = toSource
|
||||
rules[table+"@replica"] = toSource
|
||||
rules[table+"@rdonly"] = toSource
|
||||
rules[targetKeyspace+"."+table] = toSource
|
||||
rules[targetKeyspace+"."+table+"@replica"] = toSource
|
||||
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
|
||||
rules[targetKeyspace+"."+table] = toSource
|
||||
rules[sourceKeyspace+"."+table+"@replica"] = toSource
|
||||
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
|
||||
}
|
||||
if err := wr.saveRoutingRules(ctx, rules); err != nil {
|
||||
return err
|
||||
}
|
||||
if vschema != nil {
|
||||
// We added to the vschema.
|
||||
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
|
||||
if externalTopo != nil {
|
||||
// Save routing rules before vschema. If we save vschema first, and routing rules
|
||||
// fails to save, we may generate duplicate table errors.
|
||||
rules, err := wr.getRoutingRules(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, table := range tables {
|
||||
toSource := []string{sourceKeyspace + "." + table}
|
||||
rules[table] = toSource
|
||||
rules[table+"@replica"] = toSource
|
||||
rules[table+"@rdonly"] = toSource
|
||||
rules[targetKeyspace+"."+table] = toSource
|
||||
rules[targetKeyspace+"."+table+"@replica"] = toSource
|
||||
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
|
||||
rules[targetKeyspace+"."+table] = toSource
|
||||
rules[sourceKeyspace+"."+table+"@replica"] = toSource
|
||||
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
|
||||
}
|
||||
if err := wr.saveRoutingRules(ctx, rules); err != nil {
|
||||
return err
|
||||
}
|
||||
if vschema != nil {
|
||||
// We added to the vschema.
|
||||
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil {
|
||||
return err
|
||||
|
@ -252,8 +263,8 @@ func (wr *Wrangler) validateSourceTablesExist(ctx context.Context, sourceKeyspac
|
|||
return nil
|
||||
}
|
||||
|
||||
func (wr *Wrangler) getKeyspaceTables(ctx context.Context, ks string) ([]string, error) {
|
||||
shards, err := wr.ts.GetServingShards(ctx, ks)
|
||||
func (wr *Wrangler) getKeyspaceTables(ctx context.Context, ks string, ts *topo.Server) ([]string, error) {
|
||||
shards, err := ts.GetServingShards(ctx, ks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -266,7 +277,11 @@ func (wr *Wrangler) getKeyspaceTables(ctx context.Context, ks string) ([]string,
|
|||
}
|
||||
allTables := []string{"/.*/"}
|
||||
|
||||
schema, err := wr.GetSchema(ctx, master, allTables, nil, false)
|
||||
ti, err := ts.GetTablet(ctx, master)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
schema, err := wr.tmc.GetSchema(ctx, ti.Tablet, allTables, nil, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -823,7 +838,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater
|
|||
}
|
||||
}
|
||||
|
||||
sourceShards, err := wr.ts.GetServingShards(ctx, ms.SourceKeyspace)
|
||||
sourceShards, err := wr.sourceTs.GetServingShards(ctx, ms.SourceKeyspace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -849,8 +864,11 @@ func (mz *materializer) getSourceTableDDLs(ctx context.Context) (map[string]stri
|
|||
return nil, fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName())
|
||||
}
|
||||
|
||||
var err error
|
||||
sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false)
|
||||
ti, err := mz.wr.sourceTs.GetTablet(ctx, sourceMaster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sourceSchema, err := mz.wr.tmc.GetSchema(ctx, ti.Tablet, allTables, nil, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -987,10 +1005,11 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) {
|
|||
|
||||
for _, source := range mz.sourceShards {
|
||||
bls := &binlogdatapb.BinlogSource{
|
||||
Keyspace: mz.ms.SourceKeyspace,
|
||||
Shard: source.ShardName(),
|
||||
Filter: &binlogdatapb.Filter{},
|
||||
StopAfterCopy: mz.ms.StopAfterCopy,
|
||||
Keyspace: mz.ms.SourceKeyspace,
|
||||
Shard: source.ShardName(),
|
||||
Filter: &binlogdatapb.Filter{},
|
||||
StopAfterCopy: mz.ms.StopAfterCopy,
|
||||
ExternalCluster: mz.ms.ExternalCluster,
|
||||
}
|
||||
for _, ts := range mz.ms.TableSettings {
|
||||
rule := &binlogdatapb.Rule{
|
||||
|
|
|
@ -62,7 +62,7 @@ func TestMigrateTables(t *testing.T) {
|
|||
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
|
||||
|
||||
ctx := context.Background()
|
||||
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
|
||||
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "")
|
||||
require.NoError(t, err)
|
||||
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
|
||||
require.NoError(t, err)
|
||||
|
@ -103,11 +103,11 @@ func TestMissingTables(t *testing.T) {
|
|||
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
|
||||
|
||||
ctx := context.Background()
|
||||
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false)
|
||||
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "")
|
||||
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt")
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false)
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "")
|
||||
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt")
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -163,7 +163,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) {
|
|||
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
|
||||
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
|
||||
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false)
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "")
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, tcase.want, targetTables(env))
|
||||
})
|
||||
|
@ -197,7 +197,7 @@ func TestMoveTablesStopFlags(t *testing.T) {
|
|||
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
|
||||
// -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING
|
||||
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
|
||||
"", false, "", false, true)
|
||||
"", false, "", false, true, "")
|
||||
require.NoError(t, err)
|
||||
env.tmc.verifyQueries(t)
|
||||
})
|
||||
|
@ -223,7 +223,7 @@ func TestMigrateVSchema(t *testing.T) {
|
|||
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
|
||||
|
||||
ctx := context.Background()
|
||||
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false)
|
||||
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "")
|
||||
require.NoError(t, err)
|
||||
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -26,6 +26,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"vitess.io/vitess/go/json2"
|
||||
|
||||
"vitess.io/vitess/go/vt/topotools"
|
||||
|
||||
"vitess.io/vitess/go/vt/vtgate/evalengine"
|
||||
|
@ -1759,3 +1761,43 @@ func reverseName(workflow string) string {
|
|||
}
|
||||
return workflow + reverse
|
||||
}
|
||||
|
||||
func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, keyspace, tableSpecs string) error {
|
||||
var err error
|
||||
var vschema *vschemapb.Keyspace
|
||||
vschema, err = ts.wr.ts.GetVSchema(ctx, keyspace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if vschema == nil {
|
||||
return fmt.Errorf("no vschema found for keyspace %s", keyspace)
|
||||
}
|
||||
if strings.HasPrefix(tableSpecs, "{") {
|
||||
if vschema.Tables == nil {
|
||||
vschema.Tables = make(map[string]*vschemapb.Table)
|
||||
}
|
||||
wrap := fmt.Sprintf(`{"tables": %s}`, tableSpecs)
|
||||
ks := &vschemapb.Keyspace{}
|
||||
if err := json2.Unmarshal([]byte(wrap), ks); err != nil {
|
||||
return err
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for table, vtab := range ks.Tables {
|
||||
vschema.Tables[table] = vtab
|
||||
}
|
||||
} else {
|
||||
if !vschema.Sharded {
|
||||
if vschema.Tables == nil {
|
||||
vschema.Tables = make(map[string]*vschemapb.Table)
|
||||
}
|
||||
for _, table := range ts.tables {
|
||||
vschema.Tables[table] = &vschemapb.Table{}
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("no sharded vschema was provided, so you will need to update the vschema of the target manually for the moved tables")
|
||||
}
|
||||
}
|
||||
return ts.wr.ts.SaveVSchema(ctx, keyspace, vschema)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ type VReplicationWorkflowType int
|
|||
const (
|
||||
MoveTablesWorkflow = VReplicationWorkflowType(iota)
|
||||
ReshardWorkflow
|
||||
MigrateWorkflow
|
||||
)
|
||||
|
||||
// Workflow state display strings
|
||||
|
@ -54,6 +55,7 @@ func (vrw *VReplicationWorkflow) String() string {
|
|||
|
||||
// VReplicationWorkflowParams stores args and options passed to a VReplicationWorkflow command
|
||||
type VReplicationWorkflowParams struct {
|
||||
WorkflowType VReplicationWorkflowType
|
||||
Workflow, TargetKeyspace string
|
||||
Cells, TabletTypes, ExcludeTables string
|
||||
EnableReverseReplication, DryRun bool
|
||||
|
@ -69,6 +71,9 @@ type VReplicationWorkflowParams struct {
|
|||
SourceShards, TargetShards []string
|
||||
SkipSchemaCopy bool
|
||||
AutoStart, StopAfterCopy bool
|
||||
|
||||
// Migrate specific
|
||||
ExternalCluster string
|
||||
}
|
||||
|
||||
// NewVReplicationWorkflow sets up a MoveTables or Reshard workflow based on options provided, deduces the state of the
|
||||
|
@ -225,7 +230,7 @@ func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowErro
|
|||
return totalStreams, runningStreams, workflowErrors, nil
|
||||
}
|
||||
|
||||
// SwitchTraffic switches traffic forward for tablet_types passed
|
||||
// SwitchTraffic switches traffic in the direction passed for specified tablet_types
|
||||
func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) (*[]string, error) {
|
||||
var dryRunResults []string
|
||||
var rdDryRunResults, wrDryRunResults *[]string
|
||||
|
@ -362,7 +367,8 @@ func (vrw *VReplicationWorkflow) parseTabletTypes() (hasReplica, hasRdonly, hasM
|
|||
func (vrw *VReplicationWorkflow) initMoveTables() error {
|
||||
log.Infof("In VReplicationWorkflow.initMoveTables() for %+v", vrw)
|
||||
return vrw.wr.MoveTables(vrw.ctx, vrw.params.Workflow, vrw.params.SourceKeyspace, vrw.params.TargetKeyspace,
|
||||
vrw.params.Tables, vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables, vrw.params.AutoStart, vrw.params.StopAfterCopy)
|
||||
vrw.params.Tables, vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables,
|
||||
vrw.params.AutoStart, vrw.params.StopAfterCopy, vrw.params.ExternalCluster)
|
||||
}
|
||||
|
||||
func (vrw *VReplicationWorkflow) initReshard() error {
|
||||
|
|
|
@ -42,19 +42,21 @@ var (
|
|||
// Multiple go routines can use the same Wrangler at the same time,
|
||||
// provided they want to share the same logger / topo server / lock timeout.
|
||||
type Wrangler struct {
|
||||
logger logutil.Logger
|
||||
ts *topo.Server
|
||||
tmc tmclient.TabletManagerClient
|
||||
vtctld vtctlservicepb.VtctldServer
|
||||
logger logutil.Logger
|
||||
ts *topo.Server
|
||||
tmc tmclient.TabletManagerClient
|
||||
vtctld vtctlservicepb.VtctldServer
|
||||
sourceTs *topo.Server
|
||||
}
|
||||
|
||||
// New creates a new Wrangler object.
|
||||
func New(logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient) *Wrangler {
|
||||
return &Wrangler{
|
||||
logger: logger,
|
||||
ts: ts,
|
||||
tmc: tmc,
|
||||
vtctld: grpcvtctldserver.NewVtctldServer(ts),
|
||||
logger: logger,
|
||||
ts: ts,
|
||||
tmc: tmc,
|
||||
vtctld: grpcvtctldserver.NewVtctldServer(ts),
|
||||
sourceTs: ts,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -200,6 +200,10 @@ message BinlogSource {
|
|||
// StopAfterCopy specifies if vreplication should be stopped
|
||||
// after copying is done.
|
||||
bool stop_after_copy = 9;
|
||||
|
||||
// ExternalCluster is the name of the mounted cluster which has the source keyspace/db for this workflow
|
||||
// it is of the type <cluster_type.cluster_name>
|
||||
string external_cluster = 10;
|
||||
}
|
||||
|
||||
// VEventType enumerates the event types. Many of these types
|
||||
|
|
|
@ -452,4 +452,8 @@ message MaterializeSettings {
|
|||
// optional parameters.
|
||||
string cell = 6;
|
||||
string tablet_types = 7;
|
||||
// ExternalCluster is the name of the mounted cluster which has the source keyspace/db for this workflow
|
||||
// it is of the type <cluster_type.cluster_name>
|
||||
string external_cluster = 8;
|
||||
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче