add option to disable lookup read lock (#11538)

DML to lookup VIndexes unconditionally takes a row lock on rows in the
lookup VIndex backing table. Add an option to optionally elide this lock
for cases where we know via business logic that the row will not be
deleted, nor the lookup column changed.

Signed-off-by: Max Englander <max@planetscale.com>

Signed-off-by: Max Englander <max@planetscale.com>
This commit is contained in:
Max Englander 2022-11-01 20:44:30 -04:00 коммит произвёл Arthur Schreiber
Родитель f20628fee1
Коммит 51f102532f
8 изменённых файлов: 676 добавлений и 88 удалений

Просмотреть файл

@ -42,6 +42,9 @@ var (
field BIGINT NOT NULL,
field2 BIGINT,
field3 BIGINT,
field4 BIGINT,
field5 BIGINT,
field6 BIGINT,
PRIMARY KEY (id)
) ENGINE=Innodb;
@ -63,6 +66,24 @@ CREATE TABLE lookup3 (
UNIQUE KEY (field3)
) ENGINE=Innodb;
CREATE TABLE lookup4 (
field4 BIGINT NOT NULL,
keyspace_id binary(8),
UNIQUE KEY (field4)
) ENGINE=Innodb;
CREATE TABLE lookup5 (
field5 BIGINT NOT NULL,
keyspace_id binary(8),
UNIQUE KEY (field5)
) ENGINE=Innodb;
CREATE TABLE lookup6 (
field6 BIGINT NOT NULL,
keyspace_id binary(8),
UNIQUE KEY (field6)
) ENGINE=Innodb;
CREATE TABLE thex (
id VARBINARY(64) NOT NULL,
field BIGINT NOT NULL,
@ -118,6 +139,36 @@ CREATE TABLE thex (
"to": "keyspace_id"
},
"owner": "t1"
},
"lookup4": {
"type": "lookup",
"params": {
"from": "field4",
"read_lock": "exclusive",
"table": "lookup4",
"to": "keyspace_id"
},
"owner": "t1"
},
"lookup5": {
"type": "lookup",
"params": {
"from": "field5",
"read_lock": "shared",
"table": "lookup5",
"to": "keyspace_id"
},
"owner": "t1"
},
"lookup6": {
"type": "lookup",
"params": {
"from": "field6",
"read_lock": "none",
"table": "lookup6",
"to": "keyspace_id"
},
"owner": "t1"
}
},
"tables": {
@ -138,6 +189,18 @@ CREATE TABLE thex (
{
"column": "field3",
"name": "lookup3"
},
{
"column": "field4",
"name": "lookup4"
},
{
"column": "field5",
"name": "lookup5"
},
{
"column": "field6",
"name": "lookup6"
}
]
},
@ -165,6 +228,30 @@ CREATE TABLE thex (
}
]
},
"lookup4": {
"column_vindexes": [
{
"column": "field4",
"name": "binary_md5_vdx"
}
]
},
"lookup5": {
"column_vindexes": [
{
"column": "field5",
"name": "binary_md5_vdx"
}
]
},
"lookup6": {
"column_vindexes": [
{
"column": "field6",
"name": "binary_md5_vdx"
}
]
},
"thex": {
"column_vindexes": [
{
@ -245,51 +332,51 @@ func TestVindexBindVarOverlap(t *testing.T) {
require.Nil(t, err)
defer conn.Close()
utils.Exec(t, conn, "INSERT INTO t1 (id, field, field2, field3) VALUES "+
"(0,1,2,3), "+
"(1,2,3,4), "+
"(2,3,4,5), "+
"(3,4,5,6), "+
"(4,5,6,7), "+
"(5,6,7,8), "+
"(6,7,8,9), "+
"(7,8,9,10), "+
"(8,9,10,11), "+
"(9,10,11,12), "+
"(10,11,12,13), "+
"(11,12,13,14), "+
"(12,13,14,15), "+
"(13,14,15,16), "+
"(14,15,16,17), "+
"(15,16,17,18), "+
"(16,17,18,19), "+
"(17,18,19,20), "+
"(18,19,20,21), "+
"(19,20,21,22), "+
"(20,21,22,23)")
result := utils.Exec(t, conn, "select id, field, field2, field3 from t1 order by id")
utils.Exec(t, conn, "INSERT INTO t1 (id, field, field2, field3, field4, field5, field6) VALUES "+
"(0,1,2,3,4,5,6), "+
"(1,2,3,4,5,6,7), "+
"(2,3,4,5,6,7,8), "+
"(3,4,5,6,7,8,9), "+
"(4,5,6,7,8,9,10), "+
"(5,6,7,8,9,10,11), "+
"(6,7,8,9,10,11,12), "+
"(7,8,9,10,11,12,13), "+
"(8,9,10,11,12,13,14), "+
"(9,10,11,12,13,14,15), "+
"(10,11,12,13,14,15,16), "+
"(11,12,13,14,15,16,17), "+
"(12,13,14,15,16,17,18), "+
"(13,14,15,16,17,18,19), "+
"(14,15,16,17,18,19,20), "+
"(15,16,17,18,19,20,21), "+
"(16,17,18,19,20,21,22), "+
"(17,18,19,20,21,22,23), "+
"(18,19,20,21,22,23,24), "+
"(19,20,21,22,23,24,25), "+
"(20,21,22,23,24,25,26)")
result := utils.Exec(t, conn, "select id, field, field2, field3, field4, field5, field6 from t1 order by id")
expected :=
"[[INT64(0) INT64(1) INT64(2) INT64(3)] " +
"[INT64(1) INT64(2) INT64(3) INT64(4)] " +
"[INT64(2) INT64(3) INT64(4) INT64(5)] " +
"[INT64(3) INT64(4) INT64(5) INT64(6)] " +
"[INT64(4) INT64(5) INT64(6) INT64(7)] " +
"[INT64(5) INT64(6) INT64(7) INT64(8)] " +
"[INT64(6) INT64(7) INT64(8) INT64(9)] " +
"[INT64(7) INT64(8) INT64(9) INT64(10)] " +
"[INT64(8) INT64(9) INT64(10) INT64(11)] " +
"[INT64(9) INT64(10) INT64(11) INT64(12)] " +
"[INT64(10) INT64(11) INT64(12) INT64(13)] " +
"[INT64(11) INT64(12) INT64(13) INT64(14)] " +
"[INT64(12) INT64(13) INT64(14) INT64(15)] " +
"[INT64(13) INT64(14) INT64(15) INT64(16)] " +
"[INT64(14) INT64(15) INT64(16) INT64(17)] " +
"[INT64(15) INT64(16) INT64(17) INT64(18)] " +
"[INT64(16) INT64(17) INT64(18) INT64(19)] " +
"[INT64(17) INT64(18) INT64(19) INT64(20)] " +
"[INT64(18) INT64(19) INT64(20) INT64(21)] " +
"[INT64(19) INT64(20) INT64(21) INT64(22)] " +
"[INT64(20) INT64(21) INT64(22) INT64(23)]]"
"[[INT64(0) INT64(1) INT64(2) INT64(3) INT64(4) INT64(5) INT64(6)] " +
"[INT64(1) INT64(2) INT64(3) INT64(4) INT64(5) INT64(6) INT64(7)] " +
"[INT64(2) INT64(3) INT64(4) INT64(5) INT64(6) INT64(7) INT64(8)] " +
"[INT64(3) INT64(4) INT64(5) INT64(6) INT64(7) INT64(8) INT64(9)] " +
"[INT64(4) INT64(5) INT64(6) INT64(7) INT64(8) INT64(9) INT64(10)] " +
"[INT64(5) INT64(6) INT64(7) INT64(8) INT64(9) INT64(10) INT64(11)] " +
"[INT64(6) INT64(7) INT64(8) INT64(9) INT64(10) INT64(11) INT64(12)] " +
"[INT64(7) INT64(8) INT64(9) INT64(10) INT64(11) INT64(12) INT64(13)] " +
"[INT64(8) INT64(9) INT64(10) INT64(11) INT64(12) INT64(13) INT64(14)] " +
"[INT64(9) INT64(10) INT64(11) INT64(12) INT64(13) INT64(14) INT64(15)] " +
"[INT64(10) INT64(11) INT64(12) INT64(13) INT64(14) INT64(15) INT64(16)] " +
"[INT64(11) INT64(12) INT64(13) INT64(14) INT64(15) INT64(16) INT64(17)] " +
"[INT64(12) INT64(13) INT64(14) INT64(15) INT64(16) INT64(17) INT64(18)] " +
"[INT64(13) INT64(14) INT64(15) INT64(16) INT64(17) INT64(18) INT64(19)] " +
"[INT64(14) INT64(15) INT64(16) INT64(17) INT64(18) INT64(19) INT64(20)] " +
"[INT64(15) INT64(16) INT64(17) INT64(18) INT64(19) INT64(20) INT64(21)] " +
"[INT64(16) INT64(17) INT64(18) INT64(19) INT64(20) INT64(21) INT64(22)] " +
"[INT64(17) INT64(18) INT64(19) INT64(20) INT64(21) INT64(22) INT64(23)] " +
"[INT64(18) INT64(19) INT64(20) INT64(21) INT64(22) INT64(23) INT64(24)] " +
"[INT64(19) INT64(20) INT64(21) INT64(22) INT64(23) INT64(24) INT64(25)] " +
"[INT64(20) INT64(21) INT64(22) INT64(23) INT64(24) INT64(25) INT64(26)]]"
assert.Equal(t, expected, fmt.Sprintf("%v", result.Rows))
}

18
go/vt/vtexplain/testdata/test-schema.sql поставляемый
Просмотреть файл

@ -106,6 +106,24 @@ CREATE TABLE orders_id_lookup (
primary key(id)
);
CREATE TABLE orders_id_lookup_exclusive_read_lock (
id int NOT NULL,
keyspace_id varbinary(128),
primary key(id)
);
CREATE TABLE orders_id_lookup_shared_read_lock (
id int NOT NULL,
keyspace_id varbinary(128),
primary key(id)
);
CREATE TABLE orders_id_lookup_no_read_lock (
id int NOT NULL,
keyspace_id varbinary(128),
primary key(id)
);
CREATE TABLE orders_id_lookup_no_verify (
id int NOT NULL,
keyspace_id varbinary(128),

38
go/vt/vtexplain/testdata/test-vschema.json поставляемый
Просмотреть файл

@ -18,6 +18,36 @@
},
"owner": "orders"
},
"orders_id_vdx_exclusive_read_lock": {
"type": "lookup_unique",
"params": {
"table": "orders_id_lookup_exclusive_read_lock",
"from": "id",
"to": "keyspace_id",
"read_lock": "exclusive"
},
"owner": "orders"
},
"orders_id_vdx_shared_read_lock": {
"type": "lookup_unique",
"params": {
"table": "orders_id_lookup_shared_read_lock",
"from": "id",
"to": "keyspace_id",
"read_lock": "shared"
},
"owner": "orders"
},
"orders_id_vdx_no_read_lock": {
"type": "lookup_unique",
"params": {
"table": "orders_id_lookup_no_read_lock",
"from": "id",
"to": "keyspace_id",
"read_lock": "none"
},
"owner": "orders"
},
"orders_id_vdx_no_verify": {
"type": "lookup_unique",
"params": {
@ -174,6 +204,14 @@
}
]
},
"orders_id_lookup_no_read_lock": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"orders_id_lookup_no_verify": {
"column_vindexes": [
{

Просмотреть файл

@ -166,8 +166,11 @@ func TestUpdateFromSubQuery(t *testing.T) {
func TestUpdateEqualWithNoVerifyAndWriteOnlyLookupUniqueVindexes(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|nv_lu_col|lu_col|t2_lu_vdx", "int64|int64|int64|int64|int64"),
"1|2|2|1|0",
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)
@ -175,7 +178,7 @@ func TestUpdateEqualWithNoVerifyAndWriteOnlyLookupUniqueVindexes(t *testing.T) {
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where wo_lu_col = 2 for update",
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where wo_lu_col = 2 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where wo_lu_col = 2",
@ -199,7 +202,264 @@ func TestUpdateEqualWithNoVerifyAndWriteOnlyLookupUniqueVindexes(t *testing.T) {
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}
lookWant := []*querypb.BoundQuery{bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2}
lookWant := []*querypb.BoundQuery{
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
}
assertQueries(t, sbcLookup, lookWant)
}
func TestUpdateInTransactionLookupDefaultReadLock(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)
safeSession := NewSafeSession(&vtgatepb.Session{InTransaction: true})
_, err := executorExecSession(
executor,
"update t2_lookup set lu_col = 5 where nv_lu_col = 2",
nil,
safeSession.Session,
)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where nv_lu_col = 2 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where nv_lu_col = 2",
BindVariables: map[string]*querypb.BindVariable{},
},
}
assertQueries(t, sbc1, wantQueries)
assertQueries(t, sbc2, wantQueries)
vars, _ := sqltypes.BuildBindVariable([]any{
sqltypes.NewInt64(2),
})
bq1 := &querypb.BoundQuery{
Sql: "select nv_lu_col, keyspace_id from nv_lu_idx where nv_lu_col in ::nv_lu_col for update",
BindVariables: map[string]*querypb.BindVariable{
"nv_lu_col": vars,
},
}
bq2 := &querypb.BoundQuery{
Sql: "insert into lu_idx(lu_col, keyspace_id) values (:lu_col_0, :keyspace_id_0)",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id_0": sqltypes.Uint64BindVariable(1),
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}
lookWant := []*querypb.BoundQuery{
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
}
assertQueries(t, sbcLookup, lookWant)
}
func TestUpdateInTransactionLookupExclusiveReadLock(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)
safeSession := NewSafeSession(&vtgatepb.Session{InTransaction: true})
_, err := executorExecSession(
executor,
"update t2_lookup set lu_col = 5 where erl_lu_col = 2",
nil,
safeSession.Session,
)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where nv_lu_col = 2 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where erl_lu_col = 2",
BindVariables: map[string]*querypb.BindVariable{},
},
}
assertQueries(t, sbc1, wantQueries)
assertQueries(t, sbc2, wantQueries)
vars, _ := sqltypes.BuildBindVariable([]any{
sqltypes.NewInt64(2),
})
bq1 := &querypb.BoundQuery{
Sql: "select erl_lu_col, keyspace_id from erl_lu_idx where erl_lu_col in ::erl_lu_col for update",
BindVariables: map[string]*querypb.BindVariable{
"erl_lu_col": vars,
},
}
bq2 := &querypb.BoundQuery{
Sql: "insert into lu_idx(lu_col, keyspace_id) values (:lu_col_0, :keyspace_id_0)",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id_0": sqltypes.Uint64BindVariable(1),
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}
lookWant := []*querypb.BoundQuery{
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
}
assertQueries(t, sbcLookup, lookWant)
}
func TestUpdateInTransactionLookupSharedReadLock(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)
safeSession := NewSafeSession(&vtgatepb.Session{InTransaction: true})
_, err := executorExecSession(
executor,
"update t2_lookup set lu_col = 5 where srl_lu_col = 2",
nil,
safeSession.Session,
)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where nv_lu_col = 2 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where srl_lu_col = 2",
BindVariables: map[string]*querypb.BindVariable{},
},
}
assertQueries(t, sbc1, wantQueries)
assertQueries(t, sbc2, wantQueries)
vars, _ := sqltypes.BuildBindVariable([]any{
sqltypes.NewInt64(2),
})
bq1 := &querypb.BoundQuery{
Sql: "select srl_lu_col, keyspace_id from srl_lu_idx where srl_lu_col in ::srl_lu_col lock in share mode",
BindVariables: map[string]*querypb.BindVariable{
"srl_lu_col": vars,
},
}
bq2 := &querypb.BoundQuery{
Sql: "insert into lu_idx(lu_col, keyspace_id) values (:lu_col_0, :keyspace_id_0)",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id_0": sqltypes.Uint64BindVariable(1),
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}
lookWant := []*querypb.BoundQuery{
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
}
assertQueries(t, sbcLookup, lookWant)
}
func TestUpdateInTransactionLookupNoReadLock(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)
safeSession := NewSafeSession(&vtgatepb.Session{InTransaction: true})
_, err := executorExecSession(
executor,
"update t2_lookup set lu_col = 5 where nrl_lu_col = 2",
nil,
safeSession.Session,
)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where nrl_lu_col = 2 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where nrl_lu_col = 2",
BindVariables: map[string]*querypb.BindVariable{},
},
}
assertQueries(t, sbc1, wantQueries)
assertQueries(t, sbc2, wantQueries)
vars, _ := sqltypes.BuildBindVariable([]any{
sqltypes.NewInt64(2),
})
bq1 := &querypb.BoundQuery{
Sql: "select nrl_lu_col, keyspace_id from nrl_lu_idx where nrl_lu_col in ::nrl_lu_col",
BindVariables: map[string]*querypb.BindVariable{
"nrl_lu_col": vars,
},
}
bq2 := &querypb.BoundQuery{
Sql: "insert into lu_idx(lu_col, keyspace_id) values (:lu_col_0, :keyspace_id_0)",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id_0": sqltypes.Uint64BindVariable(1),
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}
lookWant := []*querypb.BoundQuery{
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
bq1, bq2,
}
assertQueries(t, sbcLookup, lookWant)
}
@ -513,15 +773,18 @@ func TestUpdateEqualWithMultipleLookupVindex(t *testing.T) {
)})
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|nv_lu_col|lu_col|t2_lu_vdx", "int64|int64|int64|int64|int64"),
"1|2|2|1|0",
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
)})
_, err := executorExec(executor, "update t2_lookup set lu_col = 5 where wo_lu_col = 2 and lu_col = 1", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where wo_lu_col = 2 and lu_col = 1 for update",
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where wo_lu_col = 2 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where wo_lu_col = 2 and lu_col = 1",
@ -564,16 +827,19 @@ func TestUpdateUseHigherCostVindexIfBackfilling(t *testing.T) {
)})
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|nv_lu_col|lu_col|t2_lu_vdx", "int64|int64|int64|int64|int64"),
"1|2|2|1|0",
"1|2|2|2|0",
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col|t2_lu_vdx",
"int64|int64|int64|int64|int64|int64|int64|int64",
),
"1|2|2|2|2|2|1|0",
"1|2|2|2|2|2|2|0",
)})
_, err := executorExec(executor, "update t2_lookup set lu_col = 5 where wo_lu_col = 2 and lu_col in (1, 2)", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where wo_lu_col = 2 and lu_col in (1, 2) for update",
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col, lu_col = 5 from t2_lookup where wo_lu_col = 2 and lu_col in (1, 2) for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_lookup set lu_col = 5 where wo_lu_col = 2 and lu_col in (1, 2)",
@ -621,8 +887,11 @@ func TestUpdateUseHigherCostVindexIfBackfilling(t *testing.T) {
func TestDeleteEqualWithNoVerifyAndWriteOnlyLookupUniqueVindex(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|nv_lu_col|lu_col", "int64|int64|int64|int64"),
"1|1|1|1",
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col",
"int64|int64|int64|int64|int64|int64|int64",
),
"1|1|1|1|1|1|1",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)
@ -630,7 +899,7 @@ func TestDeleteEqualWithNoVerifyAndWriteOnlyLookupUniqueVindex(t *testing.T) {
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, nv_lu_col, lu_col from t2_lookup where wo_lu_col = 1 for update",
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col from t2_lookup where wo_lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "delete from t2_lookup where wo_lu_col = 1",
@ -645,20 +914,50 @@ func TestDeleteEqualWithNoVerifyAndWriteOnlyLookupUniqueVindex(t *testing.T) {
},
}
bq2 := &querypb.BoundQuery{
Sql: "delete from erl_lu_idx where erl_lu_col = :erl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"erl_lu_col": sqltypes.Int64BindVariable(1),
},
}
bq3 := &querypb.BoundQuery{
Sql: "delete from srl_lu_idx where srl_lu_col = :srl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"srl_lu_col": sqltypes.Int64BindVariable(1),
},
}
bq4 := &querypb.BoundQuery{
Sql: "delete from nrl_lu_idx where nrl_lu_col = :nrl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"nrl_lu_col": sqltypes.Int64BindVariable(1),
},
}
bq5 := &querypb.BoundQuery{
Sql: "delete from nv_lu_idx where nv_lu_col = :nv_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"nv_lu_col": sqltypes.Int64BindVariable(1),
},
}
bq3 := &querypb.BoundQuery{
bq6 := &querypb.BoundQuery{
Sql: "delete from lu_idx where lu_col = :lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": sqltypes.Uint64BindVariable(1),
"lu_col": sqltypes.Int64BindVariable(1),
},
}
lookWant := []*querypb.BoundQuery{bq1, bq2, bq3, bq1, bq2, bq3, bq1, bq2, bq3, bq1, bq2, bq3, bq1, bq2, bq3, bq1, bq2, bq3, bq1, bq2, bq3, bq1, bq2, bq3}
lookWant := []*querypb.BoundQuery{
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
bq1, bq2, bq3, bq4, bq5, bq6,
}
assertQueries(t, sbcLookup, lookWant)
assertQueries(t, sbc1, wantQueries)
assertQueries(t, sbc2, wantQueries)
@ -673,15 +972,18 @@ func TestDeleteEqualWithMultipleLookupVindex(t *testing.T) {
)})
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|nv_lu_col|lu_col", "int64|int64|int64|int64"),
"1|1|1|1",
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col",
"int64|int64|int64|int64|int64|int64|int64",
),
"1|1|1|1|1|1|1",
)})
_, err := executorExec(executor, "delete from t2_lookup where wo_lu_col = 1 and lu_col = 1", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, nv_lu_col, lu_col from t2_lookup where wo_lu_col = 1 and lu_col = 1 for update",
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col from t2_lookup where wo_lu_col = 1 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "delete from t2_lookup where wo_lu_col = 1 and lu_col = 1",
@ -702,6 +1004,24 @@ func TestDeleteEqualWithMultipleLookupVindex(t *testing.T) {
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"wo_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from erl_lu_idx where erl_lu_col = :erl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"erl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from srl_lu_idx where srl_lu_col = :srl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"srl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from nrl_lu_idx where nrl_lu_col = :nrl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"nrl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from nv_lu_idx where nv_lu_col = :nv_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
@ -731,16 +1051,19 @@ func TestDeleteUseHigherCostVindexIfBackfilling(t *testing.T) {
)})
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|nv_lu_col|lu_col", "int64|int64|int64|int64"),
"1|1|1|1",
"1|1|1|2",
sqltypes.MakeTestFields(
"id|wo_lu_col|erl_lu_col|srl_lu_col|nrl_lu_col|nv_lu_col|lu_col",
"int64|int64|int64|int64|int64|int64|int64",
),
"1|1|1|1|1|1|1",
"1|1|1|1|1|1|2",
)})
_, err := executorExec(executor, "delete from t2_lookup where wo_lu_col = 1 and lu_col in (1, 2)", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, nv_lu_col, lu_col from t2_lookup where wo_lu_col = 1 and lu_col in (1, 2) for update",
Sql: "select id, wo_lu_col, erl_lu_col, srl_lu_col, nrl_lu_col, nv_lu_col, lu_col from t2_lookup where wo_lu_col = 1 and lu_col in (1, 2) for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "delete from t2_lookup where wo_lu_col = 1 and lu_col in (1, 2)",
@ -762,6 +1085,24 @@ func TestDeleteUseHigherCostVindexIfBackfilling(t *testing.T) {
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"wo_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from erl_lu_idx where erl_lu_col = :erl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"erl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from srl_lu_idx where srl_lu_col = :srl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"srl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from nrl_lu_idx where nrl_lu_col = :nrl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"nrl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from nv_lu_idx where nv_lu_col = :nv_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
@ -780,6 +1121,24 @@ func TestDeleteUseHigherCostVindexIfBackfilling(t *testing.T) {
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"wo_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from erl_lu_idx where erl_lu_col = :erl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"erl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from srl_lu_idx where srl_lu_col = :srl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"srl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from nrl_lu_idx where nrl_lu_col = :nrl_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"nrl_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from nv_lu_idx where nv_lu_col = :nv_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{

Просмотреть файл

@ -122,6 +122,36 @@ var executorVSchema = `
"write_only": "true"
},
"owner": "t2_lookup"
},
"t2_erl_lu_vdx": {
"type": "lookup_unique",
"params": {
"table": "TestUnsharded.erl_lu_idx",
"from": "erl_lu_col",
"to": "keyspace_id",
"read_lock": "exclusive"
},
"owner": "t2_lookup"
},
"t2_srl_lu_vdx": {
"type": "lookup_unique",
"params": {
"table": "TestUnsharded.srl_lu_idx",
"from": "srl_lu_col",
"to": "keyspace_id",
"read_lock": "shared"
},
"owner": "t2_lookup"
},
"t2_nrl_lu_vdx": {
"type": "lookup_unique",
"params": {
"table": "TestUnsharded.nrl_lu_idx",
"from": "nrl_lu_col",
"to": "keyspace_id",
"read_lock": "none"
},
"owner": "t2_lookup"
},
"t2_nv_lu_vdx": {
"type": "lookup_unique",
@ -311,8 +341,20 @@ var executorVSchema = `
"name": "hash_index"
},
{
"column": "wo_lu_col",
"name": "t2_wo_lu_vdx"
"column": "wo_lu_col",
"name": "t2_wo_lu_vdx"
},
{
"column": "erl_lu_col",
"name": "t2_erl_lu_vdx"
},
{
"column": "srl_lu_col",
"name": "t2_srl_lu_vdx"
},
{
"column": "nrl_lu_col",
"name": "t2_nrl_lu_vdx"
},
{
"column": "nv_lu_col",
@ -364,6 +406,9 @@ var unshardedVSchema = `
}
},
"wo_lu_idx": {},
"erl_lu_idx": {},
"srl_lu_idx": {},
"nrl_lu_idx": {},
"nv_lu_idx": {},
"lu_idx": {},
"simple": {}

Просмотреть файл

@ -865,8 +865,11 @@ func TestExecutorShow(t *testing.T) {
buildVarCharRow("TestExecutor", "name_user_map", "lookup_hash", "from=name; table=name_user_map; to=user_id", "user"),
buildVarCharRow("TestExecutor", "regional_vdx", "region_experimental", "region_bytes=1", ""),
buildVarCharRow("TestExecutor", "t1_lkp_vdx", "consistent_lookup_unique", "from=unq_col; table=t1_lkp_idx; to=keyspace_id", "t1"),
buildVarCharRow("TestExecutor", "t2_erl_lu_vdx", "lookup_unique", "from=erl_lu_col; read_lock=exclusive; table=TestUnsharded.erl_lu_idx; to=keyspace_id", "t2_lookup"),
buildVarCharRow("TestExecutor", "t2_lu_vdx", "lookup_hash_unique", "from=lu_col; table=TestUnsharded.lu_idx; to=keyspace_id", "t2_lookup"),
buildVarCharRow("TestExecutor", "t2_nrl_lu_vdx", "lookup_unique", "from=nrl_lu_col; read_lock=none; table=TestUnsharded.nrl_lu_idx; to=keyspace_id", "t2_lookup"),
buildVarCharRow("TestExecutor", "t2_nv_lu_vdx", "lookup_unique", "from=nv_lu_col; no_verify=true; table=TestUnsharded.nv_lu_idx; to=keyspace_id", "t2_lookup"),
buildVarCharRow("TestExecutor", "t2_srl_lu_vdx", "lookup_unique", "from=srl_lu_col; read_lock=shared; table=TestUnsharded.srl_lu_idx; to=keyspace_id", "t2_lookup"),
buildVarCharRow("TestExecutor", "t2_wo_lu_vdx", "lookup_unique", "from=wo_lu_col; table=TestUnsharded.wo_lu_idx; to=keyspace_id; write_only=true", "t2_lookup"),
buildVarCharRow("TestMultiCol", "multicol_vdx", "multicol", "column_bytes=1,3,4; column_count=3; column_vindex=hash,binary,unicode_loose_xxhash", ""),
},
@ -998,14 +1001,17 @@ func TestExecutorShow(t *testing.T) {
Fields: buildVarCharFields("Tables"),
Rows: [][]sqltypes.Value{
buildVarCharRow("dual"),
buildVarCharRow("erl_lu_idx"),
buildVarCharRow("ins_lookup"),
buildVarCharRow("lu_idx"),
buildVarCharRow("main1"),
buildVarCharRow("music_user_map"),
buildVarCharRow("name_lastname_keyspace_id_map"),
buildVarCharRow("name_user_map"),
buildVarCharRow("nrl_lu_idx"),
buildVarCharRow("nv_lu_idx"),
buildVarCharRow("simple"),
buildVarCharRow("srl_lu_idx"),
buildVarCharRow("user_msgs"),
buildVarCharRow("user_seq"),
buildVarCharRow("wo_lu_idx"),

Просмотреть файл

@ -180,7 +180,7 @@ func (cached *LookupHash) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(144)
size += int64(176)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -194,7 +194,7 @@ func (cached *LookupHashUnique) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(144)
size += int64(176)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -208,7 +208,7 @@ func (cached *LookupNonUnique) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(144)
size += int64(176)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -222,7 +222,7 @@ func (cached *LookupUnicodeLooseMD5Hash) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(144)
size += int64(176)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -236,7 +236,7 @@ func (cached *LookupUnicodeLooseMD5HashUnique) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(144)
size += int64(176)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -250,7 +250,7 @@ func (cached *LookupUnique) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(144)
size += int64(176)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -492,7 +492,7 @@ func (cached *clCommon) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(256)
size += int64(288)
}
// field name string
size += hack.RuntimeAllocSize(int64(len(cached.name)))
@ -525,7 +525,7 @@ func (cached *lookupInternal) CachedSize(alloc bool) int64 {
}
size := int64(0)
if alloc {
size += int64(112)
size += int64(144)
}
// field Table string
size += hack.RuntimeAllocSize(int64(len(cached.Table)))
@ -538,8 +538,12 @@ func (cached *lookupInternal) CachedSize(alloc bool) int64 {
}
// field To string
size += hack.RuntimeAllocSize(int64(len(cached.To)))
// field ReadLock string
size += hack.RuntimeAllocSize(int64(len(cached.ReadLock)))
// field sel string
size += hack.RuntimeAllocSize(int64(len(cached.sel)))
// field selTxDml string
size += hack.RuntimeAllocSize(int64(len(cached.selTxDml)))
// field ver string
size += hack.RuntimeAllocSize(int64(len(cached.ver)))
// field del string

Просмотреть файл

@ -33,17 +33,31 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
var (
readLockExclusive = "exclusive"
readLockShared = "shared"
readLockNone = "none"
readLockDefault = readLockExclusive
readLockExprs map[string]string = map[string]string{
readLockExclusive: "for update",
readLockShared: "lock in share mode",
readLockNone: "",
}
)
// lookupInternal implements the functions for the Lookup vindexes.
type lookupInternal struct {
Table string `json:"table"`
FromColumns []string `json:"from_columns"`
To string `json:"to"`
Autocommit bool `json:"autocommit,omitempty"`
MultiShardAutocommit bool `json:"multi_shard_autocommit,omitempty"`
Upsert bool `json:"upsert,omitempty"`
IgnoreNulls bool `json:"ignore_nulls,omitempty"`
BatchLookup bool `json:"batch_lookup,omitempty"`
sel, ver, del string // sel: map query, ver: verify query, del: delete query
Table string `json:"table"`
FromColumns []string `json:"from_columns"`
To string `json:"to"`
Autocommit bool `json:"autocommit,omitempty"`
MultiShardAutocommit bool `json:"multi_shard_autocommit,omitempty"`
Upsert bool `json:"upsert,omitempty"`
IgnoreNulls bool `json:"ignore_nulls,omitempty"`
BatchLookup bool `json:"batch_lookup,omitempty"`
ReadLock string `json:"read_lock,omitempty"`
sel, selTxDml, ver, del string // sel: map query, ver: verify query, del: delete query
}
func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit, upsert, multiShardAutocommit bool) error {
@ -64,6 +78,12 @@ func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit,
if err != nil {
return err
}
if readLock, ok := lookupQueryParams["read_lock"]; ok {
if _, valid := readLockExprs[readLock]; !valid {
return fmt.Errorf("invalid read_lock value: %s", readLock)
}
lkp.ReadLock = readLock
}
lkp.Autocommit = autocommit
lkp.Upsert = upsert
@ -76,6 +96,15 @@ func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit,
// as part of face 2 of https://github.com/vitessio/vitess/issues/3481
// For now multi column behaves as a single column for Map and Verify operations
lkp.sel = fmt.Sprintf("select %s, %s from %s where %s in ::%s", lkp.FromColumns[0], lkp.To, lkp.Table, lkp.FromColumns[0], lkp.FromColumns[0])
if lkp.ReadLock != readLockNone {
lockExpr, ok := readLockExprs[lkp.ReadLock]
if !ok {
lockExpr = readLockExprs[readLockDefault]
}
lkp.selTxDml = fmt.Sprintf("%s %s", lkp.sel, lockExpr)
} else {
lkp.selTxDml = lkp.sel
}
lkp.ver = fmt.Sprintf("select %s from %s where %s = :%s and %s = :%s", lkp.FromColumns[0], lkp.Table, lkp.FromColumns[0], lkp.FromColumns[0], lkp.To, lkp.To)
lkp.del = lkp.initDelStmt()
return nil
@ -90,9 +119,11 @@ func (lkp *lookupInternal) Lookup(ctx context.Context, vcursor VCursor, ids []sq
if lkp.Autocommit {
co = vtgatepb.CommitOrder_AUTOCOMMIT
}
sel := lkp.sel
var sel string
if vcursor.InTransactionAndIsDML() {
sel = sel + " for update"
sel = lkp.selTxDml
} else {
sel = lkp.sel
}
if ids[0].IsIntegral() || lkp.BatchLookup {
// for integral types, batch query all ids and then map them back to the input order