vplayer: test copy and fix bugs

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-03-24 09:53:22 -07:00
Родитель 9c15bbeafd
Коммит e7e2b5fc07
8 изменённых файлов: 275 добавлений и 96 удалений

Просмотреть файл

@ -206,8 +206,14 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target,
return streamerEngine.Stream(ctx, startPos, filter, send) return streamerEngine.Stream(ctx, startPos, filter, send)
} }
// streamRowsHook allows you to do work just before VStreamRows is dispatched.
var streamRowsHook func()
// VStreamRows directly calls into the pre-initialized engine. // VStreamRows directly calls into the pre-initialized engine.
func (ftc *fakeTabletConn) VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { func (ftc *fakeTabletConn) VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error {
if streamRowsHook != nil {
streamRowsHook()
}
var row []sqltypes.Value var row []sqltypes.Value
if lastpk != nil { if lastpk != nil {
r := sqltypes.Proto3ToResult(lastpk) r := sqltypes.Proto3ToResult(lastpk)
@ -393,6 +399,54 @@ func expectDBClientQueries(t *testing.T, queries []string) {
} }
} }
// expectNontxQueries disregards transactional statements like begin and commit.
// It also disregards updates to _vt.vreplication.
func expectNontxQueries(t *testing.T, queries []string) {
t.Helper()
failed := false
for i, query := range queries {
if failed {
t.Errorf("no query received, expecting %s", query)
continue
}
var got string
retry:
select {
case got = <-globalDBQueries:
if got == "begin" || got == "commit" || strings.Contains(got, "_vt.vreplication") {
goto retry
}
var match bool
if query[0] == '/' {
result, err := regexp.MatchString(query[1:], got)
if err != nil {
panic(err)
}
match = result
} else {
match = (got == query)
}
if !match {
t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query)
}
case <-time.After(5 * time.Second):
t.Errorf("no query received, expecting %s", query)
failed = true
}
}
for {
select {
case got := <-globalDBQueries:
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "_vt.vreplication") {
continue
}
t.Errorf("unexpected query: %s", got)
default:
return
}
}
}
func expectData(t *testing.T, table string, values [][]string) { func expectData(t *testing.T, table string, values [][]string) {
t.Helper() t.Helper()

Просмотреть файл

@ -164,12 +164,12 @@ func TestBuildPlayerPlan(t *testing.T) {
"t2": { "t2": {
TargetName: "t1", TargetName: "t1",
SendRule: "t2", SendRule: "t2",
PKReferences: []string{"c1"}, PKReferences: []string{"c1", "pk1", "pk2"},
InsertFront: "insert into t1(c1,c2)", InsertFront: "insert into t1(c1,c2)",
InsertValues: "(:a_c1,:a_c2)", InsertValues: "(:a_c1,:a_c2)",
Insert: "insert into t1(c1,c2) select :a_c1, :a_c2 where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", Insert: "insert into t1(c1,c2) select :a_c1, :a_c2 where (:a_pk1,:a_pk2) <= (1,'aaa')",
Update: "update t1 set c2=:a_c2 where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Update: "update t1 set c2=:a_c2 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')",
Delete: "delete from t1 where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Delete: "delete from t1 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')",
}, },
}, },
}, },
@ -215,13 +215,13 @@ func TestBuildPlayerPlan(t *testing.T) {
"t2": { "t2": {
TargetName: "t1", TargetName: "t1",
SendRule: "t2", SendRule: "t2",
PKReferences: []string{"c1"}, PKReferences: []string{"c1", "pk1", "pk2"},
InsertFront: "insert into t1(c1,c2,c3)", InsertFront: "insert into t1(c1,c2,c3)",
InsertValues: "(:a_c1,:a_c2,:a_c3)", InsertValues: "(:a_c1,:a_c2,:a_c3)",
InsertOnDup: "on duplicate key update c2=values(c2)", InsertOnDup: "on duplicate key update c2=values(c2)",
Insert: "insert into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where :a_pk1 <= 1 and :a_pk2 <= 'aaa' on duplicate key update c2=values(c2)", Insert: "insert into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where (:a_pk1,:a_pk2) <= (1,'aaa') on duplicate key update c2=values(c2)",
Update: "update t1 set c2=:a_c2 where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Update: "update t1 set c2=:a_c2 where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')",
Delete: "update t1 set c2=null where c1=:b_c1 and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Delete: "update t1 set c2=null where c1=:b_c1 and (:b_pk1,:b_pk2) <= (1,'aaa')",
}, },
}, },
}, },
@ -265,11 +265,11 @@ func TestBuildPlayerPlan(t *testing.T) {
"t2": { "t2": {
TargetName: "t1", TargetName: "t1",
SendRule: "t2", SendRule: "t2",
PKReferences: []string{"c1"}, PKReferences: []string{"c1", "pk1", "pk2"},
InsertFront: "insert ignore into t1(c1,c2,c3)", InsertFront: "insert ignore into t1(c1,c2,c3)",
InsertValues: "(:a_c1,:a_c2,:a_c3)", InsertValues: "(:a_c1,:a_c2,:a_c3)",
Insert: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", Insert: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where (:a_pk1,:a_pk2) <= (1,'aaa')",
Update: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", Update: "insert ignore into t1(c1,c2,c3) select :a_c1, :a_c2, :a_c3 where (:a_pk1,:a_pk2) <= (1,'aaa')",
}, },
}, },
}, },
@ -313,12 +313,12 @@ func TestBuildPlayerPlan(t *testing.T) {
"t1": { "t1": {
TargetName: "t1", TargetName: "t1",
SendRule: "t1", SendRule: "t1",
PKReferences: []string{"a"}, PKReferences: []string{"a", "pk1", "pk2"},
InsertFront: "insert into t1(c1,c2)", InsertFront: "insert into t1(c1,c2)",
InsertValues: "(foo(:a_a),:a_b)", InsertValues: "(foo(:a_a),:a_b)",
Insert: "insert into t1(c1,c2) select foo(:a_a), :a_b where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", Insert: "insert into t1(c1,c2) select foo(:a_a), :a_b where (:a_pk1,:a_pk2) <= (1,'aaa')",
Update: "update t1 set c2=:a_b where c1=(foo(:b_a)) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Update: "update t1 set c2=:a_b where c1=(foo(:b_a)) and (:b_pk1,:b_pk2) <= (1,'aaa')",
Delete: "delete from t1 where c1=(foo(:b_a)) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Delete: "delete from t1 where c1=(foo(:b_a)) and (:b_pk1,:b_pk2) <= (1,'aaa')",
}, },
}, },
}, },
@ -362,12 +362,12 @@ func TestBuildPlayerPlan(t *testing.T) {
"t1": { "t1": {
TargetName: "t1", TargetName: "t1",
SendRule: "t1", SendRule: "t1",
PKReferences: []string{"a", "b"}, PKReferences: []string{"a", "b", "pk1", "pk2"},
InsertFront: "insert into t1(c1,c2)", InsertFront: "insert into t1(c1,c2)",
InsertValues: "(:a_a + :a_b,:a_c)", InsertValues: "(:a_a + :a_b,:a_c)",
Insert: "insert into t1(c1,c2) select :a_a + :a_b, :a_c where :a_pk1 <= 1 and :a_pk2 <= 'aaa'", Insert: "insert into t1(c1,c2) select :a_a + :a_b, :a_c where (:a_pk1,:a_pk2) <= (1,'aaa')",
Update: "update t1 set c2=:a_c where c1=(:b_a + :b_b) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Update: "update t1 set c2=:a_c where c1=(:b_a + :b_b) and (:b_pk1,:b_pk2) <= (1,'aaa')",
Delete: "delete from t1 where c1=(:b_a + :b_b) and :b_pk1 <= 1 and :b_pk2 <= 'aaa'", Delete: "delete from t1 where c1=(:b_a + :b_b) and (:b_pk1,:b_pk2) <= (1,'aaa')",
}, },
}, },
}, },

Просмотреть файл

@ -205,6 +205,11 @@ func (tpb *tablePlanBuilder) generate(tableKeys map[string][]string) *TablePlan
refmap[k] = true refmap[k] = true
} }
} }
if tpb.lastpk != nil {
for _, f := range tpb.lastpk.Fields {
refmap[f.Name] = true
}
}
pkrefs := make([]string, 0, len(refmap)) pkrefs := make([]string, 0, len(refmap))
for k := range refmap { for k := range refmap {
pkrefs = append(pkrefs, k) pkrefs = append(pkrefs, k)
@ -465,12 +470,7 @@ func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bv
} }
} }
buf.WriteString(" where ") buf.WriteString(" where ")
separator = "" tpb.generatePKConstraint(buf, bvf)
for i, pkname := range tpb.lastpk.Fields {
buf.Myprintf("%s%v <= ", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)})
separator = " and "
tpb.lastpk.Rows[0][i].EncodeSQL(buf)
}
return buf.ParsedQuery() return buf.ParsedQuery()
} }
@ -578,13 +578,26 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi
separator = " and " separator = " and "
} }
if tpb.lastpk != nil { if tpb.lastpk != nil {
for i, pkname := range tpb.lastpk.Fields { buf.WriteString(" and ")
buf.Myprintf("%s%v <= ", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)}) tpb.generatePKConstraint(buf, bvf)
tpb.lastpk.Rows[0][i].EncodeSQL(buf)
}
} }
} }
func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
separator := "("
for _, pkname := range tpb.lastpk.Fields {
buf.Myprintf("%s%v", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)})
separator = ","
}
separator = ") <= ("
for _, val := range tpb.lastpk.Rows[0] {
buf.WriteString(separator)
separator = ","
val.EncodeSQL(buf)
}
buf.WriteString(")")
}
type bindvarFormatter struct { type bindvarFormatter struct {
mode bindvarMode mode bindvarMode
} }

Просмотреть файл

@ -83,50 +83,47 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
return vc.vr.dbClient.Commit() return vc.vr.dbClient.Commit()
} }
func (vc *vcopier) copyTables(ctx context.Context, settings binlogplayer.VRSettings) error { func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error {
for { qr, err := vc.vr.dbClient.ExecuteFetch(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id), 10000)
qr, err := vc.vr.dbClient.ExecuteFetch(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id), 10000) if err != nil {
if err != nil { return err
return err }
var tableToCopy string
copyState := make(map[string]*sqltypes.Result)
for _, row := range qr.Rows {
tableName := row[0].ToString()
lastpk := row[1].ToString()
if tableToCopy == "" {
tableToCopy = tableName
} }
var tableToCopy string copyState[tableName] = nil
copyState := make(map[string]*sqltypes.Result) if lastpk != "" {
for _, row := range qr.Rows { var r querypb.QueryResult
tableName := row[0].ToString() if err := proto.UnmarshalText(lastpk, &r); err != nil {
lastpk := row[1].ToString()
if tableToCopy == "" {
tableToCopy = tableName
}
copyState[tableName] = nil
if lastpk != "" {
var r querypb.QueryResult
if err := proto.UnmarshalText(lastpk, &r); err != nil {
return err
}
copyState[tableName] = sqltypes.Proto3ToResult(&r)
}
}
if len(copyState) == 0 {
if err := vc.vr.setState(binlogplayer.BlpRunning, ""); err != nil {
return err return err
} }
return nil copyState[tableName] = sqltypes.Proto3ToResult(&r)
}
if err := vc.catchup(ctx, settings, copyState); err != nil {
return err
}
if err := vc.copyTable(ctx, tableToCopy, settings, copyState); err != nil {
return err
} }
} }
if len(copyState) == 0 {
return fmt.Errorf("unexpected: there are no tables to copy")
}
if err := vc.catchup(ctx, copyState); err != nil {
return err
}
return vc.copyTable(ctx, tableToCopy, copyState)
} }
func (vc *vcopier) catchup(ctx context.Context, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result) error { func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Hour)
settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id)
if err != nil {
return err
}
if settings.StartPos.IsZero() { if settings.StartPos.IsZero() {
return nil return nil
} }
ctx, cancel := context.WithTimeout(ctx, 1*time.Hour)
// Start vreplication. // Start vreplication.
errch := make(chan error, 1) errch := make(chan error, 1)
go func() { go func() {
@ -157,7 +154,7 @@ func (vc *vcopier) catchup(ctx context.Context, settings binlogplayer.VRSettings
} }
} }
func (vc *vcopier) copyTable(ctx context.Context, tableName string, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result) error { func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState map[string]*sqltypes.Result) error {
defer vc.vr.dbClient.Rollback() defer vc.vr.dbClient.Rollback()
log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName]) log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName])
@ -198,7 +195,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, settings bin
if len(rows.Fields) == 0 { if len(rows.Fields) == 0 {
return fmt.Errorf("expecting field event first, got: %v", rows) return fmt.Errorf("expecting field event first, got: %v", rows)
} }
if err := vc.fastForward(ctx, settings, copyState, rows.Gtid); err != nil { if err := vc.fastForward(ctx, copyState, rows.Gtid); err != nil {
return err return err
} }
fieldEvent := &binlogdatapb.FieldEvent{ fieldEvent := &binlogdatapb.FieldEvent{
@ -264,11 +261,15 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, settings bin
return nil return nil
} }
func (vc *vcopier) fastForward(ctx context.Context, settings binlogplayer.VRSettings, copyState map[string]*sqltypes.Result, gtid string) error { func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltypes.Result, gtid string) error {
pos, err := mysql.DecodePosition(gtid) pos, err := mysql.DecodePosition(gtid)
if err != nil { if err != nil {
return err return err
} }
settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id)
if err != nil {
return err
}
if settings.StartPos.IsZero() { if settings.StartPos.IsZero() {
update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0) update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0)
_, err := vc.vr.dbClient.ExecuteFetch(update, 0) _, err := vc.vr.dbClient.ExecuteFetch(update, 0)

Просмотреть файл

@ -23,6 +23,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/binlog/binlogplayer"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@ -80,21 +81,30 @@ func TestPlayerCopyTables(t *testing.T) {
}() }()
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/insert", "/insert into _vt.vreplication",
// Create the list of tables to copy and transition to Copying state.
"begin", "begin",
"/insert into _vt.copy_state", "/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'", "/update _vt.vreplication set state='Copying'",
"commit", "commit",
"rollback", "rollback",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=", "/update _vt.vreplication set pos=",
"begin", "begin",
"/insert into dst1", "insert into dst1(id,val) values (1,'aaa'), (2,'bbb')",
"/update _vt.copy_state set lastpk", `/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"2\\" > ' where vrepl_id=.*`,
"commit", "commit",
// copy of dst1 is done: delete from copy_state.
"/delete from _vt.copy_state.*dst1", "/delete from _vt.copy_state.*dst1",
"rollback", "rollback",
// The next FF executes and updates the position before copying.
"begin",
"/update _vt.vreplication set pos=",
"commit",
// Nothing to copy from yes. Delete from copy_state.
"/delete from _vt.copy_state.*yes", "/delete from _vt.copy_state.*yes",
"rollback", "rollback",
// All tables copied. Final catch up followed by Running state.
"/update _vt.vreplication set state='Running'", "/update _vt.vreplication set state='Running'",
}) })
expectData(t, "dst1", [][]string{ expectData(t, "dst1", [][]string{
@ -104,13 +114,27 @@ func TestPlayerCopyTables(t *testing.T) {
expectData(t, "yes", [][]string{}) expectData(t, "yes", [][]string{})
} }
func TestPlayerCopyTablePartial(t *testing.T) { // TestPlayerCopyTableContinuation tests the copy workflow where tables have been partially copied.
func TestPlayerCopyTableContinuation(t *testing.T) {
defer deleteTablet(addTablet(100, "0", topodatapb.TabletType_REPLICA, true, true)) defer deleteTablet(addTablet(100, "0", topodatapb.TabletType_REPLICA, true, true))
execStatements(t, []string{ execStatements(t, []string{
"create table src1(id int, val varbinary(128), primary key(id))", // src1 is initialized as partially copied.
"insert into src1 values(2, 'bbb'), (1, 'aaa')", // lastpk will be initialized at (6,6) later below.
// dst1 only copies id1 and val. This will allow us to test for correctness if id2 changes in the source.
"create table src1(id1 int, id2 int, val varbinary(128), primary key(id1, id2))",
"insert into src1 values(2,2,'no change'), (3,3,'update'), (4,4,'delete'), (5,5,'move within'), (6,6,'move out'), (8,8,'no change'), (9,9,'delete'), (10,10,'update'), (11,11,'move in')",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), primary key(id))", vrepldb), fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), primary key(id))", vrepldb),
fmt.Sprintf("insert into %s.dst1 values(2,'no change'), (3,'update'), (4,'delete'), (5,'move within'), (6,'move out')", vrepldb),
// copied is initialized as fully copied
"create table copied(id int, val varbinary(128), primary key(id))",
"insert into copied values(1,'aaa')",
fmt.Sprintf("create table %s.copied(id int, val varbinary(128), primary key(id))", vrepldb),
fmt.Sprintf("insert into %s.copied values(1,'aaa')", vrepldb),
// not_copied yet to be copied.
"create table not_copied(id int, val varbinary(128), primary key(id))",
"insert into not_copied values(1,'aaa')",
fmt.Sprintf("create table %s.not_copied(id int, val varbinary(128), primary key(id))", vrepldb),
}) })
defer execStatements(t, []string{ defer execStatements(t, []string{
"drop table src1", "drop table src1",
@ -121,9 +145,44 @@ func TestPlayerCopyTablePartial(t *testing.T) {
filter := &binlogdatapb.Filter{ filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{ Rules: []*binlogdatapb.Rule{{
Match: "dst1", Match: "dst1",
Filter: "select * from src1", Filter: "select id1 as id, val from src1",
}, {
Match: "copied",
Filter: "select * from copied",
}, {
Match: "not_copied",
Filter: "select * from not_copied",
}}, }},
} }
pos := masterPosition(t)
execStatements(t, []string{
// insert inside and outside current range.
"insert into src1 values(1,1,'insert in'), (7,7,'insert out')",
// update inside and outside current range.
"update src1 set val='updated' where id1 in (3,10)",
// delete inside and outside current range.
"delete from src1 where id1 in (4,9)",
// move row within range by changing id2.
"update src1 set id2=10 where id1=5",
// move row from within to outside range.
"update src1 set id1=12 where id1=6",
// move row from outside to witihn range.
"update src1 set id1=4 where id1=11",
// modify the copied table.
"update copied set val='bbb' where id=1",
// modify the uncopied table.
"update not_copied set val='bbb' where id=1",
})
// Set a hook to execute statements just before the copy begins from src1.
streamRowsHook = func() {
execStatements(t, []string{
"update src1 set val='updated again' where id1 = 3",
})
// Set it back to nil. Otherwise, this will get executed again when copying not_copied.
streamRowsHook = nil
}
defer func() { streamRowsHook = nil }()
bls := &binlogdatapb.BinlogSource{ bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName, Keyspace: env.KeyspaceName,
@ -136,11 +195,21 @@ func TestPlayerCopyTablePartial(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// As mentioned above. lastpk cut-off is set at (6,6)
lastpk := sqltypes.ResultToProto3(sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id1|id2",
"int32|int32",
),
"6|6",
))
lastpk.RowsAffected = 0
execStatements(t, []string{ execStatements(t, []string{
fmt.Sprintf("insert into _vt.copy_state values(%d, '%s', '%s')", qr.InsertID, "dst1", `fields:<name:"id" type:INT32 > rows:<lengths:1 values:"1" > `), fmt.Sprintf("insert into _vt.copy_state values(%d, '%s', %s)", qr.InsertID, "dst1", encodeString(fmt.Sprintf("%v", lastpk))),
fmt.Sprintf("insert into _vt.copy_state values(%d, '%s', null)", qr.InsertID, "not_copied"),
}) })
id := qr.InsertID id := qr.InsertID
_, err = playerEngine.Exec(fmt.Sprintf("update _vt.vreplication set state='Copying' where id=%d", id)) _, err = playerEngine.Exec(fmt.Sprintf("update _vt.vreplication set state='Copying', pos=%s where id=%d", encodeString(pos), id))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -149,9 +218,11 @@ func TestPlayerCopyTablePartial(t *testing.T) {
if _, err := playerEngine.Exec(query); err != nil { if _, err := playerEngine.Exec(query); err != nil {
t.Fatal(err) t.Fatal(err)
} }
expectDBClientQueries(t, []string{ for q := range globalDBQueries {
"/delete", if strings.HasPrefix(q, "delete from _vt.vreplication") {
}) break
}
}
}() }()
for q := range globalDBQueries { for q := range globalDBQueries {
@ -160,17 +231,40 @@ func TestPlayerCopyTablePartial(t *testing.T) {
} }
} }
expectDBClientQueries(t, []string{ expectNontxQueries(t, []string{
"/update _vt.vreplication set pos=", // Catchup
"begin", "insert into dst1(id,val) select 1, 'insert in' where (1,1) <= (6,6)",
"/insert into dst1", "insert into dst1(id,val) select 7, 'insert out' where (7,7) <= (6,6)",
"/update _vt.copy_state set lastpk", "update dst1 set val='updated' where id=3 and (3,3) <= (6,6)",
"commit", "update dst1 set val='updated' where id=10 and (10,10) <= (6,6)",
"delete from dst1 where id=4 and (4,4) <= (6,6)",
"delete from dst1 where id=9 and (9,9) <= (6,6)",
"delete from dst1 where id=5 and (5,5) <= (6,6)",
"insert into dst1(id,val) select 5, 'move within' where (5,10) <= (6,6)",
"delete from dst1 where id=6 and (6,6) <= (6,6)",
"insert into dst1(id,val) select 12, 'move out' where (12,6) <= (6,6)",
"delete from dst1 where id=11 and (11,11) <= (6,6)",
"insert into dst1(id,val) select 4, 'move in' where (4,11) <= (6,6)",
"update copied set val='bbb' where id=1",
// Fast-forward
"update dst1 set val='updated again' where id=3 and (3,3) <= (6,6)",
// Copy
"insert into dst1(id,val) values (7,'insert out'), (8,'no change'), (10,'updated'), (12,'move out')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id1\\" type:INT32 > fields:<name:\\"id2\\" type:INT32 > rows:<lengths:2 lengths:1 values:\\"126\\" > ' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst1", "/delete from _vt.copy_state.*dst1",
"rollback", "rollback",
"/update _vt.vreplication set state='Running'", // Copy again. There should be no events for catchup.
"insert into not_copied(id,val) values (1,'bbb')",
}) })
expectData(t, "dst1", [][]string{ expectData(t, "dst1", [][]string{
{"2", "bbb"}, {"1", "insert in"},
{"2", "no change"},
{"3", "updated again"},
{"4", "move in"},
{"5", "move within"},
{"7", "insert out"},
{"8", "no change"},
{"10", "updated"},
{"12", "move out"},
}) })
} }

Просмотреть файл

@ -611,6 +611,8 @@ func TestPlayerDDL(t *testing.T) {
} }
// It should stop at the next DDL // It should stop at the next DDL
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/update.*'Running'",
// Second update is from vreplicator.
"/update.*'Running'", "/update.*'Running'",
"begin", "begin",
fmt.Sprintf("/update.*'%s'", pos2), fmt.Sprintf("/update.*'%s'", pos2),
@ -703,6 +705,8 @@ func TestPlayerStopPos(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/update.*'Running'",
// Second update is from vreplicator.
"/update.*'Running'", "/update.*'Running'",
"begin", "begin",
"insert into yes(id,val) values (1,'aaa')", "insert into yes(id,val) values (1,'aaa')",
@ -725,6 +729,8 @@ func TestPlayerStopPos(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/update.*'Running'",
// Second update is from vreplicator.
"/update.*'Running'", "/update.*'Running'",
"begin", "begin",
// Since 'no' generates empty transactions that are skipped by // Since 'no' generates empty transactions that are skipped by
@ -740,6 +746,8 @@ func TestPlayerStopPos(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/update.*'Running'",
// Second update is from vreplicator.
"/update.*'Running'", "/update.*'Running'",
"/update.*'Stopped'.*already reached", "/update.*'Stopped'.*already reached",
}) })
@ -1208,6 +1216,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {
"insert into t1 values(2, 'aaa')", "insert into t1 values(2, 'aaa')",
}) })
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/update _vt.vreplication set state='Running'",
"begin", "begin",
"insert into t1(id,val) values (2,'aaa')", "insert into t1(id,val) values (2,'aaa')",
"/update _vt.vreplication set pos=", "/update _vt.vreplication set pos=",
@ -1283,7 +1292,8 @@ func startVReplication(t *testing.T, filter *binlogdatapb.Filter, onddl binlogda
t.Fatal(err) t.Fatal(err)
} }
expectDBClientQueries(t, []string{ expectDBClientQueries(t, []string{
"/insert", "/insert into _vt.vreplication",
"/update _vt.vreplication set state='Running'",
}) })
return func() { return func() {
t.Helper() t.Helper()

Просмотреть файл

@ -89,7 +89,7 @@ func (vr *vreplicator) Replicate(ctx context.Context) error {
} }
switch { switch {
case numTablesToCopy != 0: case numTablesToCopy != 0:
if err := newVCopier(vr).copyTables(ctx, settings); err != nil { if err := newVCopier(vr).copyNext(ctx, settings); err != nil {
return err return err
} }
case settings.StartPos.IsZero(): case settings.StartPos.IsZero():
@ -97,6 +97,9 @@ func (vr *vreplicator) Replicate(ctx context.Context) error {
return err return err
} }
default: default:
if err := vr.setState(binlogplayer.BlpRunning, ""); err != nil {
return err
}
return newVPlayer(vr, settings, nil, mysql.Position{}).play(ctx) return newVPlayer(vr, settings, nil, mysql.Position{}).play(ctx)
} }
} }

Просмотреть файл

@ -142,16 +142,20 @@ func (rs *rowStreamer) buildSelect() (string, error) {
if len(rs.lastpk) != len(rs.pkColumns) { if len(rs.lastpk) != len(rs.pkColumns) {
return "", fmt.Errorf("primary key values don't match length: %v vs %v", rs.lastpk, rs.pkColumns) return "", fmt.Errorf("primary key values don't match length: %v vs %v", rs.lastpk, rs.pkColumns)
} }
prefix := " where " buf.WriteString(" where (")
for i, pk := range rs.pkColumns { prefix := ""
if i == len(rs.lastpk)-1 { for _, pk := range rs.pkColumns {
buf.Myprintf("%s%v > ", prefix, rs.plan.Table.Columns[pk].Name) buf.Myprintf("%s%v", prefix, rs.plan.Table.Columns[pk].Name)
} else { prefix = ","
buf.Myprintf("%s%v >= ", prefix, rs.plan.Table.Columns[pk].Name)
}
rs.lastpk[i].EncodeSQL(buf)
prefix = " and "
} }
buf.WriteString(") > (")
prefix = ""
for _, val := range rs.lastpk {
buf.WriteString(prefix)
prefix = ","
val.EncodeSQL(buf)
}
buf.WriteString(")")
} }
buf.Myprintf(" order by ", sqlparser.NewTableIdent(rs.plan.Table.Name)) buf.Myprintf(" order by ", sqlparser.NewTableIdent(rs.plan.Table.Name))
prefix = "" prefix = ""