From e7e2b5fc07c3cf4c606cf913374861202081d453 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 24 Mar 2019 09:53:22 -0700 Subject: [PATCH] vplayer: test copy and fix bugs Signed-off-by: Sugu Sougoumarane --- .../vreplication/framework_test.go | 54 +++++++ .../vreplication/replicator_plan_test.go | 38 ++--- .../vreplication/table_plan_builder.go | 33 +++-- .../tabletmanager/vreplication/vcopier.go | 73 +++++----- .../vreplication/vcopier_test.go | 134 +++++++++++++++--- .../vreplication/vplayer_test.go | 12 +- .../tabletmanager/vreplication/vreplicator.go | 5 +- .../tabletserver/vstreamer/rowstreamer.go | 22 +-- 8 files changed, 275 insertions(+), 96 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index cd8da1c48d..7db64dd409 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -206,8 +206,14 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, return streamerEngine.Stream(ctx, startPos, filter, send) } +// streamRowsHook allows you to do work just before VStreamRows is dispatched. +var streamRowsHook func() + // VStreamRows directly calls into the pre-initialized engine. func (ftc *fakeTabletConn) VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { + if streamRowsHook != nil { + streamRowsHook() + } var row []sqltypes.Value if lastpk != nil { r := sqltypes.Proto3ToResult(lastpk) @@ -393,6 +399,54 @@ func expectDBClientQueries(t *testing.T, queries []string) { } } +// expectNontxQueries disregards transactional statements like begin and commit. +// It also disregards updates to _vt.vreplication. +func expectNontxQueries(t *testing.T, queries []string) { + t.Helper() + failed := false + for i, query := range queries { + if failed { + t.Errorf("no query received, expecting %s", query) + continue + } + var got string + retry: + select { + case got = <-globalDBQueries: + if got == "begin" || got == "commit" || strings.Contains(got, "_vt.vreplication") { + goto retry + } + var match bool + if query[0] == '/' { + result, err := regexp.MatchString(query[1:], got) + if err != nil { + panic(err) + } + match = result + } else { + match = (got == query) + } + if !match { + t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query) + } + case <-time.After(5 * time.Second): + t.Errorf("no query received, expecting %s", query) + failed = true + } + } + for { + select { + case got := <-globalDBQueries: + if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "_vt.vreplication") { + continue + } + t.Errorf("unexpected query: %s", got) + default: + return + } + } +} + func expectData(t *testing.T, table string, values [][]string) { t.Helper() diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go index 52cc5b29c4..c12c26f830 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go @@ -164,12 +164,12 @@ func TestBuildPlayerPlan(t *testing.T) { "t2": { TargetName: "t1", SendRule: "t2", - PKReferences: []string{"c1"}, + PKReferences: []string{"c1", "pk1", "pk2"}, InsertFront: "insert into t1(c1,c2)", InsertValues: "(:a_c1,:a_c2)", - Insert: "insert into t1(c1,c2) select :a_c1, :a_c2 where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", - Update: "update t1 set c2=:a_c2 where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", - Delete: "delete from t1 where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", + Insert: "insert into t1(c1,c2) select :a_c1, :a_c2 where (:a_pk1,:a_pk2) <= (1,'aaa')", + Update: "update t1 set c2=:a_c2 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", + Delete: "delete from t1 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", }, }, }, @@ -215,13 +215,13 @@ func TestBuildPlayerPlan(t *testing.T) { "t2": { TargetName: "t1", SendRule: "t2", - PKReferences: []string{"c1"}, + PKReferences: []string{"c1", "pk1", "pk2"}, InsertFront: "insert into t1(c1,c2,c3)", InsertValues: "(:a_c1,:a_c2,:a_c3)", InsertOnDup: "on duplicate key update c2=values(c2)", - Insert: "insert into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where :a_pk1 <= 1 and :a_pk2 <= 'aaa' on duplicate key update c2=values(c2)", - Update: "update t1 set c2=:a_c2 where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", - Delete: "update t1 set c2=null where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", + Insert: "insert into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where (:a_pk1,:a_pk2) <= (1,'aaa') on duplicate key update c2=values(c2)", + Update: "update t1 set c2=:a_c2 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", + Delete: "update t1 set c2=null where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')", }, }, }, @@ -265,11 +265,11 @@ func TestBuildPlayerPlan(t *testing.T) { "t2": { TargetName: "t1", SendRule: "t2", - PKReferences: []string{"c1"}, + PKReferences: []string{"c1", "pk1", "pk2"}, InsertFront: "insert ignore into t1(c1,c2,c3)", InsertValues: "(:a_c1,:a_c2,:a_c3)", - Insert: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", - Update: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", + Insert: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where (:a_pk1,:a_pk2) <= (1,'aaa')", + Update: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where (:a_pk1,:a_pk2) <= (1,'aaa')", }, }, }, @@ -313,12 +313,12 @@ func TestBuildPlayerPlan(t *testing.T) { "t1": { TargetName: "t1", SendRule: "t1", - PKReferences: []string{"a"}, + PKReferences: []string{"a", "pk1", "pk2"}, InsertFront: "insert into t1(c1,c2)", InsertValues: "(foo(:a_a),:a_b)", - Insert: "insert into t1(c1,c2) select foo(:a_a), :a_b where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", - Update: "update t1 set c2=:a_b where c1=(foo(:b_a)) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", - Delete: "delete from t1 where c1=(foo(:b_a)) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", + Insert: "insert into t1(c1,c2) select foo(:a_a), :a_b where (:a_pk1,:a_pk2) <= (1,'aaa')", + Update: "update t1 set c2=:a_b where c1=(foo(:b_a)) and (:b_pk1,:b_pk2) <= (1,'aaa')", + Delete: "delete from t1 where c1=(foo(:b_a)) and (:b_pk1,:b_pk2) <= (1,'aaa')", }, }, }, @@ -362,12 +362,12 @@ func TestBuildPlayerPlan(t *testing.T) { "t1": { TargetName: "t1", SendRule: "t1", - PKReferences: []string{"a", "b"}, + PKReferences: []string{"a", "b", "pk1", "pk2"}, InsertFront: "insert into t1(c1,c2)", InsertValues: "(:a_a + :a_b,:a_c)", - Insert: "insert into t1(c1,c2) select :a_a + :a_b, :a_c where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", - Update: "update t1 set c2=:a_c where c1=(:b_a + :b_b) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", - Delete: "delete from t1 where c1=(:b_a + :b_b) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", + Insert: "insert into t1(c1,c2) select :a_a + :a_b, :a_c where (:a_pk1,:a_pk2) <= (1,'aaa')", + Update: "update t1 set c2=:a_c where c1=(:b_a + :b_b) and (:b_pk1,:b_pk2) <= (1,'aaa')", + Delete: "delete from t1 where c1=(:b_a + :b_b) and (:b_pk1,:b_pk2) <= (1,'aaa')", }, }, }, diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index a69dc492df..4d309dce66 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -205,6 +205,11 @@ func (tpb *tablePlanBuilder) generate(tableKeys map[string][]string) *TablePlan refmap[k] = true } } + if tpb.lastpk != nil { + for _, f := range tpb.lastpk.Fields { + refmap[f.Name] = true + } + } pkrefs := make([]string, 0, len(refmap)) for k := range refmap { pkrefs = append(pkrefs, k) @@ -465,12 +470,7 @@ func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bv } } buf.WriteString(" where ") - separator = "" - for i, pkname := range tpb.lastpk.Fields { - buf.Myprintf("%s%v <= ", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)}) - separator = " and " - tpb.lastpk.Rows[0][i].EncodeSQL(buf) - } + tpb.generatePKConstraint(buf, bvf) return buf.ParsedQuery() } @@ -578,13 +578,26 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi separator = " and " } if tpb.lastpk != nil { - for i, pkname := range tpb.lastpk.Fields { - buf.Myprintf("%s%v <= ", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)}) - tpb.lastpk.Rows[0][i].EncodeSQL(buf) - } + buf.WriteString(" and ") + tpb.generatePKConstraint(buf, bvf) } } +func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { + separator := "(" + for _, pkname := range tpb.lastpk.Fields { + buf.Myprintf("%s%v", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)}) + separator = "," + } + separator = ") <= (" + for _, val := range tpb.lastpk.Rows[0] { + buf.WriteString(separator) + separator = "," + val.EncodeSQL(buf) + } + buf.WriteString(")") +} + type bindvarFormatter struct { mode bindvarMode } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index c7a7ac03c6..b4f6301157 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -83,50 +83,47 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { return vc.vr.dbClient.Commit() } -func (vc *vcopier) copyTables(ctx context.Context, settings binlogplayer.VRSettings) error { - for { - qr, err := vc.vr.dbClient.ExecuteFetch(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id), 10000) - if err != nil { - return err +func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error { + qr, err := vc.vr.dbClient.ExecuteFetch(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id), 10000) + if err != nil { + return err + } + var tableToCopy string + copyState := make(map[string]*sqltypes.Result) + for _, row := range qr.Rows { + tableName := row[0].ToString() + lastpk := row[1].ToString() + if tableToCopy == "" { + tableToCopy = tableName } - var tableToCopy string - copyState := make(map[string]*sqltypes.Result) - for _, row := range qr.Rows { - tableName := row[0].ToString() - lastpk := row[1].ToString() - if tableToCopy == "" { - tableToCopy = tableName - } - copyState[tableName] = nil - if lastpk != "" { - var r querypb.QueryResult - if err := proto.UnmarshalText(lastpk, &r); err != nil { - return err - } - copyState[tableName] = sqltypes.Proto3ToResult(&r) - } - } - if len(copyState) == 0 { - if err := vc.vr.setState(binlogplayer.BlpRunning, ""); err != nil { + copyState[tableName] = nil + if lastpk != "" { + var r querypb.QueryResult + if err := proto.UnmarshalText(lastpk, &r); err != nil { return err } - return nil - } - if err := vc.catchup(ctx, settings, copyState); err != nil { - return err - } - if err := vc.copyTable(ctx, tableToCopy, settings, copyState); err != nil { - return err + copyState[tableName] = sqltypes.Proto3ToResult(&r) } } + if len(copyState) == 0 { + return fmt.Errorf("unexpected: there are no tables to copy") + } + if err := vc.catchup(ctx, copyState); err != nil { + return err + } + return vc.copyTable(ctx, tableToCopy, copyState) } -func (vc *vcopier) catchup(ctx context.Context, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result) error { +func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error { + ctx, cancel := context.WithTimeout(ctx, 1*time.Hour) + settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id) + if err != nil { + return err + } if settings.StartPos.IsZero() { return nil } - ctx, cancel := context.WithTimeout(ctx, 1*time.Hour) // Start vreplication. errch := make(chan error, 1) go func() { @@ -157,7 +154,7 @@ func (vc *vcopier) catchup(ctx context.Context, settings binlogplayer.VRSettings } } -func (vc *vcopier) copyTable(ctx context.Context, tableName string, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result) error { +func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState map[string]*sqltypes.Result) error { defer vc.vr.dbClient.Rollback() log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName]) @@ -198,7 +195,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, settings bin if len(rows.Fields) == 0 { return fmt.Errorf("expecting field event first, got: %v", rows) } - if err := vc.fastForward(ctx, settings, copyState, rows.Gtid); err != nil { + if err := vc.fastForward(ctx, copyState, rows.Gtid); err != nil { return err } fieldEvent := &binlogdatapb.FieldEvent{ @@ -264,11 +261,15 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, settings bin return nil } -func (vc *vcopier) fastForward(ctx context.Context, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result, gtid string) error { +func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltypes.Result, gtid string) error { pos, err := mysql.DecodePosition(gtid) if err != nil { return err } + settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id) + if err != nil { + return err + } if settings.StartPos.IsZero() { update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0) _, err := vc.vr.dbClient.ExecuteFetch(update, 0) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 39b384e81b..57d5988dba 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -23,6 +23,7 @@ import ( "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -80,21 +81,30 @@ func TestPlayerCopyTables(t *testing.T) { }() expectDBClientQueries(t, []string{ - "/insert", + "/insert into _vt.vreplication", + // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", "commit", "rollback", + // The first fast-forward has no starting point. So, it just saves the current position. "/update _vt.vreplication set pos=", "begin", - "/insert into dst1", - "/update _vt.copy_state set lastpk", + "insert into dst1(id,val) values (1,'aaa'), (2,'bbb')", + `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, "commit", + // copy of dst1 is done: delete from copy_state. "/delete from _vt.copy_state.*dst1", "rollback", + // The next FF executes and updates the position before copying. + "begin", + "/update _vt.vreplication set pos=", + "commit", + // Nothing to copy from yes. Delete from copy_state. "/delete from _vt.copy_state.*yes", "rollback", + // All tables copied. Final catch up followed by Running state. "/update _vt.vreplication set state='Running'", }) expectData(t, "dst1", [][]string{ @@ -104,13 +114,27 @@ func TestPlayerCopyTables(t *testing.T) { expectData(t, "yes", [][]string{}) } -func TestPlayerCopyTablePartial(t *testing.T) { +// TestPlayerCopyTableContinuation tests the copy workflow where tables have been partially copied. +func TestPlayerCopyTableContinuation(t *testing.T) { defer deleteTablet(addTablet(100, "0", topodatapb.TabletType_REPLICA, true, true)) execStatements(t, []string{ - "create table src1(id int, val varbinary(128), primary key(id))", - "insert into src1 values(2, 'bbb'), (1, 'aaa')", + // src1 is initialized as partially copied. + // lastpk will be initialized at (6,6) later below. + // dst1 only copies id1 and val. This will allow us to test for correctness if id2 changes in the source. + "create table src1(id1 int, id2 int, val varbinary(128), primary key(id1, id2))", + "insert into src1 values(2,2,'no change'), (3,3,'update'), (4,4,'delete'), (5,5,'move within'), (6,6,'move out'), (8,8,'no change'), (9,9,'delete'), (10,10,'update'), (11,11,'move in')", fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), primary key(id))", vrepldb), + fmt.Sprintf("insert into %s.dst1 values(2,'no change'), (3,'update'), (4,'delete'), (5,'move within'), (6,'move out')", vrepldb), + // copied is initialized as fully copied + "create table copied(id int, val varbinary(128), primary key(id))", + "insert into copied values(1,'aaa')", + fmt.Sprintf("create table %s.copied(id int, val varbinary(128), primary key(id))", vrepldb), + fmt.Sprintf("insert into %s.copied values(1,'aaa')", vrepldb), + // not_copied yet to be copied. + "create table not_copied(id int, val varbinary(128), primary key(id))", + "insert into not_copied values(1,'aaa')", + fmt.Sprintf("create table %s.not_copied(id int, val varbinary(128), primary key(id))", vrepldb), }) defer execStatements(t, []string{ "drop table src1", @@ -121,9 +145,44 @@ func TestPlayerCopyTablePartial(t *testing.T) { filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "dst1", - Filter: "select * from src1", + Filter: "select id1 as id, val from src1", + }, { + Match: "copied", + Filter: "select * from copied", + }, { + Match: "not_copied", + Filter: "select * from not_copied", }}, } + pos := masterPosition(t) + execStatements(t, []string{ + // insert inside and outside current range. + "insert into src1 values(1,1,'insert in'), (7,7,'insert out')", + // update inside and outside current range. + "update src1 set val='updated' where id1 in (3,10)", + // delete inside and outside current range. + "delete from src1 where id1 in (4,9)", + // move row within range by changing id2. + "update src1 set id2=10 where id1=5", + // move row from within to outside range. + "update src1 set id1=12 where id1=6", + // move row from outside to witihn range. + "update src1 set id1=4 where id1=11", + // modify the copied table. + "update copied set val='bbb' where id=1", + // modify the uncopied table. + "update not_copied set val='bbb' where id=1", + }) + + // Set a hook to execute statements just before the copy begins from src1. + streamRowsHook = func() { + execStatements(t, []string{ + "update src1 set val='updated again' where id1 = 3", + }) + // Set it back to nil. Otherwise, this will get executed again when copying not_copied. + streamRowsHook = nil + } + defer func() { streamRowsHook = nil }() bls := &binlogdatapb.BinlogSource{ Keyspace: env.KeyspaceName, @@ -136,11 +195,21 @@ func TestPlayerCopyTablePartial(t *testing.T) { if err != nil { t.Fatal(err) } + // As mentioned above. lastpk cut-off is set at (6,6) + lastpk := sqltypes.ResultToProto3(sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id1|id2", + "int32|int32", + ), + "6|6", + )) + lastpk.RowsAffected = 0 execStatements(t, []string{ - fmt.Sprintf("insert into _vt.copy_state values(%d, '%s', '%s')", qr.InsertID, "dst1", `fields: rows: `), + fmt.Sprintf("insert into _vt.copy_state values(%d, '%s', %s)", qr.InsertID, "dst1", encodeString(fmt.Sprintf("%v", lastpk))), + fmt.Sprintf("insert into _vt.copy_state values(%d, '%s', null)", qr.InsertID, "not_copied"), }) id := qr.InsertID - _, err = playerEngine.Exec(fmt.Sprintf("update _vt.vreplication set state='Copying' where id=%d", id)) + _, err = playerEngine.Exec(fmt.Sprintf("update _vt.vreplication set state='Copying', pos=%s where id=%d", encodeString(pos), id)) if err != nil { t.Fatal(err) } @@ -149,9 +218,11 @@ func TestPlayerCopyTablePartial(t *testing.T) { if _, err := playerEngine.Exec(query); err != nil { t.Fatal(err) } - expectDBClientQueries(t, []string{ - "/delete", - }) + for q := range globalDBQueries { + if strings.HasPrefix(q, "delete from _vt.vreplication") { + break + } + } }() for q := range globalDBQueries { @@ -160,17 +231,40 @@ func TestPlayerCopyTablePartial(t *testing.T) { } } - expectDBClientQueries(t, []string{ - "/update _vt.vreplication set pos=", - "begin", - "/insert into dst1", - "/update _vt.copy_state set lastpk", - "commit", + expectNontxQueries(t, []string{ + // Catchup + "insert into dst1(id,val) select 1, 'insert in' where (1,1) <= (6,6)", + "insert into dst1(id,val) select 7, 'insert out' where (7,7) <= (6,6)", + "update dst1 set val='updated' where id=3 and (3,3) <= (6,6)", + "update dst1 set val='updated' where id=10 and (10,10) <= (6,6)", + "delete from dst1 where id=4 and (4,4) <= (6,6)", + "delete from dst1 where id=9 and (9,9) <= (6,6)", + "delete from dst1 where id=5 and (5,5) <= (6,6)", + "insert into dst1(id,val) select 5, 'move within' where (5,10) <= (6,6)", + "delete from dst1 where id=6 and (6,6) <= (6,6)", + "insert into dst1(id,val) select 12, 'move out' where (12,6) <= (6,6)", + "delete from dst1 where id=11 and (11,11) <= (6,6)", + "insert into dst1(id,val) select 4, 'move in' where (4,11) <= (6,6)", + "update copied set val='bbb' where id=1", + // Fast-forward + "update dst1 set val='updated again' where id=3 and (3,3) <= (6,6)", + // Copy + "insert into dst1(id,val) values (7,'insert out'), (8,'no change'), (10,'updated'), (12,'move out')", + `/update _vt.copy_state set lastpk='fields: fields: rows: ' where vrepl_id=.*`, "/delete from _vt.copy_state.*dst1", "rollback", - "/update _vt.vreplication set state='Running'", + // Copy again. There should be no events for catchup. + "insert into not_copied(id,val) values (1,'bbb')", }) expectData(t, "dst1", [][]string{ - {"2", "bbb"}, + {"1", "insert in"}, + {"2", "no change"}, + {"3", "updated again"}, + {"4", "move in"}, + {"5", "move within"}, + {"7", "insert out"}, + {"8", "no change"}, + {"10", "updated"}, + {"12", "move out"}, }) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index 29bc4dbd21..e2e822462b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -611,6 +611,8 @@ func TestPlayerDDL(t *testing.T) { } // It should stop at the next DDL expectDBClientQueries(t, []string{ + "/update.*'Running'", + // Second update is from vreplicator. "/update.*'Running'", "begin", fmt.Sprintf("/update.*'%s'", pos2), @@ -703,6 +705,8 @@ func TestPlayerStopPos(t *testing.T) { t.Fatal(err) } expectDBClientQueries(t, []string{ + "/update.*'Running'", + // Second update is from vreplicator. "/update.*'Running'", "begin", "insert into yes(id,val) values (1,'aaa')", @@ -725,6 +729,8 @@ func TestPlayerStopPos(t *testing.T) { t.Fatal(err) } expectDBClientQueries(t, []string{ + "/update.*'Running'", + // Second update is from vreplicator. "/update.*'Running'", "begin", // Since 'no' generates empty transactions that are skipped by @@ -740,6 +746,8 @@ func TestPlayerStopPos(t *testing.T) { t.Fatal(err) } expectDBClientQueries(t, []string{ + "/update.*'Running'", + // Second update is from vreplicator. "/update.*'Running'", "/update.*'Stopped'.*already reached", }) @@ -1208,6 +1216,7 @@ func TestRestartOnVStreamEnd(t *testing.T) { "insert into t1 values(2, 'aaa')", }) expectDBClientQueries(t, []string{ + "/update _vt.vreplication set state='Running'", "begin", "insert into t1(id,val) values (2,'aaa')", "/update _vt.vreplication set pos=", @@ -1283,7 +1292,8 @@ func startVReplication(t *testing.T, filter *binlogdatapb.Filter, onddl binlogda t.Fatal(err) } expectDBClientQueries(t, []string{ - "/insert", + "/insert into _vt.vreplication", + "/update _vt.vreplication set state='Running'", }) return func() { t.Helper() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index d0ca85a3b4..8f4f7583c8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -89,7 +89,7 @@ func (vr *vreplicator) Replicate(ctx context.Context) error { } switch { case numTablesToCopy != 0: - if err := newVCopier(vr).copyTables(ctx, settings); err != nil { + if err := newVCopier(vr).copyNext(ctx, settings); err != nil { return err } case settings.StartPos.IsZero(): @@ -97,6 +97,9 @@ func (vr *vreplicator) Replicate(ctx context.Context) error { return err } default: + if err := vr.setState(binlogplayer.BlpRunning, ""); err != nil { + return err + } return newVPlayer(vr, settings, nil, mysql.Position{}).play(ctx) } } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index bd25631d51..1d5dba386d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -142,16 +142,20 @@ func (rs *rowStreamer) buildSelect() (string, error) { if len(rs.lastpk) != len(rs.pkColumns) { return "", fmt.Errorf("primary key values don't match length: %v vs %v", rs.lastpk, rs.pkColumns) } - prefix := " where " - for i, pk := range rs.pkColumns { - if i == len(rs.lastpk)-1 { - buf.Myprintf("%s%v > ", prefix, rs.plan.Table.Columns[pk].Name) - } else { - buf.Myprintf("%s%v >= ", prefix, rs.plan.Table.Columns[pk].Name) - } - rs.lastpk[i].EncodeSQL(buf) - prefix = " and " + buf.WriteString(" where (") + prefix := "" + for _, pk := range rs.pkColumns { + buf.Myprintf("%s%v", prefix, rs.plan.Table.Columns[pk].Name) + prefix = "," } + buf.WriteString(") > (") + prefix = "" + for _, val := range rs.lastpk { + buf.WriteString(prefix) + prefix = "," + val.EncodeSQL(buf) + } + buf.WriteString(")") } buf.Myprintf(" order by ", sqlparser.NewTableIdent(rs.plan.Table.Name)) prefix = ""