Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-10-16 16:13:16 -07:00
Родитель 3891247a15
Коммит 78babddadd
3 изменённых файлов: 76 добавлений и 0 удалений

Просмотреть файл

@ -313,6 +313,9 @@ var commands = []commandGroup{
{"VerticalSplitClone", commandVerticalSplitClone,
"<from_keyspace> <to_keyspace> <tables>",
"Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"},
{"VDiff", commandVDiff,
"-workflow=<workflow> <target keyspace> [-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=REPLICA] [-filtered_replication_wait_time=30s]",
"Perform a diff of all tables in the workflow"},
{"MigrateServedTypes", commandMigrateServedTypes,
"[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] <keyspace/shard> <served tablet type>",
"Migrates a serving type from the source shard to the shards that it replicates to. This command also rebuilds the serving graph. The <keyspace/shard> argument can specify any of the shards involved in the migration."},
@ -1807,6 +1810,24 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl
return wr.VerticalSplitClone(ctx, fromKeyspace, toKeyspace, tables)
}
func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
sourceCell := subFlags.String("source_cell", "", "The source cell to compare from")
targetCell := subFlags.String("target_cell", "", "The target cell to compare with")
tabletTypes := subFlags.String("tablet_types", "", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <target keyspace> is required")
}
targetKeyspace := subFlags.Arg(0)
_, err := wr.VDiff(ctx, targetKeyspace, *workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime)
return err
}
func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")

Просмотреть файл

@ -83,6 +83,24 @@ type dfParams struct {
// VDiff reports differences between the sources and targets of a vreplication workflow.
func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr string, filteredReplicationWaitTime time.Duration) (map[string]*DiffReport, error) {
if sourceCell == "" && targetCell == "" {
cells, err := wr.ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(cells) == 0 {
// Unreachable
return nil, fmt.Errorf("there are no cells in the topo")
}
sourceCell = cells[0]
targetCell = sourceCell
}
if sourceCell == "" {
sourceCell = targetCell
}
if targetCell == "" {
targetCell = sourceCell
}
mi, err := wr.buildMigrater(ctx, targetKeyspace, workflow)
if err != nil {
wr.Logger().Errorf("buildMigrater failed: %v", err)

Просмотреть файл

@ -686,6 +686,43 @@ func TestVDiffNoPKWeightString(t *testing.T) {
assert.Equal(t, wantdr, dr["t1"])
}
func TestVDiffDefaults(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
defer env.close()
schm := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "t1",
Columns: []string{"c1", "c2"},
PrimaryKeyColumns: []string{"c1"},
Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"),
}},
}
env.tmc.schema = schm
fields := sqltypes.MakeTestFields(
"c1|c2",
"int64|int64",
)
source := sqltypes.MakeTestStreamingResults(fields,
"1|3",
"2|4",
"---",
"3|1",
)
target := source
env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, source)
env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, target)
_, err := env.wr.VDiff(context.Background(), "target", env.workflow, "", "", "replica", 30*time.Second)
require.NoError(t, err)
_, err = env.wr.VDiff(context.Background(), "target", env.workflow, "", env.cell, "replica", 30*time.Second)
require.NoError(t, err)
_, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second)
require.NoError(t, err)
}
func TestVDiffReplicationWait(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
defer env.close()