diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go b/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go index e3c73415a1..06ce98f77f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go @@ -18,18 +18,24 @@ package vreplication import ( "fmt" - "strconv" "vitess.io/vitess/go/vt/sqlparser" ) // controllerPlan is the plan for vreplication control statements. type controllerPlan struct { - opcode int query string - // delCopySate is set for deletes. - delCopyState string - id int + opcode int + + // numInserts is set for insertQuery. + numInserts int + + // selector and applier are set for updateQuery and deleteQuery. + selector string + applier *sqlparser.ParsedQuery + + // delCopyState is set of deletes. + delCopyState *sqlparser.ParsedQuery } const ( @@ -46,18 +52,24 @@ func buildControllerPlan(query string) (*controllerPlan, error) { if err != nil { return nil, err } + var plan *controllerPlan switch stmt := stmt.(type) { case *sqlparser.Insert: - return buildInsertPlan(stmt) + plan, err = buildInsertPlan(stmt) case *sqlparser.Update: - return buildUpdatePlan(stmt) + plan, err = buildUpdatePlan(stmt) case *sqlparser.Delete: - return buildDeletePlan(stmt) + plan, err = buildDeletePlan(stmt) case *sqlparser.Select: - return buildSelectPlan(stmt) + plan, err = buildSelectPlan(stmt) default: return nil, fmt.Errorf("unsupported construct: %s", sqlparser.String(stmt)) } + if err != nil { + return nil, err + } + plan.query = query + return plan, nil } func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) { @@ -65,7 +77,6 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) { case reshardingJournalTableName: return &controllerPlan{ opcode: reshardingJournalQuery, - query: sqlparser.String(ins), }, nil case vreplicationTableName: // no-op @@ -88,15 +99,8 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) { if !ok { return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) } - if len(rows) != 1 { - return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) - } - row := rows[0] idPos := 0 if len(ins.Columns) != 0 { - if len(ins.Columns) != len(row) { - return nil, fmt.Errorf("malformed statement: %v", sqlparser.String(ins)) - } idPos = -1 for i, col := range ins.Columns { if col.EqualString("id") { @@ -106,13 +110,18 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) { } } if idPos >= 0 { - if _, ok := row[idPos].(*sqlparser.NullVal); !ok { - return nil, fmt.Errorf("id should not have a value: %v", sqlparser.String(ins)) + for _, row := range rows { + if idPos >= len(row) { + return nil, fmt.Errorf("malformed statement: %v", sqlparser.String(ins)) + } + if _, ok := row[idPos].(*sqlparser.NullVal); !ok { + return nil, fmt.Errorf("id should not have a value: %v", sqlparser.String(ins)) + } } } return &controllerPlan{ - opcode: insertQuery, - query: sqlparser.String(ins), + opcode: insertQuery, + numInserts: len(rows), }, nil } @@ -121,7 +130,6 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) { case reshardingJournalTableName: return &controllerPlan{ opcode: reshardingJournalQuery, - query: sqlparser.String(upd), }, nil case vreplicationTableName: // no-op @@ -137,15 +145,24 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) { } } - id, err := extractID(upd.Where) - if err != nil { - return nil, err + buf1 := sqlparser.NewTrackedBuffer(nil) + buf1.Myprintf("select id from %s%v", vreplicationTableName, upd.Where) + upd.Where = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("id")}, + Operator: sqlparser.InStr, + Right: sqlparser.ListArg("::ids"), + }, } + buf2 := sqlparser.NewTrackedBuffer(nil) + buf2.Myprintf("%v", upd) + return &controllerPlan{ - opcode: updateQuery, - query: sqlparser.String(upd), - id: id, + opcode: updateQuery, + selector: buf1.String(), + applier: buf2.ParsedQuery(), }, nil } @@ -154,7 +171,6 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) { case reshardingJournalTableName: return &controllerPlan{ opcode: reshardingJournalQuery, - query: sqlparser.String(del), }, nil case vreplicationTableName: // no-op @@ -171,49 +187,46 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) { return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) } - id, err := extractID(del.Where) - if err != nil { - return nil, err + buf1 := sqlparser.NewTrackedBuffer(nil) + buf1.Myprintf("select id from %s%v", vreplicationTableName, del.Where) + del.Where = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("id")}, + Operator: sqlparser.InStr, + Right: sqlparser.ListArg("::ids"), + }, } + buf2 := sqlparser.NewTrackedBuffer(nil) + buf2.Myprintf("%v", del) + + copyStateWhere := &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("vrepl_id")}, + Operator: sqlparser.InStr, + Right: sqlparser.ListArg("::ids"), + }, + } + buf3 := sqlparser.NewTrackedBuffer(nil) + buf3.Myprintf("delete from %s%v", copyStateTableName, copyStateWhere) + return &controllerPlan{ opcode: deleteQuery, - query: sqlparser.String(del), - delCopyState: fmt.Sprintf("delete from %s where vrepl_id = %d", copySateTableName, id), - id: id, + selector: buf1.String(), + applier: buf2.ParsedQuery(), + delCopyState: buf3.ParsedQuery(), }, nil } func buildSelectPlan(sel *sqlparser.Select) (*controllerPlan, error) { switch sqlparser.String(sel.From) { - case vreplicationTableName, reshardingJournalTableName, copySateTableName: + case vreplicationTableName, reshardingJournalTableName, copyStateTableName: return &controllerPlan{ opcode: selectQuery, - query: sqlparser.String(sel), }, nil default: return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(sel.From)) } } - -func extractID(where *sqlparser.Where) (int, error) { - if where == nil { - return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where)) - } - comp, ok := where.Expr.(*sqlparser.ComparisonExpr) - if !ok { - return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where)) - } - if sqlparser.String(comp.Left) != "id" { - return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where)) - } - if comp.Operator != sqlparser.EqualStr { - return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where)) - } - - id, err := strconv.Atoi(sqlparser.String(comp.Right)) - if err != nil { - return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where)) - } - return id, nil -} diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_plan_test.go index ecdb32e648..096dbb1593 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_plan_test.go @@ -21,35 +21,54 @@ import ( "testing" ) +type testControllerPlan struct { + query string + opcode int + numInserts int + selector string + applier string + delCopyState string +} + func TestControllerPlan(t *testing.T) { tcases := []struct { in string - plan *controllerPlan + plan *testControllerPlan err string }{{ // Insert in: "insert into _vt.vreplication values(null)", - plan: &controllerPlan{ - opcode: insertQuery, - query: "insert into _vt.vreplication values (null)", + plan: &testControllerPlan{ + query: "insert into _vt.vreplication values(null)", + opcode: insertQuery, + numInserts: 1, }, }, { in: "insert into _vt.vreplication(id) values(null)", - plan: &controllerPlan{ - opcode: insertQuery, - query: "insert into _vt.vreplication(id) values (null)", + plan: &testControllerPlan{ + query: "insert into _vt.vreplication(id) values(null)", + opcode: insertQuery, + numInserts: 1, }, }, { in: "insert into _vt.vreplication(workflow, id) values('', null)", - plan: &controllerPlan{ - opcode: insertQuery, - query: "insert into _vt.vreplication(workflow, id) values ('', null)", + plan: &testControllerPlan{ + query: "insert into _vt.vreplication(workflow, id) values('', null)", + opcode: insertQuery, + numInserts: 1, + }, + }, { + in: "insert into _vt.vreplication values(null), (null)", + plan: &testControllerPlan{ + query: "insert into _vt.vreplication values(null), (null)", + opcode: insertQuery, + numInserts: 2, }, }, { in: "insert into _vt.resharding_journal values (1)", - plan: &controllerPlan{ - opcode: reshardingJournalQuery, + plan: &testControllerPlan{ query: "insert into _vt.resharding_journal values (1)", + opcode: reshardingJournalQuery, }, }, { in: "replace into _vt.vreplication values(null)", @@ -70,11 +89,8 @@ func TestControllerPlan(t *testing.T) { in: "insert into _vt.vreplication select * from a", err: "unsupported construct: insert into _vt.vreplication select * from a", }, { - in: "insert into _vt.vreplication values(null), (null)", - err: "unsupported construct: insert into _vt.vreplication values (null), (null)", - }, { - in: "insert into _vt.vreplication(a, b, c) values(null)", - err: "malformed statement: insert into _vt.vreplication(a, b, c) values (null)", + in: "insert into _vt.vreplication(a, b, id) values(null)", + err: "malformed statement: insert into _vt.vreplication(a, b, id) values (null)", }, { in: "insert into _vt.vreplication(workflow, id) values('aa', 1)", err: "id should not have a value: insert into _vt.vreplication(workflow, id) values ('aa', 1)", @@ -85,16 +101,33 @@ func TestControllerPlan(t *testing.T) { // Update }, { in: "update _vt.vreplication set state='Running' where id = 1", - plan: &controllerPlan{ - opcode: updateQuery, - query: "update _vt.vreplication set state = 'Running' where id = 1", - id: 1, + plan: &testControllerPlan{ + query: "update _vt.vreplication set state='Running' where id = 1", + opcode: updateQuery, + selector: "select id from _vt.vreplication where id = 1", + applier: "update _vt.vreplication set state = 'Running' where id in ::ids", + }, + }, { + in: "update _vt.vreplication set state='Running'", + plan: &testControllerPlan{ + query: "update _vt.vreplication set state='Running'", + opcode: updateQuery, + selector: "select id from _vt.vreplication", + applier: "update _vt.vreplication set state = 'Running' where id in ::ids", + }, + }, { + in: "update _vt.vreplication set state='Running' where a = 1", + plan: &testControllerPlan{ + query: "update _vt.vreplication set state='Running' where a = 1", + opcode: updateQuery, + selector: "select id from _vt.vreplication where a = 1", + applier: "update _vt.vreplication set state = 'Running' where id in ::ids", }, }, { in: "update _vt.resharding_journal set col = 1", - plan: &controllerPlan{ - opcode: reshardingJournalQuery, + plan: &testControllerPlan{ query: "update _vt.resharding_journal set col = 1", + opcode: reshardingJournalQuery, }, }, { in: "update a set state='Running' where id = 1", @@ -108,36 +141,40 @@ func TestControllerPlan(t *testing.T) { }, { in: "update _vt.vreplication set state='Running', id = 2 where id = 1", err: "id cannot be changed: id = 2", - }, { - in: "update _vt.vreplication set state='Running'", - err: "invalid where clause:", - }, { - in: "update _vt.vreplication set state='Running' where a = 1 and id = 2", - err: "invalid where clause: where a = 1 and id = 2", - }, { - in: "update _vt.vreplication set state='Running' where a = 1", - err: "invalid where clause: where a = 1", - }, { - in: "update _vt.vreplication set state='Running' where id > 1", - err: "invalid where clause: where id > 1", - }, { - in: "update _vt.vreplication set state='Running' where id = 1.1", - err: "invalid where clause: where id = 1.1", // Delete }, { in: "delete from _vt.vreplication where id = 1", - plan: &controllerPlan{ - opcode: deleteQuery, + plan: &testControllerPlan{ query: "delete from _vt.vreplication where id = 1", - delCopyState: "delete from _vt.copy_state where vrepl_id = 1", - id: 1, + opcode: deleteQuery, + selector: "select id from _vt.vreplication where id = 1", + applier: "delete from _vt.vreplication where id in ::ids", + delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids", + }, + }, { + in: "delete from _vt.vreplication", + plan: &testControllerPlan{ + query: "delete from _vt.vreplication", + opcode: deleteQuery, + selector: "select id from _vt.vreplication", + applier: "delete from _vt.vreplication where id in ::ids", + delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids", + }, + }, { + in: "delete from _vt.vreplication where a = 1", + plan: &testControllerPlan{ + query: "delete from _vt.vreplication where a = 1", + opcode: deleteQuery, + selector: "select id from _vt.vreplication where a = 1", + applier: "delete from _vt.vreplication where id in ::ids", + delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids", }, }, { in: "delete from _vt.resharding_journal where id = 1", - plan: &controllerPlan{ - opcode: reshardingJournalQuery, + plan: &testControllerPlan{ query: "delete from _vt.resharding_journal where id = 1", + opcode: reshardingJournalQuery, }, }, { in: "delete from a where id = 1", @@ -154,38 +191,23 @@ func TestControllerPlan(t *testing.T) { }, { in: "delete from _vt.vreplication partition (a) where id = 1 limit 1", err: "unsupported construct: delete from _vt.vreplication partition (a) where id = 1 limit 1", - }, { - in: "delete from _vt.vreplication", - err: "invalid where clause:", - }, { - in: "delete from _vt.vreplication where a = 1 and id = 2", - err: "invalid where clause: where a = 1 and id = 2", - }, { - in: "delete from _vt.vreplication where a = 1", - err: "invalid where clause: where a = 1", - }, { - in: "delete from _vt.vreplication where id > 1", - err: "invalid where clause: where id > 1", - }, { - in: "delete from _vt.vreplication where id = 1.1", - err: "invalid where clause: where id = 1.1", // Select }, { in: "select * from _vt.vreplication", - plan: &controllerPlan{ + plan: &testControllerPlan{ opcode: selectQuery, query: "select * from _vt.vreplication", }, }, { in: "select * from _vt.resharding_journal", - plan: &controllerPlan{ + plan: &testControllerPlan{ opcode: selectQuery, query: "select * from _vt.resharding_journal", }, }, { in: "select * from _vt.copy_state", - plan: &controllerPlan{ + plan: &testControllerPlan{ opcode: selectQuery, query: "select * from _vt.copy_state", }, @@ -213,8 +235,20 @@ func TestControllerPlan(t *testing.T) { t.Errorf("getPlan(%v) error:\n%v, want\n%v", tcase.in, err, tcase.err) continue } - if !reflect.DeepEqual(pl, tcase.plan) { - t.Errorf("getPlan(%v):\n%+v, want\n%+v", tcase.in, pl, tcase.plan) + gotPlan := &testControllerPlan{ + query: pl.query, + opcode: pl.opcode, + numInserts: pl.numInserts, + selector: pl.selector, + } + if pl.applier != nil { + gotPlan.applier = pl.applier.Query + } + if pl.delCopyState != nil { + gotPlan.delCopyState = pl.delCopyState.Query + } + if !reflect.DeepEqual(gotPlan, tcase.plan) { + t.Errorf("getPlan(%v):\n%+v, want\n%+v", tcase.in, gotPlan, tcase.plan) } } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index ba858cf04d..f330985c4c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -48,8 +48,10 @@ var ( }, }, } - testDMLResponse = &sqltypes.Result{RowsAffected: 1} - testPos = "MariaDB/0-1-1083" + testSelectorResponse1 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}} + testSelectorResponse2 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}} + testDMLResponse = &sqltypes.Result{RowsAffected: 1} + testPos = "MariaDB/0-1-1083" ) func TestControllerKeyRange(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 9a08a97469..4baad2ac62 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -29,13 +29,14 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/topo" ) const ( reshardingJournalTableName = "_vt.resharding_journal" vreplicationTableName = "_vt.vreplication" - copySateTableName = "_vt.copy_state" + copyStateTableName = "_vt.copy_state" createReshardingJournalTable = `create table if not exists _vt.resharding_journal( id bigint, @@ -269,54 +270,92 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { if qr.InsertID == 0 { return nil, fmt.Errorf("insert failed to generate an id") } - params, err := readRow(dbClient, int(qr.InsertID)) - if err != nil { - return nil, err + for id := int(qr.InsertID); id < int(qr.InsertID)+plan.numInserts; id++ { + if ct := vre.controllers[id]; ct != nil { + // Unreachable. Just a failsafe. + ct.Stop() + delete(vre.controllers, id) + } + params, err := readRow(dbClient, id) + if err != nil { + return nil, err + } + ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil) + if err != nil { + return nil, err + } + vre.controllers[id] = ct } - // Create a controller for the newly created row. - ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil) - if err != nil { - return nil, err - } - vre.controllers[int(qr.InsertID)] = ct return qr, nil case updateQuery: - var blpStats *binlogplayer.Stats - if ct := vre.controllers[plan.id]; ct != nil { - // Stop the current controller. - ct.Stop() - blpStats = ct.blpStats - } - qr, err := vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1) + ids, bv, err := vre.fetchIDs(dbClient, plan.selector) if err != nil { return nil, err } - params, err := readRow(dbClient, plan.id) + if len(ids) == 0 { + return &sqltypes.Result{}, nil + } + blpStats := make(map[int]*binlogplayer.Stats) + for _, id := range ids { + if ct := vre.controllers[id]; ct != nil { + // Stop the current controller. + ct.Stop() + blpStats[id] = ct.blpStats + } + } + query, err := plan.applier.GenerateQuery(bv, nil) if err != nil { return nil, err } - // Create a new controller in place of the old one. - // For continuity, the new controller inherits the previous stats. - ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats) + qr, err := vre.executeFetchMaybeCreateTable(dbClient, query, 1) if err != nil { return nil, err } - vre.controllers[plan.id] = ct + for _, id := range ids { + params, err := readRow(dbClient, id) + if err != nil { + return nil, err + } + // Create a new controller in place of the old one. + // For continuity, the new controller inherits the previous stats. + ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats[id]) + if err != nil { + return nil, err + } + vre.controllers[id] = ct + } return qr, nil case deleteQuery: - // Stop and delete the current controller. - if ct := vre.controllers[plan.id]; ct != nil { - ct.Stop() - delete(vre.controllers, plan.id) + ids, bv, err := vre.fetchIDs(dbClient, plan.selector) + if err != nil { + return nil, err + } + if len(ids) == 0 { + return &sqltypes.Result{}, nil + } + // Stop and delete the current controllers. + for _, id := range ids { + if ct := vre.controllers[id]; ct != nil { + ct.Stop() + delete(vre.controllers, id) + } } if err := dbClient.Begin(); err != nil { return nil, err } - qr, err := dbClient.ExecuteFetch(plan.query, 10000) + query, err := plan.applier.GenerateQuery(bv, nil) if err != nil { return nil, err } - if _, err := dbClient.ExecuteFetch(plan.delCopyState, 10000); err != nil { + qr, err := vre.executeFetchMaybeCreateTable(dbClient, query, 1) + if err != nil { + return nil, err + } + delQuery, err := plan.delCopyState.GenerateQuery(bv, nil) + if err != nil { + return nil, err + } + if _, err := dbClient.ExecuteFetch(delQuery, 10000); err != nil { // Legacy vreplication won't create this table. So, ignore table not found error. merr, isSQLErr := err.(*mysql.SQLError) if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable) { @@ -334,6 +373,27 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { panic("unreachable") } +func (vre *Engine) fetchIDs(dbClient binlogplayer.DBClient, selector string) (ids []int, bv map[string]*querypb.BindVariable, err error) { + qr, err := dbClient.ExecuteFetch(selector, 10000) + if err != nil { + return nil, nil, err + } + for _, row := range qr.Rows { + id, err := sqltypes.ToInt64(row[0]) + if err != nil { + return nil, nil, err + } + ids = append(ids, int(id)) + } + bvval, err := sqltypes.BuildBindVariable(ids) + if err != nil { + // Unreachable. + return nil, nil, err + } + bv = map[string]*querypb.BindVariable{"ids": bvval} + return ids, bv, nil +} + // WaitForPos waits for the replication to reach the specified position. func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { mPos, err := mysql.DecodePosition(pos) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 27f2d31d67..d16ca35cc4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -98,7 +98,7 @@ func TestEngineExec(t *testing.T) { defer vre.Close() dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) - dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) + dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{InsertID: 1}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|state|source", @@ -138,7 +138,8 @@ func TestEngineExec(t *testing.T) { savedBlp := ct.blpStats dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) - dbClient.ExpectRequest("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id = 1", testDMLResponse, nil) + dbClient.ExpectRequest("select id from _vt.vreplication where id = 1", testSelectorResponse1, nil) + dbClient.ExpectRequest("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id in (1)", testDMLResponse, nil) dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|state|source", @@ -175,16 +176,25 @@ func TestEngineExec(t *testing.T) { t.Errorf("stats are mismatched: %v, want %v", globalStats.controllers, vre.controllers) } + // Test no update + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select id from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) + _, err = vre.Exec("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id = 2") + if err != nil { + t.Fatal(err) + } + dbClient.Wait() + // Test Delete dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) - delQuery := "delete from _vt.vreplication where id = 1" + dbClient.ExpectRequest("select id from _vt.vreplication where id = 1", testSelectorResponse1, nil) dbClient.ExpectRequest("begin", nil, nil) - dbClient.ExpectRequest(delQuery, testDMLResponse, nil) - dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id = 1", nil, nil) + dbClient.ExpectRequest("delete from _vt.vreplication where id in (1)", testDMLResponse, nil) + dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id in (1)", nil, nil) dbClient.ExpectRequest("commit", nil, nil) - qr, err = vre.Exec(delQuery) + qr, err = vre.Exec("delete from _vt.vreplication where id = 1") if err != nil { t.Fatal(err) } @@ -203,6 +213,30 @@ func TestEngineExec(t *testing.T) { if !reflect.DeepEqual(globalStats.controllers, vre.controllers) { t.Errorf("stats are mismatched: %v, want %v", globalStats.controllers, vre.controllers) } + + // Test Delete of multiple rows + + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select id from _vt.vreplication where id > 1", testSelectorResponse2, nil) + dbClient.ExpectRequest("begin", nil, nil) + dbClient.ExpectRequest("delete from _vt.vreplication where id in (1, 2)", testDMLResponse, nil) + dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id in (1, 2)", nil, nil) + dbClient.ExpectRequest("commit", nil, nil) + + _, err = vre.Exec("delete from _vt.vreplication where id > 1") + if err != nil { + t.Fatal(err) + } + dbClient.Wait() + + // Test no delete + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select id from _vt.vreplication where id = 3", &sqltypes.Result{}, nil) + _, err = vre.Exec("delete from _vt.vreplication where id = 3") + if err != nil { + t.Fatal(err) + } + dbClient.Wait() } func TestEngineBadInsert(t *testing.T) { @@ -224,7 +258,7 @@ func TestEngineBadInsert(t *testing.T) { defer vre.Close() dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) - dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{}, nil) _, err := vre.Exec("insert into _vt.vreplication values(null)") want := "insert failed to generate an id" if err == nil || err.Error() != want { @@ -424,14 +458,14 @@ func TestCreateDBAndTable(t *testing.T) { // Missing table. Statement should get retried after creating everything. dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) - dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, &tableNotFound) + dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{}, &tableNotFound) dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil) - dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) + dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{InsertID: 1}, nil) // The rest of this test is normal with no db errors or extra queries. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index e2c0e80c9b..110e5bc877 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "strings" + "sync" "testing" "time" @@ -1532,13 +1533,17 @@ func startVReplication(t *testing.T, filter *binlogdatapb.Filter, onddl binlogda "/insert into _vt.vreplication", "/update _vt.vreplication set state='Running'", }) + + var once sync.Once return func() { t.Helper() - query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) - if _, err := playerEngine.Exec(query); err != nil { - t.Fatal(err) - } - expectDeleteQueries(t) + once.Do(func() { + query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) + if _, err := playerEngine.Exec(query); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) + }) }, int(qr.InsertID) } diff --git a/go/vt/wrangler/migrater_env_test.go b/go/vt/wrangler/migrater_env_test.go index 5384591ba4..1f9dd8ddf1 100644 --- a/go/vt/wrangler/migrater_env_test.go +++ b/go/vt/wrangler/migrater_env_test.go @@ -35,8 +35,8 @@ import ( "vitess.io/vitess/go/vt/vttablet/tmclient" ) -const vreplQueryks = "select id, source from _vt.vreplication where workflow = 'test' and db_name = 'vt_ks'" -const vreplQueryks2 = "select id, source from _vt.vreplication where workflow = 'test' and db_name = 'vt_ks2'" +const vreplQueryks = "select id, source from _vt.vreplication where workflow='test' and db_name='vt_ks'" +const vreplQueryks2 = "select id, source from _vt.vreplication where workflow='test' and db_name='vt_ks2'" type testMigraterEnv struct { ts *topo.Server diff --git a/go/vt/wrangler/migrater_test.go b/go/vt/wrangler/migrater_test.go index 2895ed6b46..a4983f0df8 100644 --- a/go/vt/wrangler/migrater_test.go +++ b/go/vt/wrangler/migrater_test.go @@ -31,6 +31,11 @@ import ( "vitess.io/vitess/go/vt/topo" ) +var ( + resultid1 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}} + resultid2 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(2)}}} +) + // TestTableMigrate tests table mode migrations. // This has to be kept in sync with TestShardMigrate. func TestTableMigrate(t *testing.T) { @@ -243,8 +248,8 @@ func TestTableMigrate(t *testing.T) { }) // Check for journals. - tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) - tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) + tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil) + tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil) // Wait for position: Reads current state, updates to Stopped, and re-reads. state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -255,9 +260,12 @@ func TestTableMigrate(t *testing.T) { tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) - tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|state", "int64|varchar"), @@ -268,8 +276,8 @@ func TestTableMigrate(t *testing.T) { tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Cancel Migration - cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 1" - cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 2" + cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (1)" + cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (2)" tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) @@ -320,9 +328,12 @@ func TestTableMigrate(t *testing.T) { tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Delete the target replications. - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil) journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) if err != nil { @@ -482,8 +493,8 @@ func TestShardMigrate(t *testing.T) { checkIsMasterServing(t, tme.ts, "ks:80-", false) // Check for journals. - tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", &sqltypes.Result{}, nil) - tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", &sqltypes.Result{}, nil) + tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", &sqltypes.Result{}, nil) + tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", &sqltypes.Result{}, nil) // Wait for position: Reads current state, updates to Stopped, and re-reads. state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -494,9 +505,12 @@ func TestShardMigrate(t *testing.T) { tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) - tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|state", "int64|varchar"), @@ -507,8 +521,8 @@ func TestShardMigrate(t *testing.T) { tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Cancel Migration - cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 1" - cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 2" + cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (1)" + cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (2)" tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) @@ -545,9 +559,12 @@ func TestShardMigrate(t *testing.T) { tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Delete the target replications. - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil) journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) if err != nil { @@ -587,8 +604,8 @@ func TestMigrateFailJournal(t *testing.T) { } // Check for journals. - tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) - tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) + tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil) + tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil) // Wait for position: Reads current state, updates to Stopped, and re-reads. state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -599,9 +616,12 @@ func TestMigrateFailJournal(t *testing.T) { tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) - tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|state", "int64|varchar"), @@ -612,8 +632,8 @@ func TestMigrateFailJournal(t *testing.T) { tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Cancel Migration: these must not get called. - cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 1" - cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 2" + cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (1)" + cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (2)" tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) @@ -655,8 +675,8 @@ func TestTableMigrateJournalExists(t *testing.T) { } // Show one journal as created. - tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil) - tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) + tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil) + tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil) // Create the missing journal. journal2 := "insert into _vt.resharding_journal.*9113431017721636330.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*80.*participants.*40.*40" @@ -676,9 +696,15 @@ func TestTableMigrateJournalExists(t *testing.T) { tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Delete the target replications. - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) if err != nil { @@ -717,8 +743,8 @@ func TestShardMigrateJournalExists(t *testing.T) { } // Show one journal as created. - tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil) - tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", &sqltypes.Result{}, nil) + tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil) + tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", &sqltypes.Result{}, nil) // Create the missing journal. journal2 := "insert into _vt.resharding_journal.*6432976123657117098.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" @@ -738,9 +764,15 @@ func TestShardMigrateJournalExists(t *testing.T) { tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) // Delete the target replications. - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) - tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil) + tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) if err != nil { diff --git a/go/vt/wrangler/testlib/migrate_served_types_test.go b/go/vt/wrangler/testlib/migrate_served_types_test.go index 3db2caa496..5cd05514b4 100644 --- a/go/vt/wrangler/testlib/migrate_served_types_test.go +++ b/go/vt/wrangler/testlib/migrate_served_types_test.go @@ -585,8 +585,9 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { func expectDeleteVRepl(dbClient *binlogplayer.MockDBClient) { dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select id from _vt.vreplication where id = 1", &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}}, nil) dbClient.ExpectRequest("begin", nil, nil) - dbClient.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil) - dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id = 1", nil, nil) + dbClient.ExpectRequest("delete from _vt.vreplication where id in (1)", &sqltypes.Result{RowsAffected: 1}, nil) + dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id in (1)", nil, nil) dbClient.ExpectRequest("commit", nil, nil) }