automation: Add cluster operation for vertical split.

This commit is contained in:
Michael Berlin 2016-01-11 23:56:02 +01:00
Родитель e16b226589
Коммит 73a007bc4d
5 изменённых файлов: 244 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,42 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package automation
import (
automationpb "github.com/youtube/vitess/go/vt/proto/automation"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"golang.org/x/net/context"
)
// MigrateServedFromTask runs vtctl MigrateServedFrom to let vertically split
// out tables get served from the new destination keyspace.
type MigrateServedFromTask struct {
}
// Run is part of the Task interface.
func (t *MigrateServedFromTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) {
args := []string{"MigrateServedFrom"}
if cells := parameters["cells"]; cells != "" {
args = append(args, "--cells="+cells)
}
if reverse := parameters["reverse"]; reverse != "" {
args = append(args, "--reverse="+reverse)
}
args = append(args,
topoproto.KeyspaceShardString(parameters["dest_keyspace"], parameters["shard"]),
parameters["type"])
output, err := ExecuteVtctl(context.TODO(), parameters["vtctld_endpoint"], args)
return nil, output, err
}
// RequiredParameters is part of the Task interface.
func (t *MigrateServedFromTask) RequiredParameters() []string {
return []string{"dest_keyspace", "shard", "type", "vtctld_endpoint"}
}
// OptionalParameters is part of the Task interface.
func (t *MigrateServedFromTask) OptionalParameters() []string {
return []string{"cells", "reverse"}
}

Просмотреть файл

@ -63,6 +63,7 @@ type Scheduler struct {
func NewScheduler() (*Scheduler, error) {
defaultClusterOperations := map[string]bool{
"HorizontalReshardingTask": true,
"VerticalSplitTask": true,
}
s := &Scheduler{
@ -206,8 +207,12 @@ func defaultTaskCreator(taskName string) Task {
switch taskName {
case "HorizontalReshardingTask":
return &HorizontalReshardingTask{}
case "VerticalSplitTask":
return &VerticalSplitTask{}
case "CopySchemaShardTask":
return &CopySchemaShardTask{}
case "MigrateServedFromTask":
return &MigrateServedFromTask{}
case "MigrateServedTypesTask":
return &MigrateServedTypesTask{}
case "RebuildKeyspaceGraph":
@ -216,6 +221,10 @@ func defaultTaskCreator(taskName string) Task {
return &SplitCloneTask{}
case "SplitDiffTask":
return &SplitDiffTask{}
case "VerticalSplitCloneTask":
return &VerticalSplitCloneTask{}
case "VerticalSplitDiffTask":
return &VerticalSplitDiffTask{}
case "WaitForFilteredReplicationTask":
return &WaitForFilteredReplicationTask{}
default:

Просмотреть файл

@ -0,0 +1,47 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package automation
import (
automationpb "github.com/youtube/vitess/go/vt/proto/automation"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"golang.org/x/net/context"
)
// VerticalSplitCloneTask runs VerticalSplitClone on a remote vtworker to
// split out tables from an existing keyspace to a different keyspace.
type VerticalSplitCloneTask struct {
}
// Run is part of the Task interface.
func (t *VerticalSplitCloneTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) {
// TODO(mberlin): Add parameters for the following options?
// '--source_reader_count', '1',
// '--destination_pack_count', '1',
// '--destination_writer_count', '1',
args := []string{"VerticalSplitClone", "--strategy=-populate_blp_checkpoint"}
if tables := parameters["tables"]; tables != "" {
args = append(args, "--tables="+tables)
}
args = append(args, topoproto.KeyspaceShardString(parameters["dest_keyspace"], parameters["shard"]))
output, err := ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], args)
// TODO(mberlin): Remove explicit reset when vtworker supports it implicility.
if err == nil {
// Ignore output and error of the Reset.
ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], []string{"Reset"})
}
return nil, output, err
}
// RequiredParameters is part of the Task interface.
func (t *VerticalSplitCloneTask) RequiredParameters() []string {
return []string{"dest_keyspace", "shard", "tables", "vtworker_endpoint"}
}
// OptionalParameters is part of the Task interface.
func (t *VerticalSplitCloneTask) OptionalParameters() []string {
return []string{""}
}

Просмотреть файл

@ -0,0 +1,40 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package automation
import (
automationpb "github.com/youtube/vitess/go/vt/proto/automation"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"golang.org/x/net/context"
)
// VerticalSplitDiffTask runs VerticalSplitDiff on a remote vtworker to compare
// the split out tables against the source keyspace.
type VerticalSplitDiffTask struct {
}
// Run is part of the Task interface.
func (t *VerticalSplitDiffTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) {
args := []string{"VerticalSplitDiff"}
args = append(args, topoproto.KeyspaceShardString(parameters["dest_keyspace"], parameters["shard"]))
output, err := ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], args)
// TODO(mberlin): Remove explicit reset when vtworker supports it implicility.
if err == nil {
// Ignore output and error of the Reset.
ExecuteVtworker(context.TODO(), parameters["vtworker_endpoint"], []string{"Reset"})
}
return nil, output, err
}
// RequiredParameters is part of the Task interface.
func (t *VerticalSplitDiffTask) RequiredParameters() []string {
return []string{"dest_keyspace", "shard", "vtworker_endpoint"}
}
// OptionalParameters is part of the Task interface.
func (t *VerticalSplitDiffTask) OptionalParameters() []string {
return []string{""}
}

Просмотреть файл

@ -0,0 +1,106 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package automation
import (
"strings"
automationpb "github.com/youtube/vitess/go/vt/proto/automation"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)
// VerticalSplitTask is a cluster operation to split out specific tables of one
// keyspace to a different keyspace.
type VerticalSplitTask struct {
}
// Run is part of the Task interface.
func (t *VerticalSplitTask) Run(parameters map[string]string) ([]*automationpb.TaskContainer, string, error) {
// Example: source_keyspace
sourceKeyspace := parameters["source_keyspace"]
// Example: destination_keyspace
destKeyspace := parameters["dest_keyspace"]
// Example: 10-20
shards := strings.Split(parameters["shard_list"], ",")
// Example: table1,table2
tables := parameters["tables"]
// Example: localhost:15000
vtctldEndpoint := parameters["vtctld_endpoint"]
// Example: localhost:15001
vtworkerEndpoint := parameters["vtworker_endpoint"]
var newTasks []*automationpb.TaskContainer
copySchemaTasks := NewTaskContainer()
for _, shard := range shards {
AddTask(copySchemaTasks, "CopySchemaShardTask", map[string]string{
"source_keyspace_and_shard": topoproto.KeyspaceShardString(sourceKeyspace, shard),
"dest_keyspace_and_shard": topoproto.KeyspaceShardString(destKeyspace, shard),
"tables": tables,
"vtctld_endpoint": vtctldEndpoint,
})
}
newTasks = append(newTasks, copySchemaTasks)
vSplitCloneTasks := NewTaskContainer()
for _, shard := range shards {
// TODO(mberlin): Add a semaphore as argument to limit the parallism.
AddTask(vSplitCloneTasks, "VerticalSplitCloneTask", map[string]string{
"dest_keyspace": destKeyspace,
"shard": shard,
"tables": tables,
"vtworker_endpoint": vtworkerEndpoint,
})
}
newTasks = append(newTasks, vSplitCloneTasks)
// TODO(mberlin): When the framework supports nesting tasks, these wait tasks should be run before each SplitDiff.
waitTasks := NewTaskContainer()
for _, shard := range shards {
AddTask(waitTasks, "WaitForFilteredReplicationTask", map[string]string{
"keyspace": destKeyspace,
"shard": shard,
"max_delay": "30s",
"vtctld_endpoint": vtctldEndpoint,
})
}
newTasks = append(newTasks, waitTasks)
// TODO(mberlin): Run all SplitDiffTasks in parallel which do not use overlapping source shards for the comparison.
for _, shard := range shards {
vSplitDiffTask := NewTaskContainer()
AddTask(vSplitDiffTask, "VerticalSplitDiffTask", map[string]string{
"dest_keyspace": destKeyspace,
"shard": shard,
"vtworker_endpoint": vtworkerEndpoint,
})
newTasks = append(newTasks, vSplitDiffTask)
}
for _, servedType := range []string{"rdonly", "replica", "master"} {
migrateServedTypesTasks := NewTaskContainer()
for _, shard := range shards {
AddTask(migrateServedTypesTasks, "MigrateServedFromTask", map[string]string{
"dest_keyspace": destKeyspace,
"shard": shard,
"type": servedType,
"vtctld_endpoint": vtctldEndpoint,
})
}
newTasks = append(newTasks, migrateServedTypesTasks)
}
return newTasks, "", nil
}
// RequiredParameters is part of the Task interface.
func (t *VerticalSplitTask) RequiredParameters() []string {
return []string{"source_keyspace", "dest_keyspace", "shard_list",
"tables", "vtctld_endpoint", "vtworker_endpoint"}
}
// OptionalParameters is part of the Task interface.
func (t *VerticalSplitTask) OptionalParameters() []string {
return []string{""}
}