diff --git a/go/vt/automation/migrate_served_from_task.go b/go/vt/automation/migrate_served_from_task.go new file mode 100644 index 0000000000..c10f793946 --- /dev/null +++ b/go/vt/automation/migrate_served_from_task.go @@ -0,0 +1,42 @@ +// Copyright 2016, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package automation + +import ( + automationpb "github.com/youtube/vitess/go/vt/proto/automation" + "github.com/youtube/vitess/go/vt/topo/topoproto" + "golang.org/x/net/context" +) + +// MigrateServedFromTask runs vtctl MigrateServedFrom to let vertically split +// out tables get served from the new destination keyspace. +type MigrateServedFromTask struct { +} + +// Run is part of the Task interface. +func (t *MigrateServedFromTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) { + args := []string{"MigrateServedFrom"} + if cells := parameters["cells"]; cells != "" { + args = append(args, "--cells="+cells) + } + if reverse := parameters["reverse"]; reverse != "" { + args = append(args, "--reverse="+reverse) + } + args = append(args, + topoproto.KeyspaceShardString(parameters["dest_keyspace"], parameters["shard"]), + parameters["type"]) + output, err := ExecuteVtctl(context.TODO(), parameters["vtctld_endpoint"], args) + return nil, output, err +} + +// RequiredParameters is part of the Task interface. +func (t *MigrateServedFromTask) RequiredParameters() []string { + return []string{"dest_keyspace", "shard", "type", "vtctld_endpoint"} +} + +// OptionalParameters is part of the Task interface. +func (t *MigrateServedFromTask) OptionalParameters() []string { + return []string{"cells", "reverse"} +} diff --git a/go/vt/automation/scheduler.go b/go/vt/automation/scheduler.go index 92384c517f..15183121f0 100644 --- a/go/vt/automation/scheduler.go +++ b/go/vt/automation/scheduler.go @@ -63,6 +63,7 @@ type Scheduler struct { func NewScheduler() (*Scheduler, error) { defaultClusterOperations := map[string]bool{ "HorizontalReshardingTask": true, + "VerticalSplitTask": true, } s := &Scheduler{ @@ -206,8 +207,12 @@ func defaultTaskCreator(taskName string) Task { switch taskName { case "HorizontalReshardingTask": return &HorizontalReshardingTask{} + case "VerticalSplitTask": + return &VerticalSplitTask{} case "CopySchemaShardTask": return &CopySchemaShardTask{} + case "MigrateServedFromTask": + return &MigrateServedFromTask{} case "MigrateServedTypesTask": return &MigrateServedTypesTask{} case "RebuildKeyspaceGraph": @@ -216,6 +221,10 @@ func defaultTaskCreator(taskName string) Task { return &SplitCloneTask{} case "SplitDiffTask": return &SplitDiffTask{} + case "VerticalSplitCloneTask": + return &VerticalSplitCloneTask{} + case "VerticalSplitDiffTask": + return &VerticalSplitDiffTask{} case "WaitForFilteredReplicationTask": return &WaitForFilteredReplicationTask{} default: diff --git a/go/vt/automation/vertical_split_clone_task.go b/go/vt/automation/vertical_split_clone_task.go new file mode 100644 index 0000000000..3a385d5687 --- /dev/null +++ b/go/vt/automation/vertical_split_clone_task.go @@ -0,0 +1,47 @@ +// Copyright 2016, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package automation + +import ( + automationpb "github.com/youtube/vitess/go/vt/proto/automation" + "github.com/youtube/vitess/go/vt/topo/topoproto" + "golang.org/x/net/context" +) + +// VerticalSplitCloneTask runs VerticalSplitClone on a remote vtworker to +// split out tables from an existing keyspace to a different keyspace. +type VerticalSplitCloneTask struct { +} + +// Run is part of the Task interface. +func (t *VerticalSplitCloneTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) { + // TODO(mberlin): Add parameters for the following options? + // '--source_reader_count', '1', + // '--destination_pack_count', '1', + // '--destination_writer_count', '1', + args := []string{"VerticalSplitClone", "--strategy=-populate_blp_checkpoint"} + if tables := parameters["tables"]; tables != "" { + args = append(args, "--tables="+tables) + } + args = append(args, topoproto.KeyspaceShardString(parameters["dest_keyspace"], parameters["shard"])) + output, err := ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], args) + + // TODO(mberlin): Remove explicit reset when vtworker supports it implicility. + if err == nil { + // Ignore output and error of the Reset. + ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], []string{"Reset"}) + } + return nil, output, err +} + +// RequiredParameters is part of the Task interface. +func (t *VerticalSplitCloneTask) RequiredParameters() []string { + return []string{"dest_keyspace", "shard", "tables", "vtworker_endpoint"} +} + +// OptionalParameters is part of the Task interface. +func (t *VerticalSplitCloneTask) OptionalParameters() []string { + return []string{""} +} diff --git a/go/vt/automation/vertical_split_diff_task.go b/go/vt/automation/vertical_split_diff_task.go new file mode 100644 index 0000000000..e6d34c2068 --- /dev/null +++ b/go/vt/automation/vertical_split_diff_task.go @@ -0,0 +1,40 @@ +// Copyright 2016, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package automation + +import ( + automationpb "github.com/youtube/vitess/go/vt/proto/automation" + "github.com/youtube/vitess/go/vt/topo/topoproto" + "golang.org/x/net/context" +) + +// VerticalSplitDiffTask runs VerticalSplitDiff on a remote vtworker to compare +// the split out tables against the source keyspace. +type VerticalSplitDiffTask struct { +} + +// Run is part of the Task interface. +func (t *VerticalSplitDiffTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) { + args := []string{"VerticalSplitDiff"} + args = append(args, topoproto.KeyspaceShardString(parameters["dest_keyspace"], parameters["shard"])) + output, err := ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], args) + + // TODO(mberlin): Remove explicit reset when vtworker supports it implicility. + if err == nil { + // Ignore output and error of the Reset. + ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], []string{"Reset"}) + } + return nil, output, err +} + +// RequiredParameters is part of the Task interface. +func (t *VerticalSplitDiffTask) RequiredParameters() []string { + return []string{"dest_keyspace", "shard", "vtworker_endpoint"} +} + +// OptionalParameters is part of the Task interface. +func (t *VerticalSplitDiffTask) OptionalParameters() []string { + return []string{""} +} diff --git a/go/vt/automation/vertical_split_task.go b/go/vt/automation/vertical_split_task.go new file mode 100644 index 0000000000..95ad4cce6e --- /dev/null +++ b/go/vt/automation/vertical_split_task.go @@ -0,0 +1,106 @@ +// Copyright 2016, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package automation + +import ( + "strings" + + automationpb "github.com/youtube/vitess/go/vt/proto/automation" + "github.com/youtube/vitess/go/vt/topo/topoproto" +) + +// VerticalSplitTask is a cluster operation to split out specific tables of one +// keyspace to a different keyspace. +type VerticalSplitTask struct { +} + +// Run is part of the Task interface. +func (t *VerticalSplitTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) { + // Example: source_keyspace + sourceKeyspace := parameters["source_keyspace"] + // Example: destination_keyspace + destKeyspace := parameters["dest_keyspace"] + // Example: 10-20 + shards := strings.Split(parameters["shard_list"], ",") + // Example: table1,table2 + tables := parameters["tables"] + // Example: localhost:15000 + vtctldEndpoint := parameters["vtctld_endpoint"] + // Example: localhost:15001 + vtworkerEndpoint := parameters["vtworker_endpoint"] + + var newTasks []*automationpb.TaskContainer + copySchemaTasks := NewTaskContainer() + for _, shard := range shards { + AddTask(copySchemaTasks, "CopySchemaShardTask", map[string]string{ + "source_keyspace_and_shard": topoproto.KeyspaceShardString(sourceKeyspace, shard), + "dest_keyspace_and_shard": topoproto.KeyspaceShardString(destKeyspace, shard), + "tables": tables, + "vtctld_endpoint": vtctldEndpoint, + }) + } + newTasks = append(newTasks, copySchemaTasks) + + vSplitCloneTasks := NewTaskContainer() + for _, shard := range shards { + // TODO(mberlin): Add a semaphore as argument to limit the parallism. + AddTask(vSplitCloneTasks, "VerticalSplitCloneTask", map[string]string{ + "dest_keyspace": destKeyspace, + "shard": shard, + "tables": tables, + "vtworker_endpoint": vtworkerEndpoint, + }) + } + newTasks = append(newTasks, vSplitCloneTasks) + + // TODO(mberlin): When the framework supports nesting tasks, these wait tasks should be run before each SplitDiff. + waitTasks := NewTaskContainer() + for _, shard := range shards { + AddTask(waitTasks, "WaitForFilteredReplicationTask", map[string]string{ + "keyspace": destKeyspace, + "shard": shard, + "max_delay": "30s", + "vtctld_endpoint": vtctldEndpoint, + }) + } + newTasks = append(newTasks, waitTasks) + + // TODO(mberlin): Run all SplitDiffTasks in parallel which do not use overlapping source shards for the comparison. + for _, shard := range shards { + vSplitDiffTask := NewTaskContainer() + AddTask(vSplitDiffTask, "VerticalSplitDiffTask", map[string]string{ + "dest_keyspace": destKeyspace, + "shard": shard, + "vtworker_endpoint": vtworkerEndpoint, + }) + newTasks = append(newTasks, vSplitDiffTask) + } + + for _, servedType := range []string{"rdonly", "replica", "master"} { + migrateServedTypesTasks := NewTaskContainer() + for _, shard := range shards { + AddTask(migrateServedTypesTasks, "MigrateServedFromTask", map[string]string{ + "dest_keyspace": destKeyspace, + "shard": shard, + "type": servedType, + "vtctld_endpoint": vtctldEndpoint, + }) + } + newTasks = append(newTasks, migrateServedTypesTasks) + } + + return newTasks, "", nil +} + +// RequiredParameters is part of the Task interface. +func (t *VerticalSplitTask) RequiredParameters() []string { + return []string{"source_keyspace", "dest_keyspace", "shard_list", + "tables", "vtctld_endpoint", "vtworker_endpoint"} +} + +// OptionalParameters is part of the Task interface. +func (t *VerticalSplitTask) OptionalParameters() []string { + return []string{""} +}