From 78babddadde9f61f984a51838c058bb7f6a32ab8 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 16 Oct 2019 16:13:16 -0700 Subject: [PATCH] vdiff: vtctl command Signed-off-by: Sugu Sougoumarane --- go/vt/vtctl/vtctl.go | 21 ++++++++++++++++++++ go/vt/wrangler/vdiff.go | 18 ++++++++++++++++++ go/vt/wrangler/vdiff_test.go | 37 ++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 82d1b00c24..a94e29aeea 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -313,6 +313,9 @@ var commands = []commandGroup{ {"VerticalSplitClone", commandVerticalSplitClone, " ", "Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"}, + {"VDiff", commandVDiff, + "-workflow= [-source_cell=] [-target_cell=] [-tablet_types=REPLICA] [-filtered_replication_wait_time=30s]", + "Perform a diff of all tables in the workflow"}, {"MigrateServedTypes", commandMigrateServedTypes, "[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] ", "Migrates a serving type from the source shard to the shards that it replicates to. This command also rebuilds the serving graph. The argument can specify any of the shards involved in the migration."}, @@ -1807,6 +1810,24 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl return wr.VerticalSplitClone(ctx, fromKeyspace, toKeyspace, tables) } +func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + workflow := subFlags.String("workflow", "", "Specifies the workflow name") + sourceCell := subFlags.String("source_cell", "", "The source cell to compare from") + targetCell := subFlags.String("target_cell", "", "The target cell to compare with") + tabletTypes := subFlags.String("tablet_types", "", "Tablet types for source and target") + filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.") + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 1 { + return fmt.Errorf("the is required") + } + + targetKeyspace := subFlags.Arg(0) + _, err := wr.VDiff(ctx, targetKeyspace, *workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime) + return err +} + func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update") reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.") diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 2511574405..dbe7f46c57 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -83,6 +83,24 @@ type dfParams struct { // VDiff reports differences between the sources and targets of a vreplication workflow. func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr string, filteredReplicationWaitTime time.Duration) (map[string]*DiffReport, error) { + if sourceCell == "" && targetCell == "" { + cells, err := wr.ts.GetCellInfoNames(ctx) + if err != nil { + return nil, err + } + if len(cells) == 0 { + // Unreachable + return nil, fmt.Errorf("there are no cells in the topo") + } + sourceCell = cells[0] + targetCell = sourceCell + } + if sourceCell == "" { + sourceCell = targetCell + } + if targetCell == "" { + targetCell = sourceCell + } mi, err := wr.buildMigrater(ctx, targetKeyspace, workflow) if err != nil { wr.Logger().Errorf("buildMigrater failed: %v", err) diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index 277f2e6a5d..d23052094f 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -686,6 +686,43 @@ func TestVDiffNoPKWeightString(t *testing.T) { assert.Equal(t, wantdr, dr["t1"]) } +func TestVDiffDefaults(t *testing.T) { + env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + fields := sqltypes.MakeTestFields( + "c1|c2", + "int64|int64", + ) + + source := sqltypes.MakeTestStreamingResults(fields, + "1|3", + "2|4", + "---", + "3|1", + ) + target := source + env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, source) + env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, target) + + _, err := env.wr.VDiff(context.Background(), "target", env.workflow, "", "", "replica", 30*time.Second) + require.NoError(t, err) + _, err = env.wr.VDiff(context.Background(), "target", env.workflow, "", env.cell, "replica", 30*time.Second) + require.NoError(t, err) + _, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second) + require.NoError(t, err) +} + func TestVDiffReplicationWait(t *testing.T) { env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil) defer env.close()