Upsert is mostly feature complete. There are still two
issues to resolve:
1. RowsAffected is returned as 0 if no rows were modified. So, we cannot
assume that no rows were matched. So, the value cannot be used to verify
if the dup key matched a pk or a unique key.
2. I mistooke VALUES to be VALUE, and coded defense against using that
construct in the parser. It turns out that the parser already allows use
of VALUES. So, I'll have to find a different way to prevent that usage
for upserts.
This commit is contained in:
Sugu Sougoumarane 2015-08-14 13:23:33 -07:00
Родитель 67a82405f0
Коммит d3fc76999f
16 изменённых файлов: 406 добавлений и 74 удалений

Просмотреть файл

@ -8,6 +8,7 @@
"FullQuery": "select * from a union select * from b",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -28,6 +29,7 @@
"FullQuery": "select distinct * from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -48,6 +50,7 @@
"FullQuery": "select * from a group by b limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -68,6 +71,7 @@
"FullQuery": "select * from a having b = 1 limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -88,6 +92,7 @@
"FullQuery": "select * from a limit 5",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -113,6 +118,7 @@
"FullQuery": "select * from a.b limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -133,6 +139,7 @@
"FullQuery": "select * from a, b limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -153,6 +160,7 @@
"FullQuery": "select * from a join b limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -173,6 +181,7 @@
"FullQuery": "select * from a right join b limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -193,6 +202,7 @@
"FullQuery": "select * from b limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -213,6 +223,7 @@
"FullQuery": "select * from (b) limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -233,6 +244,7 @@
"FullQuery": "select :bv from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -253,6 +265,7 @@
"FullQuery": "select eid + 1 from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -273,6 +286,7 @@
"FullQuery": "select case when eid = 1 then 1 end from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -293,6 +307,7 @@
"FullQuery": "select eid from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0
@ -315,6 +330,7 @@
"FullQuery": "select eid as foo from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0
@ -337,6 +353,7 @@
"FullQuery": "select * from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -362,6 +379,7 @@
"FullQuery": "select c.eid from a as c limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0
@ -384,6 +402,7 @@
"FullQuery": "select (eid) from a limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -404,6 +423,7 @@
"FullQuery": "select eid from a limit :#maxLimit for update",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -424,6 +444,7 @@
"FullQuery": "select eid from a limit :#maxLimit lock in share mode",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -444,6 +465,7 @@
"FullQuery": "select * from a where eid = 1 and id in (1, 2) limit :#maxLimit",
"OuterQuery": "select eid, id, name, foo from a where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -469,6 +491,7 @@
"FullQuery": "select * from a where eid = :v1 and id in (:v2, :v3) limit :#maxLimit",
"OuterQuery": "select eid, id, name, foo from a where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -494,6 +517,7 @@
"FullQuery": "select * from a where name = 'foo' limit :#maxLimit",
"OuterQuery": "select eid, id, name, foo from a where :#pk",
"Subquery": "select eid, id from a use index (b_name) where name = 'foo' limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "b_name",
"ColumnNumbers": [
0,
@ -519,6 +543,7 @@
"FullQuery": "select eid, name, id from a where name = 'foo' limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "b_name",
"ColumnNumbers": [
0,
@ -543,6 +568,7 @@
"FullQuery": "select * from d where id = 1 limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": "select name from d use index (d_id) where id = 1 limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "d_id",
"ColumnNumbers": [
0,
@ -568,6 +594,7 @@
"FullQuery": "select * from d where id = 1 limit 1",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": "select name from d use index (d_id) where id = 1 limit 1",
"UpsertQuery": null,
"IndexUsed": "d_id",
"ColumnNumbers": [
0,
@ -593,6 +620,7 @@
"FullQuery": "select * from a where eid + 1 = 1 limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -618,6 +646,7 @@
"FullQuery": "select * from a where eid = id limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -643,6 +672,7 @@
"FullQuery": "select * from d where name between 'foo' and 'bar' limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -668,6 +698,7 @@
"FullQuery": "select * from a where (eid = 1) and (id = 2) limit :#maxLimit",
"OuterQuery": "select eid, id, name, foo from a where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -696,6 +727,7 @@
"FullQuery": "select * from a where eid = 1 and id = 1 limit :#maxLimit",
"OuterQuery": "select eid, id, name, foo from a where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -724,6 +756,7 @@
"FullQuery": "select * from d where bar = 'foo' and id = 1 limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -749,6 +782,7 @@
"FullQuery": "select * from d where name = 'foo' limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -776,6 +810,7 @@
"FullQuery": "select * from d where name = 'foo' limit 1",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -803,6 +838,7 @@
"FullQuery": "select * from d where name = 'foo' limit 0",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -830,6 +866,7 @@
"FullQuery": "select * from d where name = 'foo' limit :a",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -857,6 +894,7 @@
"FullQuery": "select * from d where name = 'foo' limit 1, 1",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -894,6 +932,7 @@
"FullQuery": "select * from d where 'foo' = name and eid = 1 limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -919,6 +958,7 @@
"FullQuery": "select * from d where name in ('foo', 'bar') limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -947,6 +987,7 @@
"FullQuery": "select * from d where name in (:a, :b) limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -975,6 +1016,7 @@
"FullQuery": "select * from d where name in ('foo') limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -1002,6 +1044,7 @@
"FullQuery": "select * from d where name in (:a) limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -1029,6 +1072,7 @@
"FullQuery": "select * from d where name in (:a) limit 1",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -1054,6 +1098,7 @@
"FullQuery": "select * from a where eid in (1) and id in (1, 2) limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1079,6 +1124,7 @@
"FullQuery": "select * from a where eid in (1, 2) and id in (1, 2) limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1104,6 +1150,7 @@
"FullQuery": "select * from a where (eid, id) in ((1, 1), (2, 2)) limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1129,6 +1176,7 @@
"FullQuery":"select * from d where foo = 'bar' limit :#maxLimit",
"OuterQuery": null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers": [
0,
@ -1154,6 +1202,7 @@
"FullQuery": "select * from d as c where c.name = 'foo' limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d as c where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "PRIMARY",
"ColumnNumbers": [
0,
@ -1181,6 +1230,7 @@
"FullQuery": "select * from d where id \u003c 0 limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": "select name from d use index (d_id) where id \u003c 0 limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "d_id",
"ColumnNumbers": [
0,
@ -1206,6 +1256,7 @@
"FullQuery": "select * from d where name in ('foo', id) limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1231,6 +1282,7 @@
"FullQuery": "select * from d where id between 1 and 2 limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": "select name from d use index (d_id) where id between 1 and 2 limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "d_id",
"ColumnNumbers": [
0,
@ -1256,6 +1308,7 @@
"FullQuery": "select * from d where id not between 1 and 2 limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1281,6 +1334,7 @@
"FullQuery": "select * from d where 1 between 1 and 2 limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1306,6 +1360,7 @@
"FullQuery": "select * from d where name is not null limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1331,6 +1386,7 @@
"FullQuery": "select * from a where eid = 1 and id = 1 order by name asc limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1356,6 +1412,7 @@
"FullQuery": "select * from d where bar = 'foo' limit :#maxLimit",
"OuterQuery": "select name, id, foo, bar from d where :#pk",
"Subquery": "select name from d use index (d_bar) where bar = 'foo' limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "d_bar",
"ColumnNumbers": [
0,
@ -1381,6 +1438,7 @@
"FullQuery": "select * from d use index (d_bar_never) where bar = 'foo' limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1406,6 +1464,7 @@
"FullQuery": "select * from d force index (d_bar_never) where bar = 'foo' limit :#maxLimit",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1435,6 +1494,7 @@
"FullQuery": "insert into b.a(eid, id) values (1, :a)",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1455,6 +1515,7 @@
"FullQuery": "insert into a(a.eid, id) values (1, 2)",
"OuterQuery": "insert into a(a.eid, id) values (1, 2)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1482,6 +1543,7 @@
"FullQuery": "insert into a(eid, id) values (1, :a)",
"OuterQuery": "insert into a(eid, id) values (1, :a)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1505,6 +1567,7 @@
"FullQuery": "insert into a(id) values (1)",
"OuterQuery": "insert into a(id) values (1)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1528,6 +1591,7 @@
"FullQuery": "insert into d(id) values (1)",
"OuterQuery": "insert into d(id) values (1)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1554,6 +1618,7 @@
"FullQuery": "insert into a(eid, id) values (-1, 2)",
"OuterQuery": "insert into a(eid, id) values (-1, 2)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1577,6 +1642,7 @@
"FullQuery": "insert into a(eid, id) values (1, 2)",
"OuterQuery": "insert into a(eid, id) values (1, 2)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1594,12 +1660,13 @@
"insert into a (eid, id) values (~1, 2)"
{
"PlanId": "PASS_DML",
"Reason": "DEFAULT",
"Reason": "COMPLEX_EXPR",
"TableName": "a",
"FieldQuery": null,
"FullQuery": "insert into a(eid, id) values (~1, 2)",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1614,12 +1681,13 @@
"insert into a (eid, id) values (1+1, 2)"
{
"PlanId": "PASS_DML",
"Reason": "DEFAULT",
"Reason": "COMPLEX_EXPR",
"TableName": "a",
"FieldQuery": null,
"FullQuery": "insert into a(eid, id) values (1 + 1, 2)",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1640,6 +1708,7 @@
"FullQuery": "insert into c(eid, id) values (1, 2)",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1660,6 +1729,7 @@
"FullQuery": "insert into a values (1, 2)",
"OuterQuery": "insert into a values (1, 2)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1674,18 +1744,19 @@
}
# on dup
"insert into b (eid, id) values (1, 2) on duplicate key update name = values(a)"
"insert into b (eid, id) values (1, 2) on duplicate key update name = func(a)"
{
"PlanId": "PASS_DML",
"Reason": "UPSERT",
"PlanId": "UPSERT_PK",
"Reason": "DEFAULT",
"TableName": "b",
"FieldQuery": null,
"FullQuery": "insert into b(eid, id) values (1, 2) on duplicate key update name = values(a)",
"OuterQuery": null,
"FullQuery": "insert into b(eid, id) values (1, 2) on duplicate key update name = func(a)",
"OuterQuery": "insert into b(eid, id) values (1, 2)",
"Subquery": null,
"UpsertQuery": "update b set name = func(a) where :#pk",
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
"PKValues": [1, 2],
"Limit": null,
"SecondaryPKValues": null,
"SubqueryPKColumns": null,
@ -1696,16 +1767,38 @@
# on dup pk change
"insert into b (eid, id) values (1, 2) on duplicate key update eid = 2"
{
"PlanId": "PASS_DML",
"Reason": "UPSERT",
"PlanId": "UPSERT_PK",
"Reason": "DEFAULT",
"TableName": "b",
"FieldQuery": null,
"FullQuery": "insert into b(eid, id) values (1, 2) on duplicate key update eid = 2",
"OuterQuery": null,
"OuterQuery": "insert into b(eid, id) values (1, 2)",
"Subquery": null,
"UpsertQuery": "update b set eid = 2 where :#pk",
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
"PKValues": [1, 2],
"Limit": null,
"SecondaryPKValues": [2, null],
"SubqueryPKColumns": null,
"SetKey": "",
"SetValue": null
}
# on dup complex pk change
"insert into b (id, eid) values (1, 2) on duplicate key update eid = func(a)"
{
"PlanId": "PASS_DML",
"Reason": "PK_CHANGE",
"TableName": "b",
"FieldQuery": null,
"FullQuery": "insert into b(id, eid) values (1, 2) on duplicate key update eid = func(a)",
"OuterQuery": "insert into b(id, eid) values (1, 2)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [2, 1],
"Limit": null,
"SecondaryPKValues": null,
"SubqueryPKColumns": null,
@ -1713,16 +1806,38 @@
"SetValue": null
}
# on dup complex pk change
"insert into b (id, eid) values (1, 2) on duplicate key update eid = values(a)"
# on dup multi-row
"insert into b (id, eid) values (1, 2), (2, 3) on duplicate key update name = func(a)"
{
"PlanId": "PASS_DML",
"Reason": "UPSERT",
"TableName": "b",
"FieldQuery": null,
"FullQuery": "insert into b(id, eid) values (1, 2) on duplicate key update eid = values(a)",
"FullQuery": "insert into b(id, eid) values (1, 2), (2, 3) on duplicate key update name = func(a)",
"OuterQuery": "insert into b(id, eid) values (1, 2), (2, 3)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [[2,3],[1,2]],
"Limit": null,
"SecondaryPKValues": null,
"SubqueryPKColumns": null,
"SetKey": "",
"SetValue": null
}
# on dup subquery
"insert into b (id, eid) select * from a on duplicate key update name = func(a)"
{
"PlanId": "PASS_DML",
"Reason": "UPSERT",
"TableName": "b",
"FieldQuery": null,
"FullQuery": "insert into b(id, eid) select * from a on duplicate key update name = func(a)",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1743,6 +1858,7 @@
"FullQuery": "insert into b(eid, id) select * from a",
"OuterQuery": "insert into b(eid, id) values :#values",
"Subquery": "select * from a limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1769,6 +1885,7 @@
"FullQuery": "insert into b select * from a",
"OuterQuery": "insert into b values :#values",
"Subquery": "select * from a limit :#maxLimit",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": [
0,
@ -1795,6 +1912,7 @@
"FullQuery": "insert into b(eid, id) values (1, 2), (3, 4)",
"OuterQuery": "insert into b(eid, id) values (1, 2), (3, 4)",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1824,6 +1942,7 @@
"FullQuery": "update b.a set name = 'foo' where eid = 1 and id = 1",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1844,6 +1963,7 @@
"FullQuery": "update b set eid = 1",
"OuterQuery": "update b set eid = 1 where :#pk",
"Subquery": "select eid, id from b limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1871,6 +1991,7 @@
"FullQuery": "update b set a.eid = 1",
"OuterQuery": "update b set a.eid = 1 where :#pk",
"Subquery": "select eid, id from b limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1894,6 +2015,7 @@
"FullQuery": "update b set eid = foo()",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1914,6 +2036,7 @@
"FullQuery": "update a set name = 'foo'",
"OuterQuery": "update a set name = 'foo' where :#pk",
"Subquery": "select eid, id from a limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1934,6 +2057,7 @@
"FullQuery": "update a set name = 'foo' where eid + 1 = 1",
"OuterQuery": "update a set name = 'foo' where :#pk",
"Subquery": "select eid, id from a where eid + 1 = 1 limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -1954,6 +2078,7 @@
"FullQuery": "update a set name = 'foo' where eid = 1 and id = 1",
"OuterQuery": "update a set name = 'foo' where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -1977,6 +2102,7 @@
"FullQuery": "update a set a.name = 'foo' where eid = 1 and id = 1",
"OuterQuery": "update a set a.name = 'foo' where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -2000,6 +2126,7 @@
"FullQuery": "update a set name = 'foo' where eid = 1",
"OuterQuery": "update a set name = 'foo' where :#pk",
"Subquery": "select eid, id from a where eid = 1 limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2020,6 +2147,7 @@
"FullQuery": "update a set name = 'foo' where eid = 1 limit 10",
"OuterQuery": "update a set name = 'foo' where :#pk",
"Subquery": "select eid, id from a where eid = 1 limit 10 for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2040,6 +2168,7 @@
"FullQuery": "update a set name = 'foo' where eid = 1 and name = 'foo'",
"OuterQuery": "update a set name = 'foo' where :#pk",
"Subquery": "select eid, id from a where eid = 1 and name = 'foo' limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2060,6 +2189,7 @@
"FullQuery": "update c set eid = 1",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2080,6 +2210,7 @@
"FullQuery": "delete from b.a where eid = 1 and id = 1",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2100,6 +2231,7 @@
"FullQuery": "delete from a",
"OuterQuery": "delete from a where :#pk",
"Subquery": "select eid, id from a limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2120,6 +2252,7 @@
"FullQuery": "delete from a where eid + 1 = 1",
"OuterQuery": "delete from a where :#pk",
"Subquery": "select eid, id from a where eid + 1 = 1 limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2140,6 +2273,7 @@
"FullQuery": "delete from a where eid = 1 and id = 1",
"OuterQuery": "delete from a where :#pk",
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": [
@ -2163,6 +2297,7 @@
"FullQuery": "delete from a where eid = 1",
"OuterQuery": "delete from a where :#pk",
"Subquery": "select eid, id from a where eid = 1 limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2183,6 +2318,7 @@
"FullQuery": "delete from a where eid = 1 and name = 'foo'",
"OuterQuery": "delete from a where :#pk",
"Subquery": "select eid, id from a where eid = 1 and name = 'foo' limit :#maxLimit for update",
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2203,6 +2339,7 @@
"FullQuery": "delete from c",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2223,6 +2360,7 @@
"FullQuery": "set a = 1",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2243,6 +2381,7 @@
"FullQuery": "set a = 1.2",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2263,6 +2402,7 @@
"FullQuery": "set a = 'b'",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2283,6 +2423,7 @@
"FullQuery": "set a = 1, b = 2",
"OuterQuery": null,
"Subquery": null,
"UpsertQuery": null,
"IndexUsed": "",
"ColumnNumbers": null,
"PKValues": null,
@ -2303,6 +2444,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2323,6 +2465,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2343,6 +2486,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2363,6 +2507,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2383,6 +2528,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2403,6 +2549,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2423,6 +2570,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2443,6 +2591,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -2463,6 +2612,7 @@
"FullQuery":null,
"OuterQuery":null,
"Subquery":null,
"UpsertQuery": null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,

Просмотреть файл

@ -8,6 +8,7 @@
"FullQuery":"select * from a",
"OuterQuery":null,
"Subquery":null,
"UpsertQuery":null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -28,6 +29,7 @@
"FullQuery":"select * from a join b",
"OuterQuery":null,
"Subquery":null,
"UpsertQuery":null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,
@ -52,6 +54,7 @@
"FullQuery": "select * from a union select * from b",
"OuterQuery":null,
"Subquery":null,
"UpsertQuery":null,
"IndexUsed":"",
"ColumnNumbers":null,
"PKValues":null,

Просмотреть файл

@ -84,7 +84,7 @@ func TestDBConnKill(t *testing.T) {
}
newKillQuery := fmt.Sprintf("kill %d", dbConn.ID())
// Kill failed because "kill query_id" failed
db.AddRejectedQuery(newKillQuery)
db.AddRejectedQuery(newKillQuery, errRejected)
err = dbConn.Kill()
testUtils.checkTabletError(t, err, ErrFail, "Could not kill query")

Просмотреть файл

@ -388,14 +388,13 @@ func analyzeInsert(ins *sqlparser.Insert, getTable TableGetter) (plan *ExecPlan,
pkColumnNumbers := getInsertPKColumns(ins.Columns, tableInfo)
if ins.OnDup != nil {
// Upserts are not safe for statement based replication:
// http://bugs.mysql.com/bug.php?id=58637
plan.Reason = REASON_UPSERT
return plan, nil
}
if sel, ok := ins.Rows.(sqlparser.SelectStatement); ok {
if ins.OnDup != nil {
// Upserts not allowed for subqueries.
// http://bugs.mysql.com/bug.php?id=58637
plan.Reason = REASON_UPSERT
return plan, nil
}
plan.PlanId = PLAN_INSERT_SUBQUERY
plan.OuterQuery = GenerateInsertOuterQuery(ins)
plan.Subquery = GenerateSelectLimitQuery(sel)
@ -422,11 +421,36 @@ func analyzeInsert(ins *sqlparser.Insert, getTable TableGetter) (plan *ExecPlan,
if err != nil {
return nil, err
}
if pkValues != nil {
plan.PlanId = PLAN_INSERT_PK
plan.OuterQuery = plan.FullQuery
plan.PKValues = pkValues
if pkValues == nil {
plan.Reason = REASON_COMPLEX_EXPR
return plan, nil
}
plan.OuterQuery = GenerateInsertNoUpdate(ins)
plan.PKValues = pkValues
if ins.OnDup == nil {
plan.PlanId = PLAN_INSERT_PK
return plan, nil
}
if len(rowList) > 1 {
// Upsert supported only for single row inserts.
plan.Reason = REASON_UPSERT
return plan, nil
}
plan.SecondaryPKValues, err = analyzeUpdateExpressions(sqlparser.UpdateExprs(ins.OnDup), tableInfo.Indexes[0])
if err != nil {
if err == ErrTooComplex {
plan.Reason = REASON_PK_CHANGE
return plan, nil
}
return nil, err
}
plan.PlanId = PLAN_UPSERT_PK
upd := &sqlparser.Update{
Comments: ins.Comments,
Table: ins.Table,
Exprs: sqlparser.UpdateExprs(ins.OnDup),
}
plan.UpsertQuery = GenerateUpdateOuterQuery(upd)
return plan, nil
}
@ -467,7 +491,6 @@ func getInsertPKValues(pkColumnNumbers []int, rowList sqlparser.Values, tableInf
}
node := row[columnNumber]
if !sqlparser.IsValue(node) {
log.Warningf("insert is too complex %v", node)
return nil, nil
}
var err error

Просмотреть файл

@ -54,6 +54,8 @@ const (
PLAN_SELECT_STREAM
// PLAN_OTHER is for SHOW, DESCRIBE & EXPLAIN statements
PLAN_OTHER
// PLAN_UPSERT_PK is for insert ... on duplicate key constructs
PLAN_UPSERT_PK
// NumPlans stores the total number of plans
NumPlans
)
@ -73,6 +75,7 @@ var planName = []string{
"DDL",
"SELECT_STREAM",
"OTHER",
"UPSERT_PK",
}
func (pt PlanType) String() string {
@ -120,6 +123,7 @@ var tableAclRoles = map[PlanType]tableacl.Role{
PLAN_DDL: tableacl.ADMIN,
PLAN_SELECT_STREAM: tableacl.READER,
PLAN_OTHER: tableacl.ADMIN,
PLAN_UPSERT_PK: tableacl.WRITER,
}
// ReasonType indicates why a query plan fails to build
@ -141,6 +145,7 @@ const (
REASON_TABLE_NOINDEX
REASON_PK_CHANGE
REASON_HAS_HINTS
REASON_COMPLEX_EXPR
REASON_UPSERT
)
@ -161,6 +166,7 @@ var reasonName = []string{
"TABLE_NOINDEX",
"PK_CHANGE",
"HAS_HINTS",
"COMPLEX_EXPR",
"UPSERT",
}
@ -193,9 +199,10 @@ type ExecPlan struct {
// For PK plans, only OuterQuery is set.
// For SUBQUERY plans, Subquery is also set.
// IndexUsed is set only for PLAN_SELECT_SUBQUERY
OuterQuery *sqlparser.ParsedQuery
Subquery *sqlparser.ParsedQuery
IndexUsed string
OuterQuery *sqlparser.ParsedQuery
Subquery *sqlparser.ParsedQuery
UpsertQuery *sqlparser.ParsedQuery
IndexUsed string
// For selects, columns to be returned
// For PLAN_INSERT_SUBQUERY, columns to be inserted

Просмотреть файл

@ -78,16 +78,24 @@ func GenerateSelectOuterQuery(sel *sqlparser.Select, tableInfo *schema.Table) *s
// GenerateInsertOuterQuery generates the outer query for inserts.
func GenerateInsertOuterQuery(ins *sqlparser.Insert) *sqlparser.ParsedQuery {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("insert %vinto %v%v values %a%v",
buf.Myprintf("insert %vinto %v%v values %a",
ins.Comments,
ins.Table,
ins.Columns,
":#values",
ins.OnDup,
)
return buf.ParsedQuery()
}
// GenerateInsertNoUpdate generates an insert without the update part.
func GenerateInsertNoUpdate(ins *sqlparser.Insert) *sqlparser.ParsedQuery {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("insert %vinto %v%v %v",
ins.Comments,
ins.Table, ins.Columns, ins.Rows)
return buf.ParsedQuery()
}
// GenerateUpdateOuterQuery generates the outer query for updates.
func GenerateUpdateOuterQuery(upd *sqlparser.Update) *sqlparser.ParsedQuery {
buf := sqlparser.NewTrackedBuffer(nil)

Просмотреть файл

@ -10,6 +10,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/hack"
"github.com/youtube/vitess/go/mysql"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callinfo"
@ -88,6 +89,8 @@ func (qre *QueryExecutor) Execute() (reply *mproto.QueryResult, err error) {
reply, err = qre.execDMLSubquery(conn, invalidator)
case planbuilder.PLAN_OTHER:
reply, err = qre.execSQL(conn, qre.query, true)
case planbuilder.PLAN_UPSERT_PK:
reply, err = qre.execUpsertPK(conn, invalidator)
default: // select or set in a transaction, just count as select
reply, err = qre.execDirect(conn)
}
@ -180,6 +183,8 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *mproto.QueryResult, err er
reply, err = qre.execDMLPK(conn, invalidator)
case planbuilder.PLAN_DML_SUBQUERY:
reply, err = qre.execDMLSubquery(conn, invalidator)
case planbuilder.PLAN_UPSERT_PK:
reply, err = qre.execUpsertPK(conn, invalidator)
default:
return nil, NewTabletError(ErrFatal, "unsupported query: %s", qre.query)
}
@ -466,12 +471,35 @@ func (qre *QueryExecutor) execInsertSubquery(conn poolConn) (*mproto.QueryResult
}
func (qre *QueryExecutor) execInsertPKRows(conn poolConn, pkRows [][]sqltypes.Value) (*mproto.QueryResult, error) {
secondaryList, err := buildSecondaryList(qre.plan.TableInfo, pkRows, qre.plan.SecondaryPKValues, qre.bindVars)
bsc := buildStreamComment(qre.plan.TableInfo, pkRows, nil)
return qre.directFetch(conn, qre.plan.OuterQuery, qre.bindVars, bsc)
}
func (qre *QueryExecutor) execUpsertPK(conn poolConn, invalidator CacheInvalidator) (*mproto.QueryResult, error) {
pkRows, err := buildValueList(qre.plan.TableInfo, qre.plan.PKValues, qre.bindVars)
if err != nil {
return nil, err
}
bsc := buildStreamComment(qre.plan.TableInfo, pkRows, secondaryList)
return qre.directFetch(conn, qre.plan.OuterQuery, qre.bindVars, bsc)
bsc := buildStreamComment(qre.plan.TableInfo, pkRows, nil)
result, err := qre.directFetch(conn, qre.plan.OuterQuery, qre.bindVars, bsc)
if err == nil {
return result, nil
}
terr, ok := err.(*TabletError)
if !ok {
return result, err
}
if terr.SqlError != mysql.ErrDupEntry {
return nil, err
}
result, err = qre.execDMLPKRows(conn, qre.plan.UpsertQuery, pkRows, invalidator)
if err != nil {
return nil, err
}
if result.RowsAffected != 1 {
return nil, NewTabletError(ErrFail, "upsert failed to update a dup key row")
}
return result, nil
}
func (qre *QueryExecutor) execDMLPK(conn poolConn, invalidator CacheInvalidator) (*mproto.QueryResult, error) {
@ -479,7 +507,7 @@ func (qre *QueryExecutor) execDMLPK(conn poolConn, invalidator CacheInvalidator)
if err != nil {
return nil, err
}
return qre.execDMLPKRows(conn, pkRows, invalidator)
return qre.execDMLPKRows(conn, qre.plan.OuterQuery, pkRows, invalidator)
}
func (qre *QueryExecutor) execDMLSubquery(conn poolConn, invalidator CacheInvalidator) (*mproto.QueryResult, error) {
@ -487,10 +515,10 @@ func (qre *QueryExecutor) execDMLSubquery(conn poolConn, invalidator CacheInvali
if err != nil {
return nil, err
}
return qre.execDMLPKRows(conn, innerResult.Rows, invalidator)
return qre.execDMLPKRows(conn, qre.plan.OuterQuery, innerResult.Rows, invalidator)
}
func (qre *QueryExecutor) execDMLPKRows(conn poolConn, pkRows [][]sqltypes.Value, invalidator CacheInvalidator) (*mproto.QueryResult, error) {
func (qre *QueryExecutor) execDMLPKRows(conn poolConn, query *sqlparser.ParsedQuery, pkRows [][]sqltypes.Value, invalidator CacheInvalidator) (*mproto.QueryResult, error) {
if len(pkRows) == 0 {
return &mproto.QueryResult{RowsAffected: 0}, nil
}
@ -516,7 +544,7 @@ func (qre *QueryExecutor) execDMLPKRows(conn poolConn, pkRows [][]sqltypes.Value
Columns: qre.plan.TableInfo.Indexes[0].Columns,
Rows: pkRows,
}
r, err := qre.directFetch(conn, qre.plan.OuterQuery, qre.bindVars, bsc)
r, err := qre.directFetch(conn, query, qre.bindVars, bsc)
if err != nil {
return nil, err
}

Просмотреть файл

@ -12,7 +12,9 @@ import (
"testing"
"time"
"github.com/youtube/vitess/go/mysql"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callinfo"
tableaclpb "github.com/youtube/vitess/go/vt/proto/tableacl"
@ -214,6 +216,60 @@ func TestQueryExecutorPlanInsertSubQuery(t *testing.T) {
}
}
func TestQueryExecutorPlanUpsertPk(t *testing.T) {
db := setUpQueryExecutorTest()
db.AddQuery("insert into test_table values (1) /* _stream test_table (pk ) (1 ); */", &mproto.QueryResult{})
want := &mproto.QueryResult{
Rows: make([][]sqltypes.Value, 0),
}
query := "insert into test_table values(1) on duplicate key update val=1"
ctx := context.Background()
sqlQuery := newTestSQLQuery(ctx, enableRowCache|enableStrict)
qre := newTestQueryExecutor(ctx, sqlQuery, query, 0)
defer sqlQuery.disallowQueries()
checkPlanID(t, planbuilder.PLAN_UPSERT_PK, qre.plan.PlanId)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
db.AddRejectedQuery("insert into test_table values (1) /* _stream test_table (pk ) (1 ); */", errRejected)
_, err = qre.Execute()
wantErr := "error: rejected"
if err == nil || err.Error() != wantErr {
t.Fatalf("qre.Execute() = %v, want %v", err, wantErr)
}
db.AddRejectedQuery(
"insert into test_table values (1) /* _stream test_table (pk ) (1 ); */",
sqldb.NewSqlError(mysql.ErrDupEntry, "err"),
)
db.AddQuery("update test_table set val = 1 where pk in (1) /* _stream test_table (pk ) (1 ); */", &mproto.QueryResult{})
_, err = qre.Execute()
wantErr = "error: upsert failed to update a dup key row"
if err == nil || err.Error() != wantErr {
t.Fatalf("qre.Execute() = %v, want %v", err, wantErr)
}
db.AddQuery(
"update test_table set val = 1 where pk in (1) /* _stream test_table (pk ) (1 ); */",
&mproto.QueryResult{RowsAffected: 1},
)
got, err = qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
want = &mproto.QueryResult{
RowsAffected: 1,
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
}
func TestQueryExecutorPlanDmlPk(t *testing.T) {
db := setUpQueryExecutorTest()
query := "update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */"

Просмотреть файл

@ -527,7 +527,7 @@ func TestSqlQueryExecuteBatchBeginFail(t *testing.T) {
db := setUpSqlQueryTest()
testUtils := newTestUtils()
// make "begin" query fail
db.AddRejectedQuery("begin")
db.AddRejectedQuery("begin", errRejected)
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
@ -561,7 +561,7 @@ func TestSqlQueryExecuteBatchCommitFail(t *testing.T) {
db := setUpSqlQueryTest()
testUtils := newTestUtils()
// make "commit" query fail
db.AddRejectedQuery("commit")
db.AddRejectedQuery("commit", errRejected)
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
dbconfigs := testUtils.newDBConfigs()
@ -607,8 +607,8 @@ func TestSqlQueryExecuteBatchSqlExecFailInTransaction(t *testing.T) {
db.AddQuery(expanedSql, sqlResult)
// make this query fail
db.AddRejectedQuery(sql)
db.AddRejectedQuery(expanedSql)
db.AddRejectedQuery(sql, errRejected)
db.AddRejectedQuery(expanedSql, errRejected)
config := testUtils.newQueryServiceConfig()
sqlQuery := NewSqlQuery(config)
@ -662,7 +662,7 @@ func TestSqlQueryExecuteBatchSqlSucceedInTransaction(t *testing.T) {
db.AddQuery(expanedSql, sqlResult)
// cause execution error for this particular sql query
db.AddRejectedQuery(sql)
db.AddRejectedQuery(sql, errRejected)
config := testUtils.newQueryServiceConfig()
config.EnableAutoCommit = true

Просмотреть файл

@ -5,6 +5,7 @@
package tabletserver
import (
"errors"
"fmt"
"math/rand"
"testing"
@ -18,6 +19,8 @@ import (
"golang.org/x/net/context"
)
var errRejected = errors.New("rejected")
func TestTableInfoNew(t *testing.T) {
fakecacheservice.Register()
db := fakesqldb.Register()
@ -46,7 +49,7 @@ func TestTableInfoFailBecauseUnableToRetrieveTableIndex(t *testing.T) {
for query, result := range getTestTableInfoQueries() {
db.AddQuery(query, result)
}
db.AddRejectedQuery("show index from `test_table`")
db.AddRejectedQuery("show index from `test_table`", errRejected)
cachePool := newTestTableInfoCachePool()
cachePool.Open()
defer cachePool.Close()

Просмотреть файл

@ -174,7 +174,7 @@ func TestTxPoolBeginWithPoolConnectionError(t *testing.T) {
func TestTxPoolBeginWithExecError(t *testing.T) {
db := fakesqldb.Register()
db.AddRejectedQuery("begin")
db.AddRejectedQuery("begin", errRejected)
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -190,7 +190,7 @@ func TestTxPoolSafeCommitFail(t *testing.T) {
sql := fmt.Sprintf("alter table test_table add test_column int")
db.AddQuery("begin", &proto.QueryResult{})
db.AddQuery(sql, &proto.QueryResult{})
db.AddRejectedQuery("commit")
db.AddRejectedQuery("commit", errRejected)
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -215,7 +215,7 @@ func TestTxPoolRollbackFail(t *testing.T) {
db := fakesqldb.Register()
db.AddQuery(sql, &proto.QueryResult{})
db.AddQuery("begin", &proto.QueryResult{})
db.AddRejectedQuery("rollback")
db.AddRejectedQuery("rollback", errRejected)
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}

Просмотреть файл

@ -31,7 +31,7 @@ type Conn struct {
type DB struct {
isConnFail bool
data map[string]*proto.QueryResult
rejectedData map[string]*proto.QueryResult
rejectedData map[string]error
queryCalled map[string]int
mu sync.Mutex
}
@ -67,18 +67,10 @@ func (db *DB) DeleteQuery(query string) {
}
// AddRejectedQuery adds a query which will be rejected at execution time.
func (db *DB) AddRejectedQuery(query string) {
func (db *DB) AddRejectedQuery(query string, err error) {
db.mu.Lock()
defer db.mu.Unlock()
db.rejectedData[strings.ToLower(query)] = &proto.QueryResult{}
}
// HasRejectedQuery returns true if this query will be rejected.
func (db *DB) HasRejectedQuery(query string) bool {
db.mu.Lock()
defer db.mu.Unlock()
_, ok := db.rejectedData[strings.ToLower(query)]
return ok
db.rejectedData[strings.ToLower(query)] = err
}
// DeleteRejectedQuery deletes query from the fake DB.
@ -138,8 +130,8 @@ func (conn *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (*pro
if conn.IsClosed() {
return nil, fmt.Errorf("connection is closed")
}
if conn.db.HasRejectedQuery(query) {
return nil, fmt.Errorf("unsupported query, reject query: %s", query)
if err, ok := conn.db.rejectedData[query]; ok {
return nil, err
}
result, ok := conn.db.GetQuery(query)
if !ok {
@ -158,12 +150,9 @@ func (conn *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (*pro
copy(qr.Fields, result.Fields)
}
rowCount := int(qr.RowsAffected)
rows := make([][]sqltypes.Value, rowCount)
if rowCount > 0 {
for i := 0; i < rowCount; i++ {
rows[i] = result.Rows[i]
}
rows := make([][]sqltypes.Value, 0, len(result.Rows))
for _, r := range result.Rows {
rows = append(rows, r)
}
qr.Rows = rows
return qr, nil
@ -204,8 +193,8 @@ func (conn *Conn) ExecuteStreamFetch(query string) error {
if conn.IsClosed() {
return fmt.Errorf("connection is closed")
}
if conn.db.HasRejectedQuery(query) {
return fmt.Errorf("unsupported query, reject query: %s", query)
if err, ok := conn.db.rejectedData[query]; ok {
return err
}
result, ok := conn.db.GetQuery(query)
if !ok {
@ -283,7 +272,7 @@ func Register() *DB {
name := fmt.Sprintf("fake-%d", rand.Int63())
db := &DB{
data: make(map[string]*proto.QueryResult),
rejectedData: make(map[string]*proto.QueryResult),
rejectedData: make(map[string]error),
queryCalled: make(map[string]int),
}
sqldb.Register(name, func(sqldb.ConnParams) (sqldb.Conn, error) {

Просмотреть файл

@ -411,6 +411,46 @@ cases = [
'delete from vtocc_a where eid>1',
'commit']),
MultiCase(
'upsert single row present/absent',
['begin',
Case(sql="insert into upsert_test(id1, id2) values (1, 1) on duplicate key update id2 = 1",
rewritten="insert into upsert_test(id1, id2) values (1, 1) /* _stream upsert_test (id1 ) (1 )",
rowcount=1),
Case(sql='select * from upsert_test',
result=[(1L, 1L)]),
Case(sql="insert into upsert_test(id1, id2) values (1, 2) on duplicate key update id2 = 2",
rewritten=[
"insert into upsert_test(id1, id2) values (1, 2) /* _stream upsert_test (id1 ) (1 )",
"update upsert_test set id2 = 2 where id1 in (1) /* _stream upsert_test (id1 ) (1 )"],
rowcount=1),
Case(sql='select * from upsert_test',
result=[(1L, 2L)]),
'commit',
'begin',
'delete from upsert_test',
'commit']),
MultiCase(
'upsert changes pk',
['begin',
Case(sql="insert into upsert_test(id1, id2) values (1, 1) on duplicate key update id1 = 1",
rewritten="insert into upsert_test(id1, id2) values (1, 1) /* _stream upsert_test (id1 ) (1 )",
rowcount=1),
Case(sql='select * from upsert_test',
result=[(1L, 1L)]),
Case(sql="insert into upsert_test(id1, id2) values (1, 2) on duplicate key update id1 = 2",
rewritten=[
"insert into upsert_test(id1, id2) values (1, 2) /* _stream upsert_test (id1 ) (1 )",
"update upsert_test set id1 = 2 where id1 in (1) /* _stream upsert_test (id1 ) (1 ) (2 )"],
rowcount=1),
Case(sql='select * from upsert_test',
result=[(2L, 1L)]),
'commit',
'begin',
'delete from upsert_test',
'commit']),
MultiCase(
'update',
['begin',

Просмотреть файл

@ -142,12 +142,33 @@ class TestNocache(framework.TestCase):
self.env.execute("insert into vtocc_a(eid, id, name, foo) values (7, 1+1, '', '')")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into vtocc_d(eid, id) values (1, 1)")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into vtocc_a(eid, id, name, foo) values (8, 2, '', '') on duplicate key update id = 2+1")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("update vtocc_a set eid = 1+1 where eid = 1 and id = 1")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into vtocc_d(eid, id) values (1, 1)")
self.env.execute("delete from upsert_test")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into upsert_test(id1, id2) values (1, 1), (2, 2) on duplicate key update id1 = 1")
self.env.execute("delete from upsert_test")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into upsert_test(id1, id2) select eid, id from vtocc_a limit 1 on duplicate key update id2 = id1")
self.env.execute("delete from upsert_test")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into upsert_test(id1, id2) values (1, 1) on duplicate key update id1 = 2+1")
self.env.execute("delete from upsert_test")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into upsert_test(id1, id2) values (1, 1)")
self.env.execute("insert into upsert_test(id1, id2) values (2, 1) on duplicate key update id2 = 2")
# TODO(sougou): add test for values clause after bug fix on handling
# values.
self.env.execute("delete from upsert_test")
with self.assertRaises(dbexceptions.DatabaseError):
self.env.execute("insert into upsert_test(id1, id2) values (1, 1) on duplicate key update id2 = last_insert_id(id1)")
finally:
self.env.conn.rollback()

Просмотреть файл

@ -30,7 +30,7 @@
},
{
"name": "vtocc",
"table_names_or_prefixes": ["vtocc_a", "vtocc_b", "vtocc_c", "dual", "vtocc_d", "vtocc_temp", "vtocc_e", "vtocc_f", "vtocc_strings", "vtocc_fracts", "vtocc_ints", "vtocc_misc", "vtocc_big", "vtocc_view"],
"table_names_or_prefixes": ["vtocc_a", "vtocc_b", "vtocc_c", "dual", "vtocc_d", "vtocc_temp", "vtocc_e", "vtocc_f", "upsert_test", "vtocc_strings", "vtocc_fracts", "vtocc_ints", "vtocc_misc", "vtocc_big", "vtocc_view"],
"readers": ["youtube-dev-dedicated"],
"writers": ["youtube-dev-dedicated"],
"admins": ["youtube-dev-dedicated"]

Просмотреть файл

@ -12,12 +12,15 @@ create table vtocc_c(eid bigint, name varchar(128), foo varbinary(128), primary
create table vtocc_d(eid bigint, id int) comment 'vtocc_nocache'
create table vtocc_e(eid bigint auto_increment, id int default 1, name varchar(128) default 'name', foo varchar(128), primary key(eid, id, name)) comment 'vtocc_nocache'
create table vtocc_f(vb varbinary(16) default 'ab', id int, primary key(vb)) comment 'vtocc_nocache'
create table upsert_test(id1 int, id2 int, primary key (id1)) comment 'vtocc_nocache'
create unique index id2_idx on upsert_test(id2)
begin
delete from vtocc_a
delete from vtocc_c
insert into vtocc_a(eid, id, name, foo) values(1, 1, 'abcd', 'efgh'), (1, 2, 'bcde', 'fghi')
insert into vtocc_b(eid, id) values(1, 1), (1, 2)
insert into vtocc_c(eid, name, foo) values(10, 'abcd', '20'), (11, 'bcde', '30')
delete from upsert_test
commit
create table vtocc_cached1(eid bigint, name varchar(128), foo varbinary(128), primary key(eid))
@ -77,6 +80,7 @@ drop table if exists vtocc_c
drop table if exists vtocc_d
drop table if exists vtocc_e
drop table if exists vtocc_f
drop table if exists upsert_test
drop table if exists vtocc_cached1
drop table if exists vtocc_cached2
drop table if exists vtocc_renamed