Modify horizontal workflow to require explicit source and destination shards

Signed-off-by: Rafael Chacon <rafael@slack-corp.com>
This commit is contained in:
Rafael Chacon 2018-08-16 13:58:53 -07:00
Родитель 83d309eff0
Коммит e0f8b5b33b
3 изменённых файлов: 35 добавлений и 44 удалений

Просмотреть файл

@ -34,7 +34,6 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/workflow"
"vitess.io/vitess/go/vt/wrangler"
@ -76,6 +75,8 @@ func (*HorizontalReshardingWorkflowFactory) Init(m *workflow.Manager, w *workflo
subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError)
keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding")
vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses")
sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards")
destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)")
splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation")
@ -89,9 +90,11 @@ func (*HorizontalReshardingWorkflowFactory) Init(m *workflow.Manager, w *workflo
}
vtworkers := strings.Split(*vtworkersStr, ",")
w.Name = fmt.Sprintf("Horizontal resharding on keyspace %s", *keyspace)
sourceShards := strings.Split(*sourceShardsStr, ",")
destinationShards := strings.Split(*destinationShardsStr, ",")
checkpoint, err := initCheckpoint(m.TopoServer(), *keyspace, vtworkers, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType)
w.Name = fmt.Sprintf("Horizontal resharding on keyspace %s", *keyspace)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType)
if err != nil {
return err
}
@ -212,42 +215,10 @@ func createUINodes(rootNode *workflow.Node, phaseName PhaseType, shards []string
}
// initCheckpoint initialize the checkpoint for the horizontal workflow.
func initCheckpoint(ts *topo.Server, keyspace string, vtworkers []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) {
sourceShards, destinationShards, err := findSourceAndDestinationShards(ts, keyspace)
if err != nil {
return nil, err
func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) {
if len(sourceShards) == 0 || len(destinationShards) == 0 {
return nil, fmt.Errorf("invalid source or destination shards")
}
return initCheckpointFromShards(keyspace, vtworkers, sourceShards, destinationShards, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType)
}
func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([]string, []string, error) {
overlappingShards, err := topotools.FindOverlappingShards(context.Background(), ts, keyspace)
if err != nil {
return nil, nil, err
}
var sourceShards, destinationShards []string
for _, os := range overlappingShards {
var sourceShardInfo *topo.ShardInfo
var destinationShardInfos []*topo.ShardInfo
// Judge which side is source shard by checking the number of servedTypes.
if len(os.Left[0].ServedTypes) > 0 {
sourceShardInfo = os.Left[0]
destinationShardInfos = os.Right
} else {
sourceShardInfo = os.Right[0]
destinationShardInfos = os.Left
}
sourceShards = append(sourceShards, sourceShardInfo.ShardName())
for _, d := range destinationShardInfos {
destinationShards = append(destinationShards, d.ShardName())
}
}
return sourceShards, destinationShards, nil
}
func initCheckpointFromShards(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) {
if len(vtworkers) != len(destinationShards) {
return nil, fmt.Errorf("there are %v vtworkers, %v destination shards: the number should be same", len(vtworkers), len(destinationShards))
}

Просмотреть файл

@ -67,7 +67,7 @@ func TestHorizontalResharding(t *testing.T) {
wg, _, cancel := startManager(m)
// Create the workflow.
vtworkersParameter := testVtworkers + "," + testVtworkers
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-enable_approvals=false", "-min_healthy_rdonly_tablets=2"})
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-enable_approvals=false", "-min_healthy_rdonly_tablets=2", "-source_shards=0", "-destination_shards=-80,80-"})
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
}

Просмотреть файл

@ -27,19 +27,25 @@ export class NewWorkflowFlags {
this.flags['horizontal_resharding_keyspace'] = new HorizontalReshardingKeyspaceFlag(5, 'horizontal_resharding_keyspace');
this.flags['horizontal_resharding_keyspace'].positional = true;
this.flags['horizontal_resharding_keyspace'].namedPositional = 'keyspace';
this.flags['horizontal_resharding_vtworkers'] = new HorizontalReshardingVtworkerFlag(6, 'horizontal_resharding_vtworkers');
this.flags['horizontal_resharding_source_shards'] = new HorizontalReshardingSourceShardsFlag(6, 'horizontal_resharding_source_shards');
this.flags['horizontal_resharding_source_shards'].positional = true;
this.flags['horizontal_resharding_source_shards'].namedPositional = 'source_shards';
this.flags['horizontal_resharding_destination_shards'] = new HorizontalReshardingDestinationShardsFlag(7, 'horizontal_resharding_destination_shards');
this.flags['horizontal_resharding_destination_shards'].positional = true;
this.flags['horizontal_resharding_destination_shards'].namedPositional = 'destination_shards';
this.flags['horizontal_resharding_vtworkers'] = new HorizontalReshardingVtworkerFlag(8, 'horizontal_resharding_vtworkers');
this.flags['horizontal_resharding_vtworkers'].positional = true;
this.flags['horizontal_resharding_vtworkers'].namedPositional = 'vtworkers';
this.flags['horizontal_resharding_split_cmd'] = new SplitCloneCommand(7, 'horizontal_resharding_split_cmd');
this.flags['horizontal_resharding_split_cmd'] = new SplitCloneCommand(9, 'horizontal_resharding_split_cmd');
this.flags['horizontal_resharding_split_cmd'].positional = true;
this.flags['horizontal_resharding_split_cmd'].namedPositional = 'split_cmd';
this.flags['horizontal_resharding_split_diff_dest_tablet_type'] = new SplitDiffTabletType(8, 'horizontal_resharding_split_diff_dest_tablet_type');
this.flags['horizontal_resharding_split_diff_dest_tablet_type'] = new SplitDiffTabletType(10, 'horizontal_resharding_split_diff_dest_tablet_type');
this.flags['horizontal_resharding_split_diff_dest_tablet_type'].positional = true;
this.flags['horizontal_resharding_split_diff_dest_tablet_type'].namedPositional = 'split_diff_dest_tablet_type';
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'] = new HorizontalReshardingMinHealthyRdonlyTablets(9, 'horizontal_resharding_min_healthy_rdonly_tablets');
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'] = new HorizontalReshardingMinHealthyRdonlyTablets(11, 'horizontal_resharding_min_healthy_rdonly_tablets');
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'].positional = true;
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'].namedPositional = 'min_healthy_rdonly_tablets';
this.flags['horizontal_resharding_enable_approvals'] = new HorizontalReshardingEnableApprovalsFlag(10, 'horizontal_resharding_enable_approvals');
this.flags['horizontal_resharding_enable_approvals'] = new HorizontalReshardingEnableApprovalsFlag(12, 'horizontal_resharding_enable_approvals');
this.flags['horizontal_resharding_enable_approvals'].positional = true;
this.flags['horizontal_resharding_enable_approvals'].namedPositional = 'enable_approvals';
}
@ -148,6 +154,20 @@ export class HorizontalReshardingKeyspaceFlag extends InputFlag {
}
}
export class HorizontalReshardingSourceShardsFlag extends InputFlag {
constructor(position: number, id: string, value= '') {
super(position, id, 'Source Shards', 'A comma-separated list of source shards.', value);
this.setDisplayOn('factory_name', 'horizontal_resharding');
}
}
export class HorizontalReshardingDestinationShardsFlag extends InputFlag {
constructor(position: number, id: string, value= '') {
super(position, id, 'Destination Shards', 'A comma-separated list of destination shards.', value);
this.setDisplayOn('factory_name', 'horizontal_resharding');
}
}
export class HorizontalReshardingVtworkerFlag extends InputFlag {
constructor(position: number, id: string, value= '') {
super(position, id, 'vtworker Addresses', 'Comma-separated list of vtworker addresses.', value);