Merge pull request #5179 from planetscale/ss-vrepl-better-sql

vreplication: more VReplicationExec constructs
This commit is contained in:
Sugu Sougoumarane 2019-09-15 18:18:06 -07:00 коммит произвёл GitHub
Родитель 2c170a9688 428c46866f
Коммит cfe43b3439
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 389 добавлений и 208 удалений

Просмотреть файл

@ -18,18 +18,24 @@ package vreplication
import ( import (
"fmt" "fmt"
"strconv"
"vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/sqlparser"
) )
// controllerPlan is the plan for vreplication control statements. // controllerPlan is the plan for vreplication control statements.
type controllerPlan struct { type controllerPlan struct {
opcode int
query string query string
// delCopySate is set for deletes. opcode int
delCopyState string
id int // numInserts is set for insertQuery.
numInserts int
// selector and applier are set for updateQuery and deleteQuery.
selector string
applier *sqlparser.ParsedQuery
// delCopyState is set of deletes.
delCopyState *sqlparser.ParsedQuery
} }
const ( const (
@ -46,18 +52,24 @@ func buildControllerPlan(query string) (*controllerPlan, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
var plan *controllerPlan
switch stmt := stmt.(type) { switch stmt := stmt.(type) {
case *sqlparser.Insert: case *sqlparser.Insert:
return buildInsertPlan(stmt) plan, err = buildInsertPlan(stmt)
case *sqlparser.Update: case *sqlparser.Update:
return buildUpdatePlan(stmt) plan, err = buildUpdatePlan(stmt)
case *sqlparser.Delete: case *sqlparser.Delete:
return buildDeletePlan(stmt) plan, err = buildDeletePlan(stmt)
case *sqlparser.Select: case *sqlparser.Select:
return buildSelectPlan(stmt) plan, err = buildSelectPlan(stmt)
default: default:
return nil, fmt.Errorf("unsupported construct: %s", sqlparser.String(stmt)) return nil, fmt.Errorf("unsupported construct: %s", sqlparser.String(stmt))
} }
if err != nil {
return nil, err
}
plan.query = query
return plan, nil
} }
func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) { func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) {
@ -65,7 +77,6 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) {
case reshardingJournalTableName: case reshardingJournalTableName:
return &controllerPlan{ return &controllerPlan{
opcode: reshardingJournalQuery, opcode: reshardingJournalQuery,
query: sqlparser.String(ins),
}, nil }, nil
case vreplicationTableName: case vreplicationTableName:
// no-op // no-op
@ -88,15 +99,8 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) {
if !ok { if !ok {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins))
} }
if len(rows) != 1 {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins))
}
row := rows[0]
idPos := 0 idPos := 0
if len(ins.Columns) != 0 { if len(ins.Columns) != 0 {
if len(ins.Columns) != len(row) {
return nil, fmt.Errorf("malformed statement: %v", sqlparser.String(ins))
}
idPos = -1 idPos = -1
for i, col := range ins.Columns { for i, col := range ins.Columns {
if col.EqualString("id") { if col.EqualString("id") {
@ -106,13 +110,18 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) {
} }
} }
if idPos >= 0 { if idPos >= 0 {
if _, ok := row[idPos].(*sqlparser.NullVal); !ok { for _, row := range rows {
return nil, fmt.Errorf("id should not have a value: %v", sqlparser.String(ins)) if idPos >= len(row) {
return nil, fmt.Errorf("malformed statement: %v", sqlparser.String(ins))
}
if _, ok := row[idPos].(*sqlparser.NullVal); !ok {
return nil, fmt.Errorf("id should not have a value: %v", sqlparser.String(ins))
}
} }
} }
return &controllerPlan{ return &controllerPlan{
opcode: insertQuery, opcode: insertQuery,
query: sqlparser.String(ins), numInserts: len(rows),
}, nil }, nil
} }
@ -121,7 +130,6 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) {
case reshardingJournalTableName: case reshardingJournalTableName:
return &controllerPlan{ return &controllerPlan{
opcode: reshardingJournalQuery, opcode: reshardingJournalQuery,
query: sqlparser.String(upd),
}, nil }, nil
case vreplicationTableName: case vreplicationTableName:
// no-op // no-op
@ -137,15 +145,24 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) {
} }
} }
id, err := extractID(upd.Where) buf1 := sqlparser.NewTrackedBuffer(nil)
if err != nil { buf1.Myprintf("select id from %s%v", vreplicationTableName, upd.Where)
return nil, err upd.Where = &sqlparser.Where{
Type: sqlparser.WhereStr,
Expr: &sqlparser.ComparisonExpr{
Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("id")},
Operator: sqlparser.InStr,
Right: sqlparser.ListArg("::ids"),
},
} }
buf2 := sqlparser.NewTrackedBuffer(nil)
buf2.Myprintf("%v", upd)
return &controllerPlan{ return &controllerPlan{
opcode: updateQuery, opcode: updateQuery,
query: sqlparser.String(upd), selector: buf1.String(),
id: id, applier: buf2.ParsedQuery(),
}, nil }, nil
} }
@ -154,7 +171,6 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) {
case reshardingJournalTableName: case reshardingJournalTableName:
return &controllerPlan{ return &controllerPlan{
opcode: reshardingJournalQuery, opcode: reshardingJournalQuery,
query: sqlparser.String(del),
}, nil }, nil
case vreplicationTableName: case vreplicationTableName:
// no-op // no-op
@ -171,49 +187,46 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del))
} }
id, err := extractID(del.Where) buf1 := sqlparser.NewTrackedBuffer(nil)
if err != nil { buf1.Myprintf("select id from %s%v", vreplicationTableName, del.Where)
return nil, err del.Where = &sqlparser.Where{
Type: sqlparser.WhereStr,
Expr: &sqlparser.ComparisonExpr{
Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("id")},
Operator: sqlparser.InStr,
Right: sqlparser.ListArg("::ids"),
},
} }
buf2 := sqlparser.NewTrackedBuffer(nil)
buf2.Myprintf("%v", del)
copyStateWhere := &sqlparser.Where{
Type: sqlparser.WhereStr,
Expr: &sqlparser.ComparisonExpr{
Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("vrepl_id")},
Operator: sqlparser.InStr,
Right: sqlparser.ListArg("::ids"),
},
}
buf3 := sqlparser.NewTrackedBuffer(nil)
buf3.Myprintf("delete from %s%v", copyStateTableName, copyStateWhere)
return &controllerPlan{ return &controllerPlan{
opcode: deleteQuery, opcode: deleteQuery,
query: sqlparser.String(del), selector: buf1.String(),
delCopyState: fmt.Sprintf("delete from %s where vrepl_id = %d", copySateTableName, id), applier: buf2.ParsedQuery(),
id: id, delCopyState: buf3.ParsedQuery(),
}, nil }, nil
} }
func buildSelectPlan(sel *sqlparser.Select) (*controllerPlan, error) { func buildSelectPlan(sel *sqlparser.Select) (*controllerPlan, error) {
switch sqlparser.String(sel.From) { switch sqlparser.String(sel.From) {
case vreplicationTableName, reshardingJournalTableName, copySateTableName: case vreplicationTableName, reshardingJournalTableName, copyStateTableName:
return &controllerPlan{ return &controllerPlan{
opcode: selectQuery, opcode: selectQuery,
query: sqlparser.String(sel),
}, nil }, nil
default: default:
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(sel.From)) return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(sel.From))
} }
} }
func extractID(where *sqlparser.Where) (int, error) {
if where == nil {
return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where))
}
comp, ok := where.Expr.(*sqlparser.ComparisonExpr)
if !ok {
return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where))
}
if sqlparser.String(comp.Left) != "id" {
return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where))
}
if comp.Operator != sqlparser.EqualStr {
return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where))
}
id, err := strconv.Atoi(sqlparser.String(comp.Right))
if err != nil {
return 0, fmt.Errorf("invalid where clause:%v", sqlparser.String(where))
}
return id, nil
}

Просмотреть файл

@ -21,35 +21,54 @@ import (
"testing" "testing"
) )
type testControllerPlan struct {
query string
opcode int
numInserts int
selector string
applier string
delCopyState string
}
func TestControllerPlan(t *testing.T) { func TestControllerPlan(t *testing.T) {
tcases := []struct { tcases := []struct {
in string in string
plan *controllerPlan plan *testControllerPlan
err string err string
}{{ }{{
// Insert // Insert
in: "insert into _vt.vreplication values(null)", in: "insert into _vt.vreplication values(null)",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: insertQuery, query: "insert into _vt.vreplication values(null)",
query: "insert into _vt.vreplication values (null)", opcode: insertQuery,
numInserts: 1,
}, },
}, { }, {
in: "insert into _vt.vreplication(id) values(null)", in: "insert into _vt.vreplication(id) values(null)",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: insertQuery, query: "insert into _vt.vreplication(id) values(null)",
query: "insert into _vt.vreplication(id) values (null)", opcode: insertQuery,
numInserts: 1,
}, },
}, { }, {
in: "insert into _vt.vreplication(workflow, id) values('', null)", in: "insert into _vt.vreplication(workflow, id) values('', null)",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: insertQuery, query: "insert into _vt.vreplication(workflow, id) values('', null)",
query: "insert into _vt.vreplication(workflow, id) values ('', null)", opcode: insertQuery,
numInserts: 1,
},
}, {
in: "insert into _vt.vreplication values(null), (null)",
plan: &testControllerPlan{
query: "insert into _vt.vreplication values(null), (null)",
opcode: insertQuery,
numInserts: 2,
}, },
}, { }, {
in: "insert into _vt.resharding_journal values (1)", in: "insert into _vt.resharding_journal values (1)",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: reshardingJournalQuery,
query: "insert into _vt.resharding_journal values (1)", query: "insert into _vt.resharding_journal values (1)",
opcode: reshardingJournalQuery,
}, },
}, { }, {
in: "replace into _vt.vreplication values(null)", in: "replace into _vt.vreplication values(null)",
@ -70,11 +89,8 @@ func TestControllerPlan(t *testing.T) {
in: "insert into _vt.vreplication select * from a", in: "insert into _vt.vreplication select * from a",
err: "unsupported construct: insert into _vt.vreplication select * from a", err: "unsupported construct: insert into _vt.vreplication select * from a",
}, { }, {
in: "insert into _vt.vreplication values(null), (null)", in: "insert into _vt.vreplication(a, b, id) values(null)",
err: "unsupported construct: insert into _vt.vreplication values (null), (null)", err: "malformed statement: insert into _vt.vreplication(a, b, id) values (null)",
}, {
in: "insert into _vt.vreplication(a, b, c) values(null)",
err: "malformed statement: insert into _vt.vreplication(a, b, c) values (null)",
}, { }, {
in: "insert into _vt.vreplication(workflow, id) values('aa', 1)", in: "insert into _vt.vreplication(workflow, id) values('aa', 1)",
err: "id should not have a value: insert into _vt.vreplication(workflow, id) values ('aa', 1)", err: "id should not have a value: insert into _vt.vreplication(workflow, id) values ('aa', 1)",
@ -85,16 +101,33 @@ func TestControllerPlan(t *testing.T) {
// Update // Update
}, { }, {
in: "update _vt.vreplication set state='Running' where id = 1", in: "update _vt.vreplication set state='Running' where id = 1",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: updateQuery, query: "update _vt.vreplication set state='Running' where id = 1",
query: "update _vt.vreplication set state = 'Running' where id = 1", opcode: updateQuery,
id: 1, selector: "select id from _vt.vreplication where id = 1",
applier: "update _vt.vreplication set state = 'Running' where id in ::ids",
},
}, {
in: "update _vt.vreplication set state='Running'",
plan: &testControllerPlan{
query: "update _vt.vreplication set state='Running'",
opcode: updateQuery,
selector: "select id from _vt.vreplication",
applier: "update _vt.vreplication set state = 'Running' where id in ::ids",
},
}, {
in: "update _vt.vreplication set state='Running' where a = 1",
plan: &testControllerPlan{
query: "update _vt.vreplication set state='Running' where a = 1",
opcode: updateQuery,
selector: "select id from _vt.vreplication where a = 1",
applier: "update _vt.vreplication set state = 'Running' where id in ::ids",
}, },
}, { }, {
in: "update _vt.resharding_journal set col = 1", in: "update _vt.resharding_journal set col = 1",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: reshardingJournalQuery,
query: "update _vt.resharding_journal set col = 1", query: "update _vt.resharding_journal set col = 1",
opcode: reshardingJournalQuery,
}, },
}, { }, {
in: "update a set state='Running' where id = 1", in: "update a set state='Running' where id = 1",
@ -108,36 +141,40 @@ func TestControllerPlan(t *testing.T) {
}, { }, {
in: "update _vt.vreplication set state='Running', id = 2 where id = 1", in: "update _vt.vreplication set state='Running', id = 2 where id = 1",
err: "id cannot be changed: id = 2", err: "id cannot be changed: id = 2",
}, {
in: "update _vt.vreplication set state='Running'",
err: "invalid where clause:",
}, {
in: "update _vt.vreplication set state='Running' where a = 1 and id = 2",
err: "invalid where clause: where a = 1 and id = 2",
}, {
in: "update _vt.vreplication set state='Running' where a = 1",
err: "invalid where clause: where a = 1",
}, {
in: "update _vt.vreplication set state='Running' where id > 1",
err: "invalid where clause: where id > 1",
}, {
in: "update _vt.vreplication set state='Running' where id = 1.1",
err: "invalid where clause: where id = 1.1",
// Delete // Delete
}, { }, {
in: "delete from _vt.vreplication where id = 1", in: "delete from _vt.vreplication where id = 1",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: deleteQuery,
query: "delete from _vt.vreplication where id = 1", query: "delete from _vt.vreplication where id = 1",
delCopyState: "delete from _vt.copy_state where vrepl_id = 1", opcode: deleteQuery,
id: 1, selector: "select id from _vt.vreplication where id = 1",
applier: "delete from _vt.vreplication where id in ::ids",
delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids",
},
}, {
in: "delete from _vt.vreplication",
plan: &testControllerPlan{
query: "delete from _vt.vreplication",
opcode: deleteQuery,
selector: "select id from _vt.vreplication",
applier: "delete from _vt.vreplication where id in ::ids",
delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids",
},
}, {
in: "delete from _vt.vreplication where a = 1",
plan: &testControllerPlan{
query: "delete from _vt.vreplication where a = 1",
opcode: deleteQuery,
selector: "select id from _vt.vreplication where a = 1",
applier: "delete from _vt.vreplication where id in ::ids",
delCopyState: "delete from _vt.copy_state where vrepl_id in ::ids",
}, },
}, { }, {
in: "delete from _vt.resharding_journal where id = 1", in: "delete from _vt.resharding_journal where id = 1",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: reshardingJournalQuery,
query: "delete from _vt.resharding_journal where id = 1", query: "delete from _vt.resharding_journal where id = 1",
opcode: reshardingJournalQuery,
}, },
}, { }, {
in: "delete from a where id = 1", in: "delete from a where id = 1",
@ -154,38 +191,23 @@ func TestControllerPlan(t *testing.T) {
}, { }, {
in: "delete from _vt.vreplication partition (a) where id = 1 limit 1", in: "delete from _vt.vreplication partition (a) where id = 1 limit 1",
err: "unsupported construct: delete from _vt.vreplication partition (a) where id = 1 limit 1", err: "unsupported construct: delete from _vt.vreplication partition (a) where id = 1 limit 1",
}, {
in: "delete from _vt.vreplication",
err: "invalid where clause:",
}, {
in: "delete from _vt.vreplication where a = 1 and id = 2",
err: "invalid where clause: where a = 1 and id = 2",
}, {
in: "delete from _vt.vreplication where a = 1",
err: "invalid where clause: where a = 1",
}, {
in: "delete from _vt.vreplication where id > 1",
err: "invalid where clause: where id > 1",
}, {
in: "delete from _vt.vreplication where id = 1.1",
err: "invalid where clause: where id = 1.1",
// Select // Select
}, { }, {
in: "select * from _vt.vreplication", in: "select * from _vt.vreplication",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: selectQuery, opcode: selectQuery,
query: "select * from _vt.vreplication", query: "select * from _vt.vreplication",
}, },
}, { }, {
in: "select * from _vt.resharding_journal", in: "select * from _vt.resharding_journal",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: selectQuery, opcode: selectQuery,
query: "select * from _vt.resharding_journal", query: "select * from _vt.resharding_journal",
}, },
}, { }, {
in: "select * from _vt.copy_state", in: "select * from _vt.copy_state",
plan: &controllerPlan{ plan: &testControllerPlan{
opcode: selectQuery, opcode: selectQuery,
query: "select * from _vt.copy_state", query: "select * from _vt.copy_state",
}, },
@ -213,8 +235,20 @@ func TestControllerPlan(t *testing.T) {
t.Errorf("getPlan(%v) error:\n%v, want\n%v", tcase.in, err, tcase.err) t.Errorf("getPlan(%v) error:\n%v, want\n%v", tcase.in, err, tcase.err)
continue continue
} }
if !reflect.DeepEqual(pl, tcase.plan) { gotPlan := &testControllerPlan{
t.Errorf("getPlan(%v):\n%+v, want\n%+v", tcase.in, pl, tcase.plan) query: pl.query,
opcode: pl.opcode,
numInserts: pl.numInserts,
selector: pl.selector,
}
if pl.applier != nil {
gotPlan.applier = pl.applier.Query
}
if pl.delCopyState != nil {
gotPlan.delCopyState = pl.delCopyState.Query
}
if !reflect.DeepEqual(gotPlan, tcase.plan) {
t.Errorf("getPlan(%v):\n%+v, want\n%+v", tcase.in, gotPlan, tcase.plan)
} }
} }
} }

Просмотреть файл

@ -48,8 +48,10 @@ var (
}, },
}, },
} }
testDMLResponse = &sqltypes.Result{RowsAffected: 1} testSelectorResponse1 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}}
testPos = "MariaDB/0-1-1083" testSelectorResponse2 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}}
testDMLResponse = &sqltypes.Result{RowsAffected: 1}
testPos = "MariaDB/0-1-1083"
) )
func TestControllerKeyRange(t *testing.T) { func TestControllerKeyRange(t *testing.T) {

Просмотреть файл

@ -29,13 +29,14 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo"
) )
const ( const (
reshardingJournalTableName = "_vt.resharding_journal" reshardingJournalTableName = "_vt.resharding_journal"
vreplicationTableName = "_vt.vreplication" vreplicationTableName = "_vt.vreplication"
copySateTableName = "_vt.copy_state" copyStateTableName = "_vt.copy_state"
createReshardingJournalTable = `create table if not exists _vt.resharding_journal( createReshardingJournalTable = `create table if not exists _vt.resharding_journal(
id bigint, id bigint,
@ -269,54 +270,92 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
if qr.InsertID == 0 { if qr.InsertID == 0 {
return nil, fmt.Errorf("insert failed to generate an id") return nil, fmt.Errorf("insert failed to generate an id")
} }
params, err := readRow(dbClient, int(qr.InsertID)) for id := int(qr.InsertID); id < int(qr.InsertID)+plan.numInserts; id++ {
if err != nil { if ct := vre.controllers[id]; ct != nil {
return nil, err // Unreachable. Just a failsafe.
ct.Stop()
delete(vre.controllers, id)
}
params, err := readRow(dbClient, id)
if err != nil {
return nil, err
}
ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil)
if err != nil {
return nil, err
}
vre.controllers[id] = ct
} }
// Create a controller for the newly created row.
ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil)
if err != nil {
return nil, err
}
vre.controllers[int(qr.InsertID)] = ct
return qr, nil return qr, nil
case updateQuery: case updateQuery:
var blpStats *binlogplayer.Stats ids, bv, err := vre.fetchIDs(dbClient, plan.selector)
if ct := vre.controllers[plan.id]; ct != nil {
// Stop the current controller.
ct.Stop()
blpStats = ct.blpStats
}
qr, err := vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1)
if err != nil { if err != nil {
return nil, err return nil, err
} }
params, err := readRow(dbClient, plan.id) if len(ids) == 0 {
return &sqltypes.Result{}, nil
}
blpStats := make(map[int]*binlogplayer.Stats)
for _, id := range ids {
if ct := vre.controllers[id]; ct != nil {
// Stop the current controller.
ct.Stop()
blpStats[id] = ct.blpStats
}
}
query, err := plan.applier.GenerateQuery(bv, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Create a new controller in place of the old one. qr, err := vre.executeFetchMaybeCreateTable(dbClient, query, 1)
// For continuity, the new controller inherits the previous stats.
ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats)
if err != nil { if err != nil {
return nil, err return nil, err
} }
vre.controllers[plan.id] = ct for _, id := range ids {
params, err := readRow(dbClient, id)
if err != nil {
return nil, err
}
// Create a new controller in place of the old one.
// For continuity, the new controller inherits the previous stats.
ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats[id])
if err != nil {
return nil, err
}
vre.controllers[id] = ct
}
return qr, nil return qr, nil
case deleteQuery: case deleteQuery:
// Stop and delete the current controller. ids, bv, err := vre.fetchIDs(dbClient, plan.selector)
if ct := vre.controllers[plan.id]; ct != nil { if err != nil {
ct.Stop() return nil, err
delete(vre.controllers, plan.id) }
if len(ids) == 0 {
return &sqltypes.Result{}, nil
}
// Stop and delete the current controllers.
for _, id := range ids {
if ct := vre.controllers[id]; ct != nil {
ct.Stop()
delete(vre.controllers, id)
}
} }
if err := dbClient.Begin(); err != nil { if err := dbClient.Begin(); err != nil {
return nil, err return nil, err
} }
qr, err := dbClient.ExecuteFetch(plan.query, 10000) query, err := plan.applier.GenerateQuery(bv, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if _, err := dbClient.ExecuteFetch(plan.delCopyState, 10000); err != nil { qr, err := vre.executeFetchMaybeCreateTable(dbClient, query, 1)
if err != nil {
return nil, err
}
delQuery, err := plan.delCopyState.GenerateQuery(bv, nil)
if err != nil {
return nil, err
}
if _, err := dbClient.ExecuteFetch(delQuery, 10000); err != nil {
// Legacy vreplication won't create this table. So, ignore table not found error. // Legacy vreplication won't create this table. So, ignore table not found error.
merr, isSQLErr := err.(*mysql.SQLError) merr, isSQLErr := err.(*mysql.SQLError)
if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable) { if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable) {
@ -334,6 +373,27 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
panic("unreachable") panic("unreachable")
} }
func (vre *Engine) fetchIDs(dbClient binlogplayer.DBClient, selector string) (ids []int, bv map[string]*querypb.BindVariable, err error) {
qr, err := dbClient.ExecuteFetch(selector, 10000)
if err != nil {
return nil, nil, err
}
for _, row := range qr.Rows {
id, err := sqltypes.ToInt64(row[0])
if err != nil {
return nil, nil, err
}
ids = append(ids, int(id))
}
bvval, err := sqltypes.BuildBindVariable(ids)
if err != nil {
// Unreachable.
return nil, nil, err
}
bv = map[string]*querypb.BindVariable{"ids": bvval}
return ids, bv, nil
}
// WaitForPos waits for the replication to reach the specified position. // WaitForPos waits for the replication to reach the specified position.
func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error {
mPos, err := mysql.DecodePosition(pos) mPos, err := mysql.DecodePosition(pos)

Просмотреть файл

@ -98,7 +98,7 @@ func TestEngineExec(t *testing.T) {
defer vre.Close() defer vre.Close()
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{InsertID: 1}, nil)
dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult(
sqltypes.MakeTestFields( sqltypes.MakeTestFields(
"id|state|source", "id|state|source",
@ -138,7 +138,8 @@ func TestEngineExec(t *testing.T) {
savedBlp := ct.blpStats savedBlp := ct.blpStats
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id = 1", testDMLResponse, nil) dbClient.ExpectRequest("select id from _vt.vreplication where id = 1", testSelectorResponse1, nil)
dbClient.ExpectRequest("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id in (1)", testDMLResponse, nil)
dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult(
sqltypes.MakeTestFields( sqltypes.MakeTestFields(
"id|state|source", "id|state|source",
@ -175,16 +176,25 @@ func TestEngineExec(t *testing.T) {
t.Errorf("stats are mismatched: %v, want %v", globalStats.controllers, vre.controllers) t.Errorf("stats are mismatched: %v, want %v", globalStats.controllers, vre.controllers)
} }
// Test no update
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("select id from _vt.vreplication where id = 2", &sqltypes.Result{}, nil)
_, err = vre.Exec("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id = 2")
if err != nil {
t.Fatal(err)
}
dbClient.Wait()
// Test Delete // Test Delete
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
delQuery := "delete from _vt.vreplication where id = 1" dbClient.ExpectRequest("select id from _vt.vreplication where id = 1", testSelectorResponse1, nil)
dbClient.ExpectRequest("begin", nil, nil) dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest(delQuery, testDMLResponse, nil) dbClient.ExpectRequest("delete from _vt.vreplication where id in (1)", testDMLResponse, nil)
dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id = 1", nil, nil) dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id in (1)", nil, nil)
dbClient.ExpectRequest("commit", nil, nil) dbClient.ExpectRequest("commit", nil, nil)
qr, err = vre.Exec(delQuery) qr, err = vre.Exec("delete from _vt.vreplication where id = 1")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -203,6 +213,30 @@ func TestEngineExec(t *testing.T) {
if !reflect.DeepEqual(globalStats.controllers, vre.controllers) { if !reflect.DeepEqual(globalStats.controllers, vre.controllers) {
t.Errorf("stats are mismatched: %v, want %v", globalStats.controllers, vre.controllers) t.Errorf("stats are mismatched: %v, want %v", globalStats.controllers, vre.controllers)
} }
// Test Delete of multiple rows
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("select id from _vt.vreplication where id > 1", testSelectorResponse2, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("delete from _vt.vreplication where id in (1, 2)", testDMLResponse, nil)
dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id in (1, 2)", nil, nil)
dbClient.ExpectRequest("commit", nil, nil)
_, err = vre.Exec("delete from _vt.vreplication where id > 1")
if err != nil {
t.Fatal(err)
}
dbClient.Wait()
// Test no delete
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("select id from _vt.vreplication where id = 3", &sqltypes.Result{}, nil)
_, err = vre.Exec("delete from _vt.vreplication where id = 3")
if err != nil {
t.Fatal(err)
}
dbClient.Wait()
} }
func TestEngineBadInsert(t *testing.T) { func TestEngineBadInsert(t *testing.T) {
@ -224,7 +258,7 @@ func TestEngineBadInsert(t *testing.T) {
defer vre.Close() defer vre.Close()
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, nil) dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{}, nil)
_, err := vre.Exec("insert into _vt.vreplication values(null)") _, err := vre.Exec("insert into _vt.vreplication values(null)")
want := "insert failed to generate an id" want := "insert failed to generate an id"
if err == nil || err.Error() != want { if err == nil || err.Error() != want {
@ -424,14 +458,14 @@ func TestCreateDBAndTable(t *testing.T) {
// Missing table. Statement should get retried after creating everything. // Missing table. Statement should get retried after creating everything.
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, &tableNotFound) dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{}, &tableNotFound)
dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil) dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) dbClient.ExpectRequest("insert into _vt.vreplication values(null)", &sqltypes.Result{InsertID: 1}, nil)
// The rest of this test is normal with no db errors or extra queries. // The rest of this test is normal with no db errors or extra queries.

Просмотреть файл

@ -20,6 +20,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -1532,13 +1533,17 @@ func startVReplication(t *testing.T, filter *binlogdatapb.Filter, onddl binlogda
"/insert into _vt.vreplication", "/insert into _vt.vreplication",
"/update _vt.vreplication set state='Running'", "/update _vt.vreplication set state='Running'",
}) })
var once sync.Once
return func() { return func() {
t.Helper() t.Helper()
query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) once.Do(func() {
if _, err := playerEngine.Exec(query); err != nil { query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID)
t.Fatal(err) if _, err := playerEngine.Exec(query); err != nil {
} t.Fatal(err)
expectDeleteQueries(t) }
expectDeleteQueries(t)
})
}, int(qr.InsertID) }, int(qr.InsertID)
} }

Просмотреть файл

@ -35,8 +35,8 @@ import (
"vitess.io/vitess/go/vt/vttablet/tmclient" "vitess.io/vitess/go/vt/vttablet/tmclient"
) )
const vreplQueryks = "select id, source from _vt.vreplication where workflow = 'test' and db_name = 'vt_ks'" const vreplQueryks = "select id, source from _vt.vreplication where workflow='test' and db_name='vt_ks'"
const vreplQueryks2 = "select id, source from _vt.vreplication where workflow = 'test' and db_name = 'vt_ks2'" const vreplQueryks2 = "select id, source from _vt.vreplication where workflow='test' and db_name='vt_ks2'"
type testMigraterEnv struct { type testMigraterEnv struct {
ts *topo.Server ts *topo.Server

Просмотреть файл

@ -31,6 +31,11 @@ import (
"vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo"
) )
var (
resultid1 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}}
resultid2 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(2)}}}
)
// TestTableMigrate tests table mode migrations. // TestTableMigrate tests table mode migrations.
// This has to be kept in sync with TestShardMigrate. // This has to be kept in sync with TestShardMigrate.
func TestTableMigrate(t *testing.T) { func TestTableMigrate(t *testing.T) {
@ -243,8 +248,8 @@ func TestTableMigrate(t *testing.T) {
}) })
// Check for journals. // Check for journals.
tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil)
tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil)
// Wait for position: Reads current state, updates to Stopped, and re-reads. // Wait for position: Reads current state, updates to Stopped, and re-reads.
state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( state := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
@ -255,9 +260,12 @@ func TestTableMigrate(t *testing.T) {
tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 2", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil)
stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields( stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|state", "id|state",
"int64|varchar"), "int64|varchar"),
@ -268,8 +276,8 @@ func TestTableMigrate(t *testing.T) {
tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Cancel Migration // Cancel Migration
cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 1" cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (1)"
cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 2" cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (2)"
tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil)
@ -320,9 +328,12 @@ func TestTableMigrate(t *testing.T) {
tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Delete the target replications. // Delete the target replications.
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil)
journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second)
if err != nil { if err != nil {
@ -482,8 +493,8 @@ func TestShardMigrate(t *testing.T) {
checkIsMasterServing(t, tme.ts, "ks:80-", false) checkIsMasterServing(t, tme.ts, "ks:80-", false)
// Check for journals. // Check for journals.
tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", &sqltypes.Result{}, nil) tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", &sqltypes.Result{}, nil)
tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", &sqltypes.Result{}, nil) tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", &sqltypes.Result{}, nil)
// Wait for position: Reads current state, updates to Stopped, and re-reads. // Wait for position: Reads current state, updates to Stopped, and re-reads.
state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( state := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
@ -494,9 +505,12 @@ func TestShardMigrate(t *testing.T) {
tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 2", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil)
stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields( stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|state", "id|state",
"int64|varchar"), "int64|varchar"),
@ -507,8 +521,8 @@ func TestShardMigrate(t *testing.T) {
tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Cancel Migration // Cancel Migration
cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 1" cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (1)"
cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 2" cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (2)"
tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil)
@ -545,9 +559,12 @@ func TestShardMigrate(t *testing.T) {
tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Delete the target replications. // Delete the target replications.
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil)
journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second)
if err != nil { if err != nil {
@ -587,8 +604,8 @@ func TestMigrateFailJournal(t *testing.T) {
} }
// Check for journals. // Check for journals.
tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil)
tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil)
// Wait for position: Reads current state, updates to Stopped, and re-reads. // Wait for position: Reads current state, updates to Stopped, and re-reads.
state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( state := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
@ -599,9 +616,12 @@ func TestMigrateFailJournal(t *testing.T) {
tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) tme.dbDest2Client.addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) tme.dbDest1Client.addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id = 2", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil)
tme.dbDest1Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil)
stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields( stopped := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|state", "id|state",
"int64|varchar"), "int64|varchar"),
@ -612,8 +632,8 @@ func TestMigrateFailJournal(t *testing.T) {
tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbDest1Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Cancel Migration: these must not get called. // Cancel Migration: these must not get called.
cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 1" cancel1 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (1)"
cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id = 2" cancel2 := "update _vt.vreplication set state = 'Running', stop_pos = null where id in (2)"
tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel1, &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbDest2Client.addQuery(cancel1, &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery(cancel2, &sqltypes.Result{}, nil)
@ -655,8 +675,8 @@ func TestTableMigrateJournalExists(t *testing.T) {
} }
// Show one journal as created. // Show one journal as created.
tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil) tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil)
tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 9113431017721636330", &sqltypes.Result{}, nil) tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=9113431017721636330", &sqltypes.Result{}, nil)
// Create the missing journal. // Create the missing journal.
journal2 := "insert into _vt.resharding_journal.*9113431017721636330.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*80.*participants.*40.*40" journal2 := "insert into _vt.resharding_journal.*9113431017721636330.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*80.*participants.*40.*40"
@ -676,9 +696,15 @@ func TestTableMigrateJournalExists(t *testing.T) {
tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Delete the target replications. // Delete the target replications.
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
_, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second)
if err != nil { if err != nil {
@ -717,8 +743,8 @@ func TestShardMigrateJournalExists(t *testing.T) {
} }
// Show one journal as created. // Show one journal as created.
tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil) tme.dbSource1Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1"), nil)
tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id = 6432976123657117098", &sqltypes.Result{}, nil) tme.dbSource2Client.addQuery("select 1 from _vt.resharding_journal where id=6432976123657117098", &sqltypes.Result{}, nil)
// Create the missing journal. // Create the missing journal.
journal2 := "insert into _vt.resharding_journal.*6432976123657117098.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" journal2 := "insert into _vt.resharding_journal.*6432976123657117098.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40"
@ -738,9 +764,15 @@ func TestShardMigrateJournalExists(t *testing.T) {
tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil) tme.dbSource2Client.addQuery("select * from _vt.vreplication where id = 2", stopped, nil)
// Delete the target replications. // Delete the target replications.
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("delete from _vt.vreplication where id = 1", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id = 2", &sqltypes.Result{}, nil) tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("select id from _vt.vreplication where id = 2", resultid2, nil)
tme.dbDest1Client.addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil)
tme.dbDest1Client.addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("select id from _vt.vreplication where id = 1", resultid1, nil)
tme.dbDest2Client.addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil)
tme.dbDest2Client.addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil)
_, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second)
if err != nil { if err != nil {

Просмотреть файл

@ -585,8 +585,9 @@ func TestMultiShardMigrateServedTypes(t *testing.T) {
func expectDeleteVRepl(dbClient *binlogplayer.MockDBClient) { func expectDeleteVRepl(dbClient *binlogplayer.MockDBClient) {
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("select id from _vt.vreplication where id = 1", &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}}, nil)
dbClient.ExpectRequest("begin", nil, nil) dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil) dbClient.ExpectRequest("delete from _vt.vreplication where id in (1)", &sqltypes.Result{RowsAffected: 1}, nil)
dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id = 1", nil, nil) dbClient.ExpectRequest("delete from _vt.copy_state where vrepl_id in (1)", nil, nil)
dbClient.ExpectRequest("commit", nil, nil) dbClient.ExpectRequest("commit", nil, nil)
} }