tabletserver: insert...select must lock rows

The select subquery in insert...select was not locking rows. This
introduces race conditions for self-join inserts like select(max).
So, it's safer to lock the rows we're selecting.
This commit is contained in:
Sugu Sougoumarane 2016-01-08 16:31:08 -08:00
Родитель 27ba4c2d73
Коммит 8b627165eb
5 изменённых файлов: 25 добавлений и 12 удалений

Просмотреть файл

@ -859,7 +859,7 @@
"TableName": "b",
"FullQuery": "insert into b(eid, id) select * from a",
"OuterQuery": "insert into b(eid, id) values :#values",
"Subquery": "select * from a limit :#maxLimit",
"Subquery": "select * from a limit :#maxLimit for update",
"ColumnNumbers": [0, 1],
"SubqueryPKColumns": [0, 1]
}
@ -871,7 +871,7 @@
"TableName": "b",
"FullQuery": "insert into b select * from a",
"OuterQuery": "insert into b values :#values",
"Subquery": "select * from a limit :#maxLimit",
"Subquery": "select * from a limit :#maxLimit for update",
"ColumnNumbers": [0, 1],
"SubqueryPKColumns": [0, 1]
}

Просмотреть файл

@ -119,3 +119,16 @@ func StringIn(str string, values ...string) bool {
}
return false
}
// ForUpdate converts a select statement for update
// and returns itself as value.
func ForUpdate(sel SelectStatement) SelectStatement {
switch stmt := sel.(type) {
case *Select:
stmt.Lock = ForUpdateStr
case *Union:
ForUpdate(stmt.Left)
ForUpdate(stmt.Right)
}
return sel
}

Просмотреть файл

@ -751,7 +751,7 @@ func TestNocacheCases(t *testing.T) {
&framework.TestCase{
Query: "insert /* subquery */ into vitess_a(eid, name, foo) select eid, name, foo from vitess_c",
Rewritten: []string{
"select eid, name, foo from vitess_c limit 10001",
"select eid, name, foo from vitess_c limit 10001 for update",
"insert /* subquery */ into vitess_a(eid, name, foo) values (10, 'abcd', '20'), (11, 'bcde', '30') /* _stream vitess_a (eid id ) (10 1 ) (11 1 )",
},
RowsAffected: 2,
@ -769,7 +769,7 @@ func TestNocacheCases(t *testing.T) {
&framework.TestCase{
Query: "insert into vitess_e(id, name, foo) select eid, name, foo from vitess_c",
Rewritten: []string{
"select eid, name, foo from vitess_c limit 10001",
"select eid, name, foo from vitess_c limit 10001 for update",
"insert into vitess_e(id, name, foo) values (10, 'abcd', '20'), (11, 'bcde', '30') /* _stream vitess_e (eid id name ) (null 10 'YWJjZA==' ) (null 11 'YmNkZQ==' )",
},
RowsAffected: 2,
@ -1434,7 +1434,7 @@ func TestNocacheCases(t *testing.T) {
&framework.TestCase{
Query: "insert into vitess_ints select 2, tinyu, small, smallu, medium, mediumu, normal, normalu, big, bigu, y from vitess_ints",
Rewritten: []string{
"select 2, tinyu, small, smallu, medium, mediumu, normal, normalu, big, bigu, y from vitess_ints limit 10001",
"select 2, tinyu, small, smallu, medium, mediumu, normal, normalu, big, bigu, y from vitess_ints limit 10001 for update",
"insert into vitess_ints values (2, 255, -32768, 65535, -8388608, 16777215, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 2012) /* _stream vitess_ints (tiny ) (2 )",
},
},
@ -1485,7 +1485,7 @@ func TestNocacheCases(t *testing.T) {
&framework.TestCase{
Query: "insert into vitess_fracts select 2, deci, num, f, d from vitess_fracts",
Rewritten: []string{
"select 2, deci, num, f, d from vitess_fracts limit 10001",
"select 2, deci, num, f, d from vitess_fracts limit 10001 for update",
"insert into vitess_fracts values (2, 1.99, 2.99, 3.99, 4.99) /* _stream vitess_fracts (id ) (2 )",
},
},
@ -1541,7 +1541,7 @@ func TestNocacheCases(t *testing.T) {
&framework.TestCase{
Query: "insert into vitess_strings select 'b', c, vc, b, tb, bl, ttx, tx, en, s from vitess_strings",
Rewritten: []string{
"select 'b', c, vc, b, tb, bl, ttx, tx, en, s from vitess_strings limit 10001",
"select 'b', c, vc, b, tb, bl, ttx, tx, en, s from vitess_strings limit 10001 for update",
"insert into vitess_strings values ('b', 'b', 'c', 'd\\0\\0\\0', 'e', 'f', 'g', 'h', 'a', 'a,b') /* _stream vitess_strings (vb ) ('Yg==' )",
},
},
@ -1590,9 +1590,9 @@ func TestNocacheCases(t *testing.T) {
},
framework.TestQuery("begin"),
&framework.TestCase{
Query: "insert into vitess_misc select 2, b, d, dt, t from vitess_misc",
Query: "insert into vitess_misc select 2, b, d, dt, t from vitess_misc for update",
Rewritten: []string{
"select 2, b, d, dt, t from vitess_misc limit 10001",
"select 2, b, d, dt, t from vitess_misc limit 10001 for update",
"insert into vitess_misc values (2, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45') /* _stream vitess_misc (id ) (2 )",
},
},

Просмотреть файл

@ -397,7 +397,7 @@ func analyzeInsert(ins *sqlparser.Insert, getTable TableGetter) (plan *ExecPlan,
}
plan.PlanID = PlanInsertSubquery
plan.OuterQuery = GenerateInsertOuterQuery(ins)
plan.Subquery = GenerateSelectLimitQuery(sel)
plan.Subquery = GenerateSelectLimitQuery(sqlparser.ForUpdate(sel))
if len(ins.Columns) != 0 {
plan.ColumnNumbers, err = analyzeSelectExprs(sqlparser.SelectExprs(ins.Columns), tableInfo)
if err != nil {

Просмотреть файл

@ -158,7 +158,7 @@ func TestQueryExecutorPlanInsertSubQueryAutoCommmit(t *testing.T) {
Rows: [][]sqltypes.Value{},
}
db.AddQuery(query, want)
selectQuery := "select pk from test_table where pk = 1 limit 1000"
selectQuery := "select pk from test_table where pk = 1 limit 1000 for update"
db.AddQuery(selectQuery, &sqltypes.Result{
RowsAffected: 1,
Rows: [][]sqltypes.Value{
@ -190,7 +190,7 @@ func TestQueryExecutorPlanInsertSubQuery(t *testing.T) {
Rows: [][]sqltypes.Value{},
}
db.AddQuery(query, want)
selectQuery := "select pk from test_table where pk = 1 limit 1000"
selectQuery := "select pk from test_table where pk = 1 limit 1000 for update"
db.AddQuery(selectQuery, &sqltypes.Result{
RowsAffected: 1,
Rows: [][]sqltypes.Value{