Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-10-13 21:44:38 -07:00
Родитель 38c2ae96b2
Коммит 7139502dc8
2 изменённых файлов: 380 добавлений и 42 удалений

Просмотреть файл

@ -44,8 +44,8 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
)
// DiffSummary is the summary of differences for one table.
type DiffSummary struct {
// DiffReport is the summary of differences for one table.
type DiffReport struct {
ProcessedRows int
MatchingRows int
MismatchedRows int
@ -81,15 +81,15 @@ type dfParams struct {
}
// VDiff reports differences between the sources and targets of a vreplication workflow.
func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr string, filteredReplicationWaitTime time.Duration) error {
func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr string, filteredReplicationWaitTime time.Duration) (map[string]*DiffReport, error) {
mi, err := wr.buildMigrater(ctx, targetKeyspace, workflow)
if err != nil {
wr.Logger().Errorf("buildMigrater failed: %v", err)
return err
return nil, err
}
if err := mi.validate(ctx, false /* isWrite */); err != nil {
mi.wr.Logger().Errorf("validate failed: %v", err)
return err
return nil, err
}
df := &vdiff{
mi: mi,
@ -118,14 +118,14 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC
}
schm, err := wr.GetSchema(ctx, oneTarget.master.Alias, nil, nil, false)
if err != nil {
return err
return nil, err
}
df.differs, err = buildVDiffPlan(ctx, oneFilter, schm)
if err != nil {
return err
return nil, err
}
if err := df.selectTablets(ctx); err != nil {
return err
return nil, err
}
defer func() {
if err := df.restartTargets(ctx); err != nil {
@ -135,31 +135,33 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// TODO(sougou): parallelize
for _, td := range df.differs {
diffReports := make(map[string]*DiffReport)
for table, td := range df.differs {
if err := df.stopTargets(ctx); err != nil {
return err
return nil, err
}
sourceReader, err := df.startQueryStreams(ctx, df.sources, td.sourceExpression, td.orderbyParams())
if err != nil {
return err
return nil, err
}
if err := df.syncTargets(ctx); err != nil {
return err
return nil, err
}
targetReader, err := df.startQueryStreams(ctx, df.targets, td.targetExpression, td.orderbyParams())
if err != nil {
return err
return nil, err
}
if err := df.restartTargets(ctx); err != nil {
return err
return nil, err
}
dr, err := td.diff(ctx, df.mi.wr, sourceReader, targetReader)
if err != nil {
return err
return nil, err
}
fmt.Printf("Summary for %v: %+v\n", td.targetTable, dr)
wr.Logger().Printf("Summary for %v: %+v\n", td.targetTable, *dr)
diffReports[table] = dr
}
return nil
return diffReports, nil
}
func buildVDiffPlan(ctx context.Context, filter *binlogdatapb.Filter, schm *tabletmanagerdatapb.SchemaDefinition) (map[string]*tableDiffer, error) {
@ -447,6 +449,7 @@ func (df *vdiff) streamOne(ctx context.Context, shard string, participant *dfPar
Rows: vrs.Rows,
}
result := sqltypes.Proto3ToResult(p3qr)
// Fields should be received only once, and sent only once.
if vrs.Fields == nil {
result.Fields = nil
}
@ -630,8 +633,8 @@ func (rr *resultReader) ResolveDestinations(keyspace string, ids []*querypb.Valu
return nil, nil, nil
}
func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, sourceReader, targetReader *resultReader) (*DiffSummary, error) {
dr := &DiffSummary{}
func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, sourceReader, targetReader *resultReader) (*DiffReport, error) {
dr := &DiffReport{}
var sourceRow, targetRow []sqltypes.Value
var err error
advanceSource := true
@ -650,16 +653,14 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, sourceReader, tar
}
}
if sourceRow == nil && targetRow == nil {
return dr, nil
}
advanceSource = true
advanceTarget = true
if sourceRow == nil {
// no more rows from the source
if targetRow == nil {
// no more rows from target either, we're done
return dr, nil
}
// drain target, update count
wr.Logger().Errorf("Draining extra row(s) found on the target starting with: %v", targetRow)
count, err := targetReader.drain(ctx)
@ -667,6 +668,7 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, sourceReader, tar
return nil, err
}
dr.ExtraRowsTarget += 1 + count
dr.ProcessedRows += 1 + count
return dr, nil
}
if targetRow == nil {
@ -678,10 +680,10 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler, sourceReader, tar
return nil, err
}
dr.ExtraRowsSource += 1 + count
dr.ProcessedRows += 1 + count
return dr, nil
}
// We have rows to process on both sides.
dr.ProcessedRows++
// Compare pk values.

Просмотреть файл

@ -218,7 +218,7 @@ func TestVDiffPlanSuccess(t *testing.T) {
comparePKs: []int{0},
},
}, {
// in_keyrange with other expressions.
// in_keyrange on RHS of AND.
// This is currently not a valid construct, but will be supported in the future.
input: &binlogdatapb.Rule{
Match: "t1",
@ -232,6 +232,51 @@ func TestVDiffPlanSuccess(t *testing.T) {
compareCols: []int{-1, 1},
comparePKs: []int{0},
},
}, {
// in_keyrange on LHS of AND.
// This is currently not a valid construct, but will be supported in the future.
input: &binlogdatapb.Rule{
Match: "t1",
Filter: "select * from t1 where in_keyrange('-80') and c2 = 2",
},
table: "t1",
td: &tableDiffer{
targetTable: "t1",
sourceExpression: "select c1, c2 from t1 where c2 = 2 order by c1 asc",
targetExpression: "select c1, c2 from t1 order by c1 asc",
compareCols: []int{-1, 1},
comparePKs: []int{0},
},
}, {
// in_keyrange on cascaded AND expression
// This is currently not a valid construct, but will be supported in the future.
input: &binlogdatapb.Rule{
Match: "t1",
Filter: "select * from t1 where c2 = 2 and c1 = 1 and in_keyrange('-80')",
},
table: "t1",
td: &tableDiffer{
targetTable: "t1",
sourceExpression: "select c1, c2 from t1 where c2 = 2 and c1 = 1 order by c1 asc",
targetExpression: "select c1, c2 from t1 order by c1 asc",
compareCols: []int{-1, 1},
comparePKs: []int{0},
},
}, {
// in_keyrange parenthesized
// This is currently not a valid construct, but will be supported in the future.
input: &binlogdatapb.Rule{
Match: "t1",
Filter: "select * from t1 where (c2 = 2 and in_keyrange('-80'))",
},
table: "t1",
td: &tableDiffer{
targetTable: "t1",
sourceExpression: "select c1, c2 from t1 where (c2 = 2) order by c1 asc",
targetExpression: "select c1, c2 from t1 order by c1 asc",
compareCols: []int{-1, 1},
comparePKs: []int{0},
},
}, {
// group by
input: &binlogdatapb.Rule{
@ -307,7 +352,7 @@ func TestVDiffPlanFailure(t *testing.T) {
}
}
func TestVDiffSimple(t *testing.T) {
func TestVDiffUnsharded(t *testing.T) {
env := newTestVDiffEnv([]string{"0"}, []string{"0"}, "", nil)
defer env.close()
@ -321,31 +366,322 @@ func TestVDiffSimple(t *testing.T) {
}
env.tmc.schema = schm
env.tablets[101].setResults(
"select c1, c2 from t1 order by c1 asc",
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields(
"c1|c2",
"int64|int64"),
fields := sqltypes.MakeTestFields(
"c1|c2",
"int64|int64",
)
testcases := []struct {
id string
source []*sqltypes.Result
target []*sqltypes.Result
dr *DiffReport
}{{
id: "1",
source: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"2|4",
"---",
"3|1",
),
target: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|4",
"3|1",
),
dr: &DiffReport{
ProcessedRows: 3,
MatchingRows: 3,
},
}, {
id: "2",
source: sqltypes.MakeTestStreamingResults(fields,
"1|3",
),
target: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|4",
"3|1",
),
dr: &DiffReport{
ProcessedRows: 3,
MatchingRows: 1,
ExtraRowsTarget: 2,
},
}, {
id: "3",
source: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|4",
"3|1",
),
target: sqltypes.MakeTestStreamingResults(fields,
"1|3",
),
dr: &DiffReport{
ProcessedRows: 3,
MatchingRows: 1,
ExtraRowsSource: 2,
},
}, {
id: "4",
source: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|4",
"3|1",
),
target: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"3|1",
),
dr: &DiffReport{
ProcessedRows: 3,
MatchingRows: 2,
ExtraRowsSource: 1,
},
}, {
id: "5",
source: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"3|1",
),
target: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|4",
"3|1",
),
dr: &DiffReport{
ProcessedRows: 3,
MatchingRows: 2,
ExtraRowsTarget: 1,
},
}, {
id: "6",
source: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|3",
"3|1",
),
target: sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
"2|4",
"3|1",
),
dr: &DiffReport{
ProcessedRows: 3,
MatchingRows: 2,
MismatchedRows: 1,
},
}}
for _, tcase := range testcases {
env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, tcase.source)
env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, tcase.target)
dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second)
require.NoError(t, err)
assert.Equal(t, tcase.dr, dr["t1"], tcase.id)
}
}
func TestVDiffSharded(t *testing.T) {
// Also test that highest position ""MariaDB/5-456-892" will be used
// if lower positions are found.
env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{
"-40-80": "MariaDB/5-456-890",
"40-80-": "MariaDB/5-456-891",
})
defer env.close()
schm := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "t1",
Columns: []string{"c1", "c2"},
PrimaryKeyColumns: []string{"c1"},
Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"),
}},
}
env.tmc.schema = schm
query := "select c1, c2 from t1 order by c1 asc"
fields := sqltypes.MakeTestFields(
"c1|c2",
"int64|int64",
)
env.tablets[101].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"1|3",
"2|4",
),
)
env.tablets[111].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"3|4",
),
)
env.tablets[201].setResults(
"select c1, c2 from t1 order by c1 asc",
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields(
"c1|c2",
"int64|int64"),
sqltypes.MakeTestStreamingResults(fields,
"1|3",
"---",
),
)
env.tablets[211].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"2|4",
"3|1",
"3|4",
),
)
err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second)
assert.NoError(t, err)
dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second)
require.NoError(t, err)
wantdr := &DiffReport{
ProcessedRows: 3,
MatchingRows: 3,
}
assert.Equal(t, wantdr, dr["t1"])
}
func TestVDiffPKWeightString(t *testing.T) {
// Also test that highest position ""MariaDB/5-456-892" will be used
// if lower positions are found.
env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", nil)
defer env.close()
schm := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "t1",
Columns: []string{"c1", "c2"},
PrimaryKeyColumns: []string{"c1"},
Fields: sqltypes.MakeTestFields("c1|c2", "varchar|int64"),
}},
}
env.tmc.schema = schm
query := "select c1, c2, weight_string(c1) from t1 order by c1 asc"
fields := sqltypes.MakeTestFields(
"c1|c2|weight_string(c1)",
"varchar|int64|varbinary",
)
env.tablets[101].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"a|3|A",
"b|4|B",
),
)
env.tablets[111].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"C|5|C",
"D|6|D",
),
)
env.tablets[201].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"A|3|A",
),
)
env.tablets[211].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"b|4|B",
"c|5|C",
"D|6|D",
),
)
dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second)
require.NoError(t, err)
wantdr := &DiffReport{
ProcessedRows: 4,
MatchingRows: 4,
}
assert.Equal(t, wantdr, dr["t1"])
}
func TestVDiffNoPKWeightString(t *testing.T) {
// Also test that highest position ""MariaDB/5-456-892" will be used
// if lower positions are found.
env := newTestVDiffEnv([]string{"-40", "40-"}, []string{"-80", "80-"}, "", nil)
defer env.close()
schm := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "t1",
Columns: []string{"c1", "c2"},
PrimaryKeyColumns: []string{"c1"},
Fields: sqltypes.MakeTestFields("c1|c2", "int64|varchar"),
}},
}
env.tmc.schema = schm
query := "select c1, c2, weight_string(c2) from t1 order by c1 asc"
fields := sqltypes.MakeTestFields(
"c1|c2|weight_string(c2)",
"int64|varchar|varbinary",
)
env.tablets[101].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"3|a|A",
"4|b|B",
),
)
env.tablets[111].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"5|C|C",
"6|D|D",
),
)
env.tablets[201].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"3|A|A",
),
)
env.tablets[211].setResults(
query,
vdiffSourceGtid,
sqltypes.MakeTestStreamingResults(fields,
"4|b|B",
"5|c|C",
"6|D|D",
),
)
dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second)
require.NoError(t, err)
wantdr := &DiffReport{
ProcessedRows: 4,
MatchingRows: 4,
}
assert.Equal(t, wantdr, dr["t1"])
}