No reserved connection on modifying system settings (#11088)

* feat: added queryserver_enable_settings_pool flag to enable settings pool

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* test: added vttablet e2e test for simple Execute and StreamExecute flow with settings

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: ReserveExecute to use new settings pool path to execute the query with settings without reserving the connection

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: ReserveStreamExecute to use new pool path

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: integrated settings pool with dml and ddl queries

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* test: fix flag test expectation

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: support settings on tx without reserved connection

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* test: added test for beginExecute and beginStreamExecute with settings to not reserve connection

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: support settings on begin execute streaming and non-streaming without reserved connection

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* test: add test for checking reusing of connections with settings

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: added reset pool conn func for testing

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* refactor: pass connReserved as bool instead of connection id

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* refactor: tabletserver api to reuse methods

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: allow set query to be executed in tablet with settings

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: added lock function on reserve api to not use settings pool and instead use reserve connection. refactored code around it to make this simpler

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* test: e2e test on vtgate, run vttablet with settings pool enabled and disabled

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: support temporary table by way of fallback to reserved connection when settings pool is enabled

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
Signed-off-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
This commit is contained in:
Harshit Gangal 2022-09-02 11:10:49 +05:30 коммит произвёл GitHub
Родитель 3e7a73483c
Коммит d3d9e71396
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
49 изменённых файлов: 1114 добавлений и 430 удалений

Просмотреть файл

@ -164,6 +164,7 @@ Usage of vtctld:
--queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000)
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
--queryserver_enable_online_ddl Enable online DDL. (default true)
--queryserver_enable_settings_pool Enable pooling of connections with modified system settings
--remote_operation_timeout duration time to wait for a remote operation (default 30s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).

Просмотреть файл

@ -179,6 +179,7 @@ Usage of vtexplain:
--queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000)
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
--queryserver_enable_online_ddl Enable online DDL. (default true)
--queryserver_enable_settings_pool Enable pooling of connections with modified system settings
--remote_operation_timeout duration time to wait for a remote operation (default 30s)
--replication-mode string The replication mode to simulate -- must be set to either ROW or STATEMENT (default "ROW")
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)

Просмотреть файл

@ -343,6 +343,7 @@ Usage of vttablet:
--queryserver-config-txpool-waiter-cap int query server transaction pool waiter limit, this is the maximum number of transactions that can be queued waiting to get a connection (default 5000)
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
--queryserver_enable_online_ddl Enable online DDL. (default true)
--queryserver_enable_settings_pool Enable pooling of connections with modified system settings
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--relay_log_max_items int Maximum number of rows for VReplication target buffering. (default 5000)
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)

Просмотреть файл

@ -99,45 +99,60 @@ CREATE TABLE test_vdx (
`
)
var enableSettingsPool bool
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
code := runAllTests(m)
if code != 0 {
os.Exit(code)
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
println("running with settings pool enabled")
// run again with settings pool enabled.
enableSettingsPool = true
code = runAllTests(m)
os.Exit(code)
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1
}
func runAllTests(m *testing.M) int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start vtgate
// This test requires setting the mysql_server_version vtgate flag
// to 5.7 regardless of the actual MySQL version used for the tests.
clusterInstance.VtGateExtraArgs = []string{"--lock_heartbeat_time", "2s", "--mysql_server_version", "5.7.0"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
if enableSettingsPool {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver_enable_settings_pool")
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1
}
// Start vtgate
// This test requires setting the mysql_server_version vtgate flag
// to 5.7 regardless of the actual MySQL version used for the tests.
clusterInstance.VtGateExtraArgs = []string{"--lock_heartbeat_time", "2s", "--mysql_server_version", "5.7.0"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}
func assertIsEmpty(t *testing.T, conn *mysql.Conn, query string) {

Просмотреть файл

@ -62,42 +62,58 @@ var (
`
)
var enableSettingsPool bool
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
code := runAllTests(m)
if code != 0 {
os.Exit(code)
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
println("running with settings pool enabled")
// run again with settings pool enabled.
enableSettingsPool = true
code = runAllTests(m)
os.Exit(code)
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil {
return 1
}
func runAllTests(m *testing.M) int {
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--lock_heartbeat_time", "2s", "--enable_system_settings=true"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
if enableSettingsPool {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver_enable_settings_pool")
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil {
return 1
}
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--lock_heartbeat_time", "2s"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}
func TestServingChange(t *testing.T) {

Просмотреть файл

@ -63,43 +63,58 @@ var (
`
)
var enableSettingsPool bool
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
code := runAllTests(m)
if code != 0 {
os.Exit(code)
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
println("running with settings pool enabled")
// run again with settings pool enabled.
enableSettingsPool = true
code = runAllTests(m)
os.Exit(code)
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil {
return 1
}
func runAllTests(m *testing.M) int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--lock_heartbeat_time", "2s", "--enable_system_settings=true"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
if enableSettingsPool {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver_enable_settings_pool")
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil {
return 1
}
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--lock_heartbeat_time", "2s"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}
func TestTabletChange(t *testing.T) {

Просмотреть файл

@ -39,41 +39,55 @@ var (
sqlSchema = `create table test(id bigint primary key)Engine=InnoDB;`
)
var enableSettingsPool bool
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
code := runAllTests(m)
if code != 0 {
os.Exit(code)
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
println("running with settings pool enabled")
// run again with settings pool enabled.
enableSettingsPool = true
code = runAllTests(m)
os.Exit(code)
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 2, false); err != nil {
return 1
}
func runAllTests(m *testing.M) int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--enable_system_settings=true"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
if enableSettingsPool {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver_enable_settings_pool")
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 2, false); err != nil {
return 1
}
// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}
func TestMysqlDownServingChange(t *testing.T) {

Просмотреть файл

@ -39,41 +39,55 @@ var (
sqlSchema = `create table test(id bigint primary key)Engine=InnoDB;`
)
var enableSettingsPool bool
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
code := runAllTests(m)
if code != 0 {
os.Exit(code)
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
println("running with settings pool enabled")
// run again with settings pool enabled.
enableSettingsPool = true
code = runAllTests(m)
os.Exit(code)
}
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 2, false); err != nil {
return 1
}
func runAllTests(m *testing.M) int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--enable_system_settings=true"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
if enableSettingsPool {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver_enable_settings_pool")
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 2, false); err != nil {
return 1
}
// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}
func TestVttabletDownServingChange(t *testing.T) {

Просмотреть файл

@ -147,47 +147,62 @@ END;
`
)
var enableSettingsPool bool
func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
code := runAllTests(m)
if code != 0 {
os.Exit(code)
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
println("running with settings pool enabled")
// run again with settings pool enabled.
enableSettingsPool = true
code = runAllTests(m)
os.Exit(code)
}
// Start keyspace
Keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "3", "--queryserver-config-max-result-size", "30"}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
}
func runAllTests(m *testing.M) int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--warn_sharded_only=true"}
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
}
// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}
primaryTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTablet.QueryTablet(createProcSQL, KeyspaceName, false); err != nil {
log.Fatal(err.Error())
return 1
}
// Start keyspace
Keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "3", "--queryserver-config-max-result-size", "30"}
if enableSettingsPool {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver_enable_settings_pool")
}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
}
return m.Run()
}()
os.Exit(exitCode)
// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"--warn_sharded_only=true"}
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
}
primaryTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTablet.QueryTablet(createProcSQL, KeyspaceName, false); err != nil {
log.Fatal(err.Error())
return 1
}
return m.Run()
}
func TestSelectIntoAndLoadFrom(t *testing.T) {

Просмотреть файл

@ -64,7 +64,7 @@ func newTestLoadTable(tableName, comment string, db *fakesqldb.DB) (*schema.Tabl
IdleTimeoutSeconds: 10,
})
connPool.Open(appParams, dbaParams, appParams)
conn, err := connPool.Get(ctx)
conn, err := connPool.Get(ctx, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -304,6 +304,27 @@ func (client *QueryClient) ReserveExecute(query string, preQueries []string, bin
return qr, nil
}
// ReserveStreamExecute performs a ReserveStreamExecute.
func (client *QueryClient) ReserveStreamExecute(query string, preQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
if client.reservedID != 0 {
return nil, errors.New("already reserved a connection")
}
result := &sqltypes.Result{}
state, err := client.server.ReserveStreamExecute(client.ctx, client.target, preQueries, query, bindvars, client.transactionID, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL},
func(res *sqltypes.Result) error {
if result.Fields == nil {
result.Fields = res.Fields
}
result.Rows = append(result.Rows, res.Rows...)
return nil
})
client.reservedID = state.ReservedID
if err != nil {
return nil, err
}
return result, nil
}
// ReserveBeginExecute performs a ReserveBeginExecute.
func (client *QueryClient) ReserveBeginExecute(query string, preQueries []string, postBeginQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
if client.reservedID != 0 {
@ -322,6 +343,32 @@ func (client *QueryClient) ReserveBeginExecute(query string, preQueries []string
return qr, nil
}
// ReserveBeginStreamExecute performs a ReserveBeginStreamExecute.
func (client *QueryClient) ReserveBeginStreamExecute(query string, preQueries []string, postBeginQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
if client.reservedID != 0 {
return nil, errors.New("already reserved a connection")
}
if client.transactionID != 0 {
return nil, errors.New("already in transaction")
}
result := &sqltypes.Result{}
state, err := client.server.ReserveBeginStreamExecute(client.ctx, client.target, preQueries, postBeginQueries, query, bindvars, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL},
func(res *sqltypes.Result) error {
if result.Fields == nil {
result.Fields = res.Fields
}
result.Rows = append(result.Rows, res.Rows...)
return nil
})
client.transactionID = state.TransactionID
client.reservedID = state.ReservedID
client.sessionStateChanges = state.SessionStateChanges
if err != nil {
return nil, err
}
return result, nil
}
// Release performs a Release.
func (client *QueryClient) Release() error {
err := client.server.Release(client.ctx, client.target, client.transactionID, client.reservedID)

Просмотреть файл

@ -315,6 +315,13 @@ var tableACLConfig = `{
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_settings",
"table_names_or_prefixes": ["temp"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
}
]
}`

Просмотреть файл

@ -0,0 +1,391 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endtoend
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)
func TestSelectNoConnectionReservationOnSettings(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
query := "select @@sql_mode"
setting := "set @@sql_mode = ''"
for _, withTx := range []bool{false, true} {
if withTx {
err := client.Begin(false)
require.NoError(t, err)
}
qr, err := client.ReserveExecute(query, []string{setting}, nil)
require.NoError(t, err)
assert.Zero(t, client.ReservedID())
assert.Equal(t, `[[VARCHAR("")]]`, fmt.Sprintf("%v", qr.Rows))
qr, err = client.ReserveStreamExecute(query, []string{setting}, nil)
require.NoError(t, err)
assert.Zero(t, client.ReservedID())
assert.Equal(t, `[[VARCHAR("")]]`, fmt.Sprintf("%v", qr.Rows))
}
}
func TestSetttingsReuseConnWithSettings(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
resetTxConnPool(t)
client := framework.NewClient()
defer client.Release()
connectionIDQuery := "select connection_id()"
setting := "set @@sql_mode = ''"
// Create a connection
connectionIDRes, err := client.BeginExecute(connectionIDQuery, nil, nil)
require.NoError(t, err)
// Add settings to the connection
res, err := client.ReserveExecute(connectionIDQuery, []string{setting}, nil)
require.NoError(t, err)
require.Equal(t, connectionIDRes, res)
// Release the connection back
err = client.Rollback()
require.NoError(t, err)
// We iterate in a loop and try to get a connection with the same settings as before
// but only 1 at a time. So we expect the same connection to be reused everytime.
for i := 0; i < 100; i++ {
res, err = client.ReserveBeginExecute(connectionIDQuery, []string{setting}, nil, nil)
require.NoError(t, err)
require.True(t, connectionIDRes.Equal(res))
err = client.Rollback()
require.NoError(t, err)
}
// Create a new client
client2 := framework.NewClient()
defer client2.Release()
// Ask for a connection with the same settings. This too should be reused.
res, err = client.ReserveBeginExecute(connectionIDQuery, []string{setting}, nil, nil)
require.NoError(t, err)
require.True(t, connectionIDRes.Equal(res))
// Ask for a connection with the same settings, but the previous connection hasn't been released yet. So this will be a new connection.
connectionIDRes2, err := client2.ReserveBeginExecute(connectionIDQuery, []string{setting}, nil, nil)
require.NoError(t, err)
require.False(t, connectionIDRes.Equal(connectionIDRes2))
// Release both the connections
err = client.Rollback()
require.NoError(t, err)
err = client2.Rollback()
require.NoError(t, err)
// We iterate in a loop and try to get a connection with the same settings as before
// but only 1 at a time. So we expect the two connections to be reused, and we should be seeing both of them.
reusedConnection1 := false
reusedConnection2 := false
for i := 0; i < 100; i++ {
res, err = client.ReserveBeginExecute(connectionIDQuery, []string{setting}, nil, nil)
require.NoError(t, err)
if connectionIDRes.Equal(res) {
reusedConnection1 = true
} else if connectionIDRes2.Equal(res) {
reusedConnection2 = true
} else {
t.Fatalf("The connection should be either of the already created connections")
}
err = client.Rollback()
require.NoError(t, err)
if reusedConnection2 && reusedConnection1 {
break
}
}
require.True(t, reusedConnection1)
require.True(t, reusedConnection2)
}
// resetTxConnPool resets the settings pool by fetching all the connections from the pool with no settings.
// this will make sure that the settings pool connections if any will be taken and settings are reset.
func resetTxConnPool(t *testing.T) {
txPoolSize := framework.Server.Config().TxPool.Size
clients := make([]*framework.QueryClient, txPoolSize)
for i := 0; i < txPoolSize; i++ {
client := framework.NewClient()
_, err := client.BeginExecute("select 1", nil, nil)
require.NoError(t, err)
clients[i] = client
}
for _, client := range clients {
require.NoError(t,
client.Release())
}
}
func TestDDLNoConnectionReservationOnSettings(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"
for _, withTx := range []bool{false, true} {
if withTx {
err := client.Begin(false)
require.NoError(t, err)
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.Zero(t, client.ReservedID())
}
}
func TestDMLNoConnectionReservationOnSettings(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
_, err := client.Execute("create table temp(c_date datetime)", nil)
require.NoError(t, err)
defer client.Execute("drop table temp", nil)
_, err = client.Execute("insert into temp values ('2022-08-25')", nil)
require.NoError(t, err)
setting := "set sql_mode='TRADITIONAL'"
queries := []string{
"insert into temp values('0000-00-00')",
"update temp set c_date = '0000-00-00'",
}
for _, withTx := range []bool{false, true} {
if withTx {
err := client.Begin(false)
require.NoError(t, err)
}
for _, query := range queries {
t.Run(query, func(t *testing.T) {
_, err = client.ReserveExecute(query, []string{setting}, nil)
require.Error(t, err, "query should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Incorrect datetime value")
assert.Zero(t, client.ReservedID())
})
}
}
}
func TestSelectNoConnectionReservationOnSettingsWithTx(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
query := "select @@sql_mode"
setting := "set @@sql_mode = ''"
qr, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil)
require.NoError(t, err)
assert.Zero(t, client.ReservedID())
assert.Equal(t, `[[VARCHAR("")]]`, fmt.Sprintf("%v", qr.Rows))
require.NoError(t,
client.Release())
qr, err = client.ReserveBeginStreamExecute(query, []string{setting}, nil, nil)
require.NoError(t, err)
assert.Zero(t, client.ReservedID())
assert.Equal(t, `[[VARCHAR("")]]`, fmt.Sprintf("%v", qr.Rows))
require.NoError(t,
client.Release())
}
func TestDDLNoConnectionReservationOnSettingsWithTx(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"
_, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.Zero(t, client.ReservedID())
}
func TestDMLNoConnectionReservationOnSettingsWithTx(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
_, err := client.Execute("create table temp(c_date datetime)", nil)
require.NoError(t, err)
defer client.Execute("drop table temp", nil)
_, err = client.Execute("insert into temp values ('2022-08-25')", nil)
require.NoError(t, err)
setting := "set sql_mode='TRADITIONAL'"
queries := []string{
"insert into temp values('0000-00-00')",
"update temp set c_date = '0000-00-00'",
}
for _, query := range queries {
t.Run(query, func(t *testing.T) {
_, err = client.ReserveBeginExecute(query, []string{setting}, nil, nil)
require.Error(t, err, "query should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Incorrect datetime value")
assert.Zero(t, client.ReservedID())
require.NoError(t,
client.Release())
})
}
}
func TestSetQueryOnReserveApis(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
setting := "set @@sql_mode = ''"
_, err := client.ReserveExecute(setting, []string{setting}, nil)
require.NoError(t, err)
assert.Zero(t, client.ReservedID())
_, err = client.ReserveBeginExecute(setting, []string{setting}, nil, nil)
require.NoError(t, err)
assert.Zero(t, client.ReservedID())
}
func TestGetLockQueryOnReserveExecute(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
lockQuery := "select get_lock('test', 1)"
// without settings
_, err := client.ReserveExecute(lockQuery, nil, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())
// with settings
_, err = client.ReserveExecute(lockQuery, []string{"set @@sql_mode = ''"}, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())
}
func TestTempTableOnReserveExecute(t *testing.T) {
framework.Server.Config().EnableSettingsPool = true
defer func() {
framework.Server.Config().EnableSettingsPool = false
}()
client := framework.NewClient()
defer client.Release()
defer client.Execute("drop table if exists temp", nil)
tempTblQuery := "create temporary table if not exists temp(id bigint primary key)"
_, err := client.Execute(tempTblQuery, nil)
require.Error(t, err)
_, err = client.ReserveExecute(tempTblQuery, nil, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())
_, err = client.ReserveBeginExecute(tempTblQuery, nil, nil, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())
// drop the table
_, err = client.Execute("drop table if exists temp", nil)
require.NoError(t, err)
// with settings
tempTblQuery = "create temporary table if not exists temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"
_, err = client.ReserveExecute(tempTblQuery, []string{setting}, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())
_, err = client.ReserveBeginExecute(tempTblQuery, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())
}

Просмотреть файл

@ -254,7 +254,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top
func (e *Executor) execQuery(ctx context.Context, query string) (result *sqltypes.Result, err error) {
defer e.env.LogError()
conn, err := e.pool.Get(ctx)
conn, err := e.pool.Get(ctx, nil)
if err != nil {
return result, err
}
@ -470,7 +470,7 @@ func (e *Executor) ptPidFileName(uuid string) string {
// readMySQLVariables contacts the backend MySQL server to read some of its configuration
func (e *Executor) readMySQLVariables(ctx context.Context) (variables *mysqlVariables, err error) {
conn, err := e.pool.Get(ctx)
conn, err := e.pool.Get(ctx, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -23,8 +23,6 @@ import (
"sync"
"time"
"github.com/cespare/xxhash/v2"
"vitess.io/vitess/go/pools"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/servenv"
@ -49,14 +47,13 @@ import (
// its own queries and the underlying connection.
// It will also trigger a CheckMySQL whenever applicable.
type DBConn struct {
conn *dbconnpool.DBConnection
info dbconfigs.Connector
pool *Pool
dbaPool *dbconnpool.ConnectionPool
stats *tabletenv.Stats
current sync2.AtomicString
settings []string
settingsHash uint64
conn *dbconnpool.DBConnection
info dbconfigs.Connector
pool *Pool
dbaPool *dbconnpool.ConnectionPool
stats *tabletenv.Stats
current sync2.AtomicString
settings []string
// err will be set if a query is killed through a Kill.
errmu sync.Mutex
@ -84,18 +81,26 @@ func NewDBConn(ctx context.Context, cp *Pool, appParams dbconfigs.Connector) (*D
}
// NewDBConnNoPool creates a new DBConn without a pool.
func NewDBConnNoPool(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpool.ConnectionPool) (*DBConn, error) {
func NewDBConnNoPool(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpool.ConnectionPool, settings []string) (*DBConn, error) {
c, err := dbconnpool.NewDBConnection(ctx, params)
if err != nil {
return nil, err
}
return &DBConn{
dbconn := &DBConn{
conn: c,
info: params,
dbaPool: dbaPool,
pool: nil,
stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")),
}, nil
}
if len(settings) == 0 {
return dbconn, nil
}
if err = dbconn.ApplySettings(ctx, settings); err != nil {
dbconn.Close()
return nil, err
}
return dbconn, nil
}
// Err returns an error if there was a client initiated error
@ -320,17 +325,12 @@ func (dbc *DBConn) Close() {
}
func (dbc *DBConn) ApplySettings(ctx context.Context, settings []string) error {
digest := xxhash.New()
for _, q := range settings {
if _, err := dbc.execOnce(ctx, q, 1, false); err != nil {
return err
}
if _, err := digest.WriteString(q); err != nil {
return err
}
}
dbc.settings = settings
dbc.settingsHash = digest.Sum64()
return nil
}

Просмотреть файл

@ -325,7 +325,7 @@ func TestDBNoPoolConnKill(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
dbConn, err := NewDBConnNoPool(context.Background(), db.ConnParams(), connPool.dbaPool)
dbConn, err := NewDBConnNoPool(context.Background(), db.ConnParams(), connPool.dbaPool, nil)
if dbConn != nil {
defer dbConn.Close()
}

Просмотреть файл

@ -164,7 +164,7 @@ func (cp *Pool) Close() {
// Get returns a connection.
// You must call Recycle on DBConn once done.
func (cp *Pool) Get(ctx context.Context) (*DBConn, error) {
func (cp *Pool) Get(ctx context.Context, settings []string) (*DBConn, error) {
span, ctx := trace.NewSpan(ctx, "Pool.Get")
defer span.Finish()
@ -178,7 +178,7 @@ func (cp *Pool) Get(ctx context.Context) (*DBConn, error) {
}
if cp.isCallerIDAppDebug(ctx) {
return NewDBConnNoPool(ctx, cp.appDebugParams, cp.dbaPool)
return NewDBConnNoPool(ctx, cp.appDebugParams, cp.dbaPool, nil)
}
p := cp.pool()
if p == nil {
@ -194,7 +194,7 @@ func (cp *Pool) Get(ctx context.Context) (*DBConn, error) {
ctx, cancel = context.WithTimeout(ctx, cp.timeout)
defer cancel()
}
r, err := p.Get(ctx, nil)
r, err := p.Get(ctx, settings)
if err != nil {
return nil, err
}

Просмотреть файл

@ -38,7 +38,7 @@ func TestConnPoolGet(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
dbConn, err := connPool.Get(context.Background())
dbConn, err := connPool.Get(context.Background(), nil)
if err != nil {
t.Fatalf("should not get an error, but got: %v", err)
}
@ -62,10 +62,10 @@ func TestConnPoolTimeout(t *testing.T) {
})
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
dbConn, err := connPool.Get(context.Background())
dbConn, err := connPool.Get(context.Background(), nil)
require.NoError(t, err)
defer dbConn.Recycle()
_, err = connPool.Get(context.Background())
_, err = connPool.Get(context.Background(), nil)
assert.EqualError(t, err, "resource pool timed out")
}
@ -78,7 +78,7 @@ func TestConnPoolMaxWaiters(t *testing.T) {
})
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
dbConn, err := connPool.Get(context.Background())
dbConn, err := connPool.Get(context.Background(), nil)
require.NoError(t, err)
// waiter 1
@ -86,7 +86,7 @@ func TestConnPoolMaxWaiters(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
c1, err := connPool.Get(context.Background())
c1, err := connPool.Get(context.Background(), nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
return
@ -102,7 +102,7 @@ func TestConnPoolMaxWaiters(t *testing.T) {
}
// waiter 2
_, err = connPool.Get(context.Background())
_, err = connPool.Get(context.Background(), nil)
assert.EqualError(t, err, "pool TestPool waiter count exceeded")
// This recycle will make waiter1 succeed.
@ -121,7 +121,7 @@ func TestConnPoolGetEmptyDebugConfig(t *testing.T) {
ctx := context.Background()
ctx = callerid.NewContext(ctx, ecid, im)
defer connPool.Close()
dbConn, err := connPool.Get(ctx)
dbConn, err := connPool.Get(ctx, nil)
if err != nil {
t.Fatalf("should not get an error, but got: %v", err)
}
@ -146,7 +146,7 @@ func TestConnPoolGetAppDebug(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), debugConn)
defer connPool.Close()
dbConn, err := connPool.Get(ctx)
dbConn, err := connPool.Get(ctx, nil)
if err != nil {
t.Fatalf("should not get an error, but got: %v", err)
}
@ -259,7 +259,7 @@ func TestConnPoolStateWhilePoolIsOpen(t *testing.T) {
if connPool.InUse() != 0 {
t.Fatalf("pool inUse connections should be 0")
}
dbConn, _ := connPool.Get(context.Background())
dbConn, _ := connPool.Get(context.Background(), nil)
if connPool.Available() != 99 {
t.Fatalf("pool available connections should be 99")
}

Просмотреть файл

@ -67,6 +67,6 @@ func FuzzGetPlan(data []byte) int {
qe.SetQueryPlanCacheCap(1024)
// Call target
_, _ = qe.GetPlan(context.Background(), logStats, query2, true, 0)
_, _ = qe.GetPlan(context.Background(), logStats, query2, true)
return 1
}

Просмотреть файл

@ -391,7 +391,7 @@ func (collector *TableGC) checkTables(ctx context.Context) error {
return nil
}
conn, err := collector.pool.Get(ctx)
conn, err := collector.pool.Get(ctx, nil)
if err != nil {
return err
}
@ -547,7 +547,7 @@ func (collector *TableGC) dropTable(ctx context.Context, tableName string) error
return nil
}
conn, err := collector.pool.Get(ctx)
conn, err := collector.pool.Get(ctx, nil)
if err != nil {
return err
}
@ -572,7 +572,7 @@ func (collector *TableGC) transitionTable(ctx context.Context, transition *trans
return nil
}
conn, err := collector.pool.Get(ctx)
conn, err := collector.pool.Get(ctx, nil)
if err != nil {
return err
}

Просмотреть файл

@ -319,7 +319,7 @@ func (hs *healthStreamer) reload() error {
}
ctx := hs.ctx
conn, err := hs.conns.Get(ctx)
conn, err := hs.conns.Get(ctx, nil)
if err != nil {
return err
}

Просмотреть файл

@ -57,6 +57,11 @@ func analyzeSelect(sel *sqlparser.Select, tables map[string]*schema.Table) (plan
plan.NextCount = v
plan.FullQuery = nil
}
if hasLockFunc(sel) {
plan.PlanID = PlanSelectLockFunc
plan.NeedsReservedConn = true
}
return plan, nil
}
@ -169,8 +174,9 @@ func showTableRewrite(show *sqlparser.ShowBasic, dbName string) {
func analyzeSet(set *sqlparser.Set) (plan *Plan) {
return &Plan{
PlanID: PlanSet,
FullQuery: GenerateFullQuery(set),
PlanID: PlanSet,
FullQuery: GenerateFullQuery(set),
NeedsReservedConn: true,
}
}
@ -197,3 +203,15 @@ func lookupSingleTable(tableExpr sqlparser.TableExpr, tables map[string]*schema.
}
return tables[tableName.String()]
}
func analyzeDDL(stmt sqlparser.DDLStatement, tables map[string]*schema.Table) *Plan {
// DDLs and some other statements below don't get fully parsed.
// We have to use the original query at the time of execution.
// We are in the process of changing this
var fullQuery *sqlparser.ParsedQuery
// If the query is fully parsed, then use the ast and store the fullQuery
if stmt.IsFullyParsed() {
fullQuery = GenerateFullQuery(stmt)
}
return &Plan{PlanID: PlanDDL, FullQuery: fullQuery, FullStmt: stmt, NeedsReservedConn: stmt.IsTemporary()}
}

Просмотреть файл

@ -47,6 +47,7 @@ const (
PlanSelect PlanType = iota
PlanNextval
PlanSelectImpossible
PlanSelectLockFunc
PlanInsert
PlanInsertMessage
PlanUpdate
@ -85,6 +86,7 @@ var planName = []string{
"Select",
"Nextval",
"SelectImpossible",
"SelectLockFunc",
"Insert",
"InsertMessage",
"Update",
@ -139,11 +141,6 @@ func PlanByNameIC(s string) (pt PlanType, ok bool) {
return NumPlans, false
}
// IsSelect returns true if PlanType is about a select query.
func (pt PlanType) IsSelect() bool {
return pt == PlanSelect || pt == PlanSelectImpossible
}
// MarshalJSON returns a json string for PlanType.
func (pt PlanType) MarshalJSON() ([]byte, error) {
return json.Marshal(pt.String())
@ -174,6 +171,9 @@ type Plan struct {
// FullStmt can be used when the query does not operate on tables
FullStmt sqlparser.Statement
// NeedsReservedConn indicates at a reserved connection is needed to execute this plan
NeedsReservedConn bool
}
// TableName returns the table name for the plan.
@ -198,7 +198,7 @@ func (plan *Plan) TableNames() (names []string) {
}
// Build builds a plan based on the schema.
func Build(statement sqlparser.Statement, tables map[string]*schema.Table, isReservedConn bool, dbName string) (plan *Plan, err error) {
func Build(statement sqlparser.Statement, tables map[string]*schema.Table, dbName string) (plan *Plan, err error) {
switch stmt := statement.(type) {
case *sqlparser.Union:
plan, err = &Plan{
@ -207,15 +207,6 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table, isRes
}, nil
case *sqlparser.Select:
plan, err = analyzeSelect(stmt, tables)
if err != nil {
return nil, err
}
if plan != nil && plan.PlanID != PlanSelectImpossible && !isReservedConn {
err = checkForPoolingUnsafeConstructs(statement)
if err != nil {
return nil, err
}
}
case *sqlparser.Insert:
plan, err = analyzeInsert(stmt, tables)
case *sqlparser.Update:
@ -223,20 +214,9 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table, isRes
case *sqlparser.Delete:
plan, err = analyzeDelete(stmt, tables)
case *sqlparser.Set:
if !isReservedConn {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s not allowed without a reserved connections", sqlparser.String(stmt))
}
plan, err = analyzeSet(stmt), nil
case sqlparser.DDLStatement:
// DDLs and some other statements below don't get fully parsed.
// We have to use the original query at the time of execution.
// We are in the process of changing this
var fullQuery *sqlparser.ParsedQuery
// If the query is fully parsed, then use the ast and store the fullQuery
if stmt.IsFullyParsed() {
fullQuery = GenerateFullQuery(stmt)
}
plan = &Plan{PlanID: PlanDDL, FullQuery: fullQuery, FullStmt: stmt}
plan = analyzeDDL(stmt, tables)
case *sqlparser.AlterMigration:
plan, err = &Plan{PlanID: PlanAlterMigration, FullStmt: stmt}, nil
case *sqlparser.RevertMigration:
@ -274,19 +254,12 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table, isRes
}
// BuildStreaming builds a streaming plan based on the schema.
func BuildStreaming(sql string, tables map[string]*schema.Table, isReservedConn bool) (*Plan, error) {
func BuildStreaming(sql string, tables map[string]*schema.Table) (*Plan, error) {
statement, err := sqlparser.Parse(sql)
if err != nil {
return nil, err
}
if !isReservedConn {
err = checkForPoolingUnsafeConstructs(statement)
if err != nil {
return nil, err
}
}
plan := &Plan{
PlanID: PlanSelectStream,
FullQuery: GenerateFullQuery(statement),
@ -295,11 +268,14 @@ func BuildStreaming(sql string, tables map[string]*schema.Table, isReservedConn
switch stmt := statement.(type) {
case *sqlparser.Select:
if hasLockFunc(stmt) {
plan.NeedsReservedConn = true
}
plan.Table, plan.AllTables = lookupTables(stmt.From, tables)
case *sqlparser.OtherRead, *sqlparser.Show, *sqlparser.Union, *sqlparser.CallProc, sqlparser.Explain:
// pass
default:
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "'%v' not allowed for streaming", sqlparser.String(stmt))
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s not allowed for streaming", sqlparser.ASTToStatementType(statement))
}
return plan, nil
@ -324,31 +300,20 @@ func BuildMessageStreaming(name string, tables map[string]*schema.Table) (*Plan,
return plan, nil
}
// checkForPoolingUnsafeConstructs returns an error if the SQL expression contains
// a call to GET_LOCK(), which is unsafe with server-side connection pooling.
// For more background, see https://github.com/vitessio/vitess/issues/3631.
func checkForPoolingUnsafeConstructs(expr sqlparser.SQLNode) error {
genError := func(node sqlparser.SQLNode) error {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s not allowed without a reserved connections", sqlparser.String(node))
}
if _, isSetStmt := expr.(*sqlparser.Set); isSetStmt {
return genError(expr)
}
sel, isSel := expr.(*sqlparser.Select)
if !isSel {
return nil
}
return sqlparser.Walk(func(in sqlparser.SQLNode) (kontinue bool, err error) {
// hasLockFunc looks for get_lock function in the select query.
// If it is present then it returns true otherwise false
func hasLockFunc(sel *sqlparser.Select) bool {
var found bool
_ = sqlparser.Walk(func(in sqlparser.SQLNode) (bool, error) {
lFunc, isLFunc := in.(*sqlparser.LockingFunc)
if !isLFunc {
return true, nil
}
if lFunc.Type == sqlparser.GetLock {
return false, genError(lFunc)
found = true
return false, nil
}
return true, nil
}, sel.SelectExprs)
return found
}

Просмотреть файл

@ -41,13 +41,14 @@ import (
// This is only for testing.
func (p *Plan) MarshalJSON() ([]byte, error) {
mplan := struct {
PlanID PlanType
TableName sqlparser.IdentifierCS `json:",omitempty"`
Permissions []Permission `json:",omitempty"`
FieldQuery *sqlparser.ParsedQuery `json:",omitempty"`
FullQuery *sqlparser.ParsedQuery `json:",omitempty"`
NextCount string `json:",omitempty"`
WhereClause *sqlparser.ParsedQuery `json:",omitempty"`
PlanID PlanType
TableName sqlparser.IdentifierCS `json:",omitempty"`
Permissions []Permission `json:",omitempty"`
FieldQuery *sqlparser.ParsedQuery `json:",omitempty"`
FullQuery *sqlparser.ParsedQuery `json:",omitempty"`
NextCount string `json:",omitempty"`
WhereClause *sqlparser.ParsedQuery `json:",omitempty"`
NeedsReservedConn bool `json:",omitempty"`
}{
PlanID: p.PlanID,
TableName: p.TableName(),
@ -58,6 +59,9 @@ func (p *Plan) MarshalJSON() ([]byte, error) {
if p.NextCount != nil {
mplan.NextCount = evalengine.FormatExpr(p.NextCount)
}
if p.NeedsReservedConn {
mplan.NeedsReservedConn = true
}
return json.Marshal(&mplan)
}
@ -72,7 +76,7 @@ func TestPlan(t *testing.T) {
var err error
statement, err := sqlparser.Parse(tcase.input)
if err == nil {
plan, err = Build(statement, testSchema, false, "dbName")
plan, err = Build(statement, testSchema, "dbName")
}
PassthroughDMLs = false
@ -106,10 +110,11 @@ func TestPlanPoolUnsafe(t *testing.T) {
var err error
statement, err := sqlparser.Parse(tcase.input)
require.NoError(t, err)
// In Pooled Connection, plan building will fail.
plan, err = Build(statement, testSchema, false /* isReservedConn */, "dbName")
require.Error(t, err)
out := err.Error()
// Plan building will not fail, but it will mark that reserved connection is needed.
plan, err = Build(statement, testSchema, "dbName")
require.NoError(t, err)
require.True(t, plan.NeedsReservedConn)
out := fmt.Sprintf("%s not allowed without reserved connection", plan.PlanID.String())
if out != tcase.output {
t.Errorf("Line:%v\ngot = %s\nwant = %s", tcase.lineno, out, tcase.output)
if err != nil {
@ -120,10 +125,6 @@ func TestPlanPoolUnsafe(t *testing.T) {
}
fmt.Printf("\"%s\"\n%s\n\n", tcase.input, out)
}
// In Reserved Connection, plan will be built.
plan, err = Build(statement, testSchema, true /* isReservedConn */, "dbName")
require.NoError(t, err)
require.NotEmpty(t, plan)
})
}
}
@ -139,7 +140,7 @@ func TestPlanInReservedConn(t *testing.T) {
var err error
statement, err := sqlparser.Parse(tcase.input)
if err == nil {
plan, err = Build(statement, testSchema, true, "dbName")
plan, err = Build(statement, testSchema, "dbName")
}
PassthroughDMLs = false
@ -190,7 +191,7 @@ func TestCustom(t *testing.T) {
if err != nil {
t.Fatalf("Got error: %v, parsing sql: %v", err.Error(), tcase.input)
}
plan, err := Build(statement, schem, false, "dbName")
plan, err := Build(statement, schem, "dbName")
var out string
if err != nil {
out = err.Error()
@ -212,7 +213,7 @@ func TestCustom(t *testing.T) {
func TestStreamPlan(t *testing.T) {
testSchema := loadSchema("schema_test.json")
for tcase := range iterateExecFile("stream_cases.txt") {
plan, err := BuildStreaming(tcase.input, testSchema, false)
plan, err := BuildStreaming(tcase.input, testSchema)
var out string
if err != nil {
out = err.Error()
@ -226,7 +227,6 @@ func TestStreamPlan(t *testing.T) {
if out != tcase.output {
t.Errorf("Line:%v\ngot = %s\nwant = %s", tcase.lineno, out, tcase.output)
}
//fmt.Printf("%s\n%s\n\n", tcase.input, out)
}
}
@ -273,7 +273,7 @@ func TestLockPlan(t *testing.T) {
var err error
statement, err := sqlparser.Parse(tcase.input)
if err == nil {
plan, err = Build(statement, testSchema, false, "dbName")
plan, err = Build(statement, testSchema, "dbName")
}
var out string

Просмотреть файл

@ -954,3 +954,18 @@ options:PassthroughDMLs
],
"FullQuery": "create table function_default (\n\tx varchar(25) default (trim(' check '))\n)"
}
# temporary table
"create temporary table temp(a int)"
{
"PlanID": "DDL",
"TableName": "",
"Permissions": [
{
"TableName": "temp",
"Role": 2
}
],
"FullQuery": "create temporary table temp (\n\ta int\n)",
"NeedsReservedConn": true
}

Просмотреть файл

@ -70,4 +70,15 @@
# get_lock cannot be executed outside of reserved connection
"select get_lock('foo', 10) from dual"
"get_lock('foo', 10) not allowed without a reserved connections"
{
"PlanID": "SelectLockFunc",
"TableName": "dual",
"Permissions": [
{
"TableName": "dual",
"Role": 0
}
],
"FullQuery": "select get_lock('foo', 10) from dual limit :#maxLimit",
"NeedsReservedConn": true
}

Просмотреть файл

@ -1,15 +1,15 @@
# get_lock named locks are unsafe with server-side connection pooling
"select get_lock('foo', 10) from dual"
"get_lock('foo', 10) not allowed without a reserved connections"
"SelectLockFunc not allowed without reserved connection"
# setting system variables must happen inside reserved connections
"set sql_safe_updates = false"
"set @@sql_safe_updates = false not allowed without a reserved connections"
"Set not allowed without reserved connection"
# setting system variables must happen inside reserved connections
"set @@sql_safe_updates = false"
"set @@sql_safe_updates = false not allowed without a reserved connections"
"Set not allowed without reserved connection"
# setting system variables must happen inside reserved connections
"set @udv = false"
"set @udv = false not allowed without a reserved connections"
"Set not allowed without reserved connection"

Просмотреть файл

@ -60,16 +60,27 @@
# dml
"update a set b = 1"
"'update a set b = 1' not allowed for streaming"
"UPDATE not allowed for streaming"
# syntax error
"syntax error"
"syntax error at position 7 near 'syntax'"
# named locks are unsafe with server-side connection pooling
# named locks are unsafe with server-side connection pooling, plan is generated with NeedReservedConn set to true
"select get_lock('foo', 10) from dual"
"get_lock('foo', 10) not allowed without a reserved connections"
{
"PlanID": "SelectStream",
"TableName": "dual",
"Permissions": [
{
"TableName": "dual",
"Role": 0
}
],
"FullQuery": "select get_lock('foo', 10) from dual",
"NeedsReservedConn": true
}
# set statement unsafe with pooling
"set @udv = 10"
"set @udv = 10 not allowed without a reserved connections"
"SET not allowed for streaming"

Просмотреть файл

@ -96,6 +96,23 @@ func (ep *TabletPlan) buildAuthorized() {
}
}
func (ep *TabletPlan) IsValid(hasReservedCon, hasSysSettings bool) bool {
if !ep.NeedsReservedConn {
return true
}
switch ep.PlanID {
case planbuilder.PlanSelectLockFunc, planbuilder.PlanDDL:
if hasReservedCon {
return true
}
case planbuilder.PlanSet:
if hasReservedCon || hasSysSettings {
return true
}
}
return false
}
// _______________________________________________
// QueryEngine implements the core functionality of tabletserver.
@ -248,7 +265,7 @@ func (qe *QueryEngine) Open() error {
qe.conns.Open(qe.env.Config().DB.AppWithDB(), qe.env.Config().DB.DbaWithDB(), qe.env.Config().DB.AppDebugWithDB())
conn, err := qe.conns.Get(tabletenv.LocalContext())
conn, err := qe.conns.Get(tabletenv.LocalContext(), nil)
if err != nil {
qe.conns.Close()
return err
@ -287,7 +304,7 @@ func (qe *QueryEngine) Close() {
}
// GetPlan returns the TabletPlan that for the query. Plans are cached in a cache.LRUCache.
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool, reservedConnID int64) (*TabletPlan, error) {
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool) (*TabletPlan, error) {
span, _ := trace.NewSpan(ctx, "QueryEngine.GetPlan")
defer span.Finish()
if !skipQueryPlanCache {
@ -308,7 +325,7 @@ func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats
if err != nil {
return nil, err
}
splan, err := planbuilder.Build(statement, qe.tables, reservedConnID != 0, qe.env.Config().DB.DBName)
splan, err := planbuilder.Build(statement, qe.tables, qe.env.Config().DB.DBName)
if err != nil {
return nil, err
}
@ -326,10 +343,10 @@ func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats
// GetStreamPlan is similar to GetPlan, but doesn't use the cache
// and doesn't enforce a limit. It just returns the parsed query.
func (qe *QueryEngine) GetStreamPlan(sql string, isReservedConn bool) (*TabletPlan, error) {
func (qe *QueryEngine) GetStreamPlan(sql string) (*TabletPlan, error) {
qe.mu.RLock()
defer qe.mu.RUnlock()
splan, err := planbuilder.BuildStreaming(sql, qe.tables, isReservedConn)
splan, err := planbuilder.BuildStreaming(sql, qe.tables)
if err != nil {
return nil, err
}

Просмотреть файл

@ -105,7 +105,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) {
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
_, err := qe.GetPlan(ctx, logStats, "", false, 0)
_, err := qe.GetPlan(ctx, logStats, "", false)
require.EqualError(t, err, "Query was empty")
}
@ -194,14 +194,14 @@ func TestQueryPlanCache(t *testing.T) {
// this cache capacity is in number of elements
qe.SetQueryPlanCacheCap(1)
}
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false, 0)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false)
if err != nil {
t.Fatal(err)
}
if firstPlan == nil {
t.Fatalf("plan should not be nil")
}
secondPlan, err := qe.GetPlan(ctx, logStats, secondQuery, false, 0)
secondPlan, err := qe.GetPlan(ctx, logStats, secondQuery, false)
if err != nil {
t.Fatal(err)
}
@ -232,7 +232,7 @@ func TestNoQueryPlanCache(t *testing.T) {
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.SetQueryPlanCacheCap(1024)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, true, 0)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, true)
if err != nil {
t.Fatal(err)
}
@ -261,7 +261,7 @@ func TestNoQueryPlanCacheDirective(t *testing.T) {
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.SetQueryPlanCacheCap(1024)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false, 0)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false)
if err != nil {
t.Fatal(err)
}
@ -285,7 +285,7 @@ func TestStatsURL(t *testing.T) {
// warm up cache
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.GetPlan(ctx, logStats, query, false, 0)
qe.GetPlan(ctx, logStats, query, false)
request, _ := http.NewRequest("GET", "/debug/tablet_plans", nil)
response := httptest.NewRecorder()
@ -390,7 +390,7 @@ func BenchmarkPlanCacheThroughput(b *testing.B) {
for i := 0; i < b.N; i++ {
query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.Intn(500))
_, err := qe.GetPlan(ctx, logStats, query, false, 0)
_, err := qe.GetPlan(ctx, logStats, query, false)
if err != nil {
b.Fatal(err)
}
@ -421,7 +421,7 @@ func benchmarkPlanCache(b *testing.B, db *fakesqldb.DB, lfu bool, par int) {
for pb.Next() {
query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.Intn(500))
_, err := qe.GetPlan(ctx, logStats, query, false, 0)
_, err := qe.GetPlan(ctx, logStats, query, false)
require.NoErrorf(b, err, "bad query: %s", query)
}
})
@ -547,7 +547,7 @@ func TestPlanCachePollution(t *testing.T) {
query := sample()
start := time.Now()
_, err := qe.GetPlan(ctx, logStats, query, false, 0)
_, err := qe.GetPlan(ctx, logStats, query, false)
require.NoErrorf(t, err, "bad query: %s", query)
stats.interval += time.Since(start)

Просмотреть файл

@ -59,6 +59,7 @@ type QueryExecutor struct {
logStats *tabletenv.LogStats
tsv *TabletServer
tabletType topodatapb.TabletType
settings []string
}
const (
@ -126,7 +127,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
qre.tsv.Stats().ResultHistogram.Add(int64(len(reply.Rows)))
}(time.Now())
if err := qre.checkPermissions(); err != nil {
if err = qre.checkPermissions(); err != nil {
return nil, err
}
@ -135,12 +136,18 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
}
if qre.connID != 0 {
var conn *StatefulConnection
// Need upfront connection for DMLs and transactions
conn, err := qre.tsv.te.txPool.GetAndLock(qre.connID, "for query")
conn, err = qre.tsv.te.txPool.GetAndLock(qre.connID, "for query")
if err != nil {
return nil, err
}
defer conn.Unlock()
if len(qre.settings) > 0 {
if err = conn.ApplySettings(qre.ctx, qre.settings); err != nil {
return nil, vterrors.Wrap(err, "failed to execute system settings on the connection")
}
}
return qre.txConnExec(conn)
}
@ -159,9 +166,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return nil, err
}
return qr, nil
case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush:
return qre.execOther()
case p.PlanSavepoint, p.PlanRelease, p.PlanSRollback:
case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush, p.PlanSavepoint, p.PlanRelease, p.PlanSRollback:
return qre.execOther()
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanDDL, p.PlanLoad:
return qre.execAutocommit(qre.txConnExec)
@ -177,6 +182,12 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return qre.execShowMigrationLogs()
case p.PlanShowThrottledApps:
return qre.execShowThrottledApps()
case p.PlanSet:
if len(qre.settings) == 0 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "[BUG] %s not allowed without settings connection", qre.query)
}
// The execution is not required as this setting will be applied when any other query type is executed.
return &sqltypes.Result{}, nil
}
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] %s unexpected plan type", qre.plan.PlanID.String())
}
@ -187,7 +198,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT
conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil)
conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.settings)
if err != nil {
return nil, err
@ -198,7 +209,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil)
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.settings)
if err != nil {
return nil, err
}
@ -238,7 +249,7 @@ func (qre *QueryExecutor) txConnExec(conn *StatefulConnection) (*sqltypes.Result
return qre.execStatefulConn(conn, qre.query, true)
case p.PlanSavepoint, p.PlanRelease, p.PlanSRollback:
return qre.execStatefulConn(conn, qre.query, true)
case p.PlanSelect, p.PlanSelectImpossible, p.PlanShow:
case p.PlanSelect, p.PlanSelectImpossible, p.PlanShow, p.PlanSelectLockFunc:
maxrows := qre.getSelectLimit()
qre.bindVars["#maxLimit"] = sqltypes.Int64BindVariable(maxrows + 1)
if qre.bindVars[sqltypes.BvReplaceSchemaName] != nil {
@ -316,6 +327,11 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
return err
}
defer txConn.Unlock()
if len(qre.settings) > 0 {
if err = txConn.ApplySettings(qre.ctx, qre.settings); err != nil {
return vterrors.Wrap(err, "failed to execute system settings on the connection")
}
}
conn = txConn.UnderlyingDBConn()
} else {
dbConn, err := qre.getStreamConn()
@ -696,7 +712,7 @@ func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
defer span.Finish()
start := time.Now()
conn, err := qre.tsv.qe.conns.Get(ctx)
conn, err := qre.tsv.qe.conns.Get(ctx, qre.settings)
switch err {
case nil:
@ -713,7 +729,7 @@ func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
defer span.Finish()
start := time.Now()
conn, err := qre.tsv.qe.streamConns.Get(ctx)
conn, err := qre.tsv.qe.streamConns.Get(ctx, qre.settings)
switch err {
case nil:
qre.logStats.WaitingForConnection += time.Since(start)

Просмотреть файл

@ -1273,7 +1273,7 @@ func newTransaction(tsv *TabletServer, options *querypb.ExecuteOptions) int64 {
func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor {
logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutor")
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false, 0)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false)
if err != nil {
panic(err)
}

Просмотреть файл

@ -170,7 +170,7 @@ func (r *heartbeatReader) readHeartbeat() {
// fetchMostRecentHeartbeat fetches the most recently recorded heartbeat from the heartbeat table,
// returning a result with the timestamp of the heartbeat.
func (r *heartbeatReader) fetchMostRecentHeartbeat(ctx context.Context) (*sqltypes.Result, error) {
conn, err := r.pool.Get(ctx)
conn, err := r.pool.Get(ctx, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -317,7 +317,7 @@ func (se *Engine) reload(ctx context.Context) error {
se.env.LogError()
}()
conn, err := se.conns.Get(ctx)
conn, err := se.conns.Get(ctx, nil)
if err != nil {
return err
}
@ -589,7 +589,7 @@ func (se *Engine) GetSchema() map[string]*Table {
// GetConnection returns a connection from the pool
func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
return se.conns.Get(ctx)
return se.conns.Get(ctx, nil)
}
func (se *Engine) handleDebugSchema(response http.ResponseWriter, request *http.Request) {

Просмотреть файл

@ -159,7 +159,7 @@ func (h *historian) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string
// loadFromDB loads all rows from the schema_version table that the historian does not have as yet
// caller should have locked h.mu
func (h *historian) loadFromDB(ctx context.Context) error {
conn, err := h.conns.Get(ctx)
conn, err := h.conns.Get(ctx, nil)
if err != nil {
return err
}

Просмотреть файл

@ -211,7 +211,7 @@ func newTestLoadTable(tableType string, comment string, db *fakesqldb.DB) (*Tabl
IdleTimeoutSeconds: 10,
})
connPool.Open(appParams, dbaParams, appParams)
conn, err := connPool.Get(ctx)
conn, err := connPool.Get(ctx, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -292,3 +292,10 @@ func (sc *StatefulConnection) getUsername() string {
}
return callerid.GetUsername(sc.reservedProps.ImmediateCaller)
}
func (sc *StatefulConnection) ApplySettings(ctx context.Context, settings []string) error {
if sc.dbConn.IsSameSetting(settings) {
return nil
}
return sc.dbConn.ApplySettings(ctx, settings)
}

Просмотреть файл

@ -168,15 +168,15 @@ func (sf *StatefulConnectionPool) GetAndLock(id int64, reason string) (*Stateful
// NewConn creates a new StatefulConnection. It will be created from either the normal pool or
// the found_rows pool, depending on the options provided
func (sf *StatefulConnectionPool) NewConn(ctx context.Context, options *querypb.ExecuteOptions) (*StatefulConnection, error) {
func (sf *StatefulConnectionPool) NewConn(ctx context.Context, options *querypb.ExecuteOptions, settings []string) (*StatefulConnection, error) {
var conn *connpool.DBConn
var err error
if options.GetClientFoundRows() {
conn, err = sf.foundRowsPool.Get(ctx)
conn, err = sf.foundRowsPool.Get(ctx, settings)
} else {
conn, err = sf.conns.Get(ctx)
conn, err = sf.conns.Get(ctx, settings)
}
if err != nil {
return nil, err

Просмотреть файл

@ -42,11 +42,11 @@ func TestActivePoolClientRowsFound(t *testing.T) {
startNormalSize := pool.conns.Available()
startFoundRowsSize := pool.foundRowsPool.Available()
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
assert.Equal(t, startNormalSize-1, pool.conns.Available(), "default pool not used")
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{ClientFoundRows: true})
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}, nil)
require.NoError(t, err)
assert.Equal(t, startFoundRowsSize-1, pool.conns.Available(), "foundRows pool not used")
@ -62,15 +62,15 @@ func TestActivePoolForAllTxProps(t *testing.T) {
defer db.Close()
pool := newActivePool()
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn1.txProps = &tx.Properties{}
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
// for the second connection, we are not going to set a tx state
conn3, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn3, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn3.txProps = &tx.Properties{}
@ -90,20 +90,20 @@ func TestStatefulPoolShutdownNonTx(t *testing.T) {
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
// conn1 non-tx, not in use.
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn1.Taint(ctx, nil)
conn1.Unlock()
// conn2 tx, not in use.
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn2.Taint(ctx, nil)
conn2.txProps = &tx.Properties{}
conn2.Unlock()
// conn3 non-tx, in use.
conn3, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn3, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn3.Taint(ctx, nil)
@ -128,13 +128,13 @@ func TestStatefulPoolShutdownAll(t *testing.T) {
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
// conn1 not in use
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn1, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn1.txProps = &tx.Properties{}
conn1.Unlock()
// conn2 in use.
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn2, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn2.txProps = &tx.Properties{}
@ -162,7 +162,7 @@ func TestExecWithAbortedCtx(t *testing.T) {
defer db.Close()
pool := newActivePool()
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
cancel()
_, err = conn.Exec(ctx, "", 0, false)
@ -174,7 +174,7 @@ func TestExecWithDbconnClosed(t *testing.T) {
defer db.Close()
pool := newActivePool()
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn.Close()
@ -187,7 +187,7 @@ func TestExecWithDbconnClosedHavingTx(t *testing.T) {
defer db.Close()
pool := newActivePool()
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
conn.txProps = &tx.Properties{Conclusion: "foobar"}
conn.Close()
@ -201,13 +201,13 @@ func TestFailOnConnectionRegistering(t *testing.T) {
defer db.Close()
pool := newActivePool()
pool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{})
conn, err := pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.NoError(t, err)
defer conn.Close()
pool.lastID.Set(conn.ConnID - 1)
_, err = pool.NewConn(ctx, &querypb.ExecuteOptions{})
_, err = pool.NewConn(ctx, &querypb.ExecuteOptions{}, nil)
require.Error(t, err, "already present")
}

Просмотреть файл

@ -151,6 +151,7 @@ func init() {
flag.BoolVar(&enableReplicationReporter, "enable_replication_reporter", false, "Use polling to track replication lag.")
flag.BoolVar(&currentConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.")
flag.BoolVar(&currentConfig.SanitizeLogMessages, "sanitize_log_messages", false, "Remove potentially sensitive information in tablet INFO, WARNING, and ERROR log messages such as query parameters.")
flag.BoolVar(&currentConfig.EnableSettingsPool, "queryserver_enable_settings_pool", false, "Enable pooling of connections with modified system settings")
flag.Int64Var(&currentConfig.RowStreamer.MaxInnoDBTrxHistLen, "vreplication_copy_phase_max_innodb_history_list_length", 1000000, "The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
flag.Int64Var(&currentConfig.RowStreamer.MaxMySQLReplLagSecs, "vreplication_copy_phase_max_mysql_replication_lag", 43200, "The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
@ -279,6 +280,7 @@ type TabletConfig struct {
EnforceStrictTransTables bool `json:"-"`
EnableOnlineDDL bool `json:"-"`
EnableSettingsPool bool `json:"-"`
RowStreamer RowStreamerConfig `json:"rowStreamer,omitempty"`
}

Просмотреть файл

@ -45,10 +45,6 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
@ -70,6 +66,11 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/vexec"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
// logPoolFull is for throttling transaction / query pool full messages in the log.
@ -477,10 +478,10 @@ func (tsv *TabletServer) SchemaEngine() *schema.Engine {
// Begin starts a new transaction. This is allowed only if the state is StateServing.
func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) {
return tsv.begin(ctx, target, nil, 0, options)
return tsv.begin(ctx, target, nil, 0, nil, options)
}
func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQueries []string, reservedID int64, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) {
func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, savepointQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) {
state.TabletAlias = tsv.alias
err = tsv.execRequest(
ctx, tsv.QueryTimeout.Get(),
@ -491,7 +492,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQ
if tsv.txThrottler.Throttle() {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
}
transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, preQueries, reservedID, options)
transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, savepointQueries, reservedID, settings, options)
state.TransactionID = transactionID
state.SessionStateChanges = sessionStateChanges
logStats.TransactionID = transactionID
@ -715,6 +716,10 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] transactionID and reserveID must match if both are non-zero")
}
return tsv.execute(ctx, target, sql, bindVariables, transactionID, reservedID, nil, options)
}
func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
allowOnShutdown := false
timeout := tsv.QueryTimeout.Get()
if transactionID != 0 {
@ -732,10 +737,13 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
bindVariables = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
plan, err := tsv.qe.GetPlan(ctx, logStats, query, skipQueryPlanCache(options), reservedID)
plan, err := tsv.qe.GetPlan(ctx, logStats, query, skipQueryPlanCache(options))
if err != nil {
return err
}
if !plan.IsValid(reservedID != 0, len(settings) > 0) {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s not allowed without reserved connection", plan.PlanID.String())
}
// If both the values are non-zero then by design they are same value. So, it is safe to overwrite.
connID := reservedID
if transactionID != 0 {
@ -754,6 +762,7 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
logStats: logStats,
tsv: tsv,
tabletType: target.GetTabletType(),
settings: settings,
}
result, err = qre.Execute()
if err != nil {
@ -803,6 +812,10 @@ func (tsv *TabletServer) StreamExecute(ctx context.Context, target *querypb.Targ
return vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] transactionID and reserveID must match if both are non-zero")
}
return tsv.streamExecute(ctx, target, sql, bindVariables, transactionID, reservedID, nil, options, callback)
}
func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) error {
allowOnShutdown := false
var timeout time.Duration
if transactionID != 0 {
@ -820,11 +833,13 @@ func (tsv *TabletServer) StreamExecute(ctx context.Context, target *querypb.Targ
bindVariables = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
// TODO: update the isReservedConn logic when StreamExecute supports reserved connections.
plan, err := tsv.qe.GetStreamPlan(query, false /* isReservedConn */)
plan, err := tsv.qe.GetStreamPlan(query)
if err != nil {
return err
}
if !plan.IsValid(reservedID != 0, len(settings) > 0) {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s not allowed without reserved connection", plan.PlanID.String())
}
// If both the values are non-zero then by design they are same value. So, it is safe to overwrite.
connID := reservedID
if transactionID != 0 {
@ -840,6 +855,7 @@ func (tsv *TabletServer) StreamExecute(ctx context.Context, target *querypb.Targ
ctx: ctx,
logStats: logStats,
tsv: tsv,
settings: settings,
}
return qre.Stream(callback)
},
@ -860,7 +876,7 @@ func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Targe
}
}
state, err := tsv.begin(ctx, target, preQueries, reservedID, options)
state, err := tsv.begin(ctx, target, preQueries, reservedID, nil, options)
if err != nil {
return state, nil, err
}
@ -880,7 +896,7 @@ func (tsv *TabletServer) BeginStreamExecute(
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (queryservice.TransactionState, error) {
state, err := tsv.begin(ctx, target, preQueries, reservedID, options)
state, err := tsv.begin(ctx, target, preQueries, reservedID, nil, options)
if err != nil {
return state, err
}
@ -936,7 +952,7 @@ func (tsv *TabletServer) beginWaitForSameRangeTransactions(ctx context.Context,
func (tsv *TabletServer) computeTxSerializerKey(ctx context.Context, logStats *tabletenv.LogStats, sql string, bindVariables map[string]*querypb.BindVariable) (string, string) {
// Strip trailing comments so we don't pollute the query cache.
sql, _ = sqlparser.SplitMarginComments(sql)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false, 0)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false)
if err != nil {
logComputeRowSerializerKey.Errorf("failed to get plan for query: %v err: %v", sql, err)
return "", ""
@ -1099,6 +1115,19 @@ func (tsv *TabletServer) VStreamResults(ctx context.Context, target *querypb.Tar
// ReserveBeginExecute implements the QueryService interface
func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state queryservice.ReservedTransactionState, result *sqltypes.Result, err error) {
if tsv.config.EnableSettingsPool {
state, result, err = tsv.beginExecuteWithSettings(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options)
// If there is an error and the error message is about allowing query in reserved connection only,
// then we do not return an error from here and continue to use the reserved connection path.
// This is specially for get_lock function call from vtgate that needs a reserved connection.
if err == nil || !strings.Contains(err.Error(), "not allowed without reserved connection") {
return state, result, err
}
// rollback if transaction was started.
if state.TransactionID != 0 {
_, _ = tsv.Rollback(ctx, target, state.TransactionID)
}
}
var connID int64
var sessionStateChanges string
state.TabletAlias = tsv.alias
@ -1126,7 +1155,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
state.TransactionID = connID
state.SessionStateChanges = sessionStateChanges
result, err = tsv.Execute(ctx, target, sql, bindVariables, state.TransactionID, state.ReservedID, options)
result, err = tsv.execute(ctx, target, sql, bindVariables, state.TransactionID, state.ReservedID, nil, options)
return state, result, err
}
@ -1141,6 +1170,10 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (state queryservice.ReservedTransactionState, err error) {
if tsv.config.EnableSettingsPool {
return tsv.beginStreamExecuteWithSettings(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options, callback)
}
var connID int64
var sessionStateChanges string
@ -1168,12 +1201,22 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
state.TabletAlias = tsv.alias
state.SessionStateChanges = sessionStateChanges
err = tsv.StreamExecute(ctx, target, sql, bindVariables, state.TransactionID, state.ReservedID, options, callback)
err = tsv.streamExecute(ctx, target, sql, bindVariables, state.TransactionID, state.ReservedID, nil, options, callback)
return state, err
}
// ReserveExecute implements the QueryService interface
func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (state queryservice.ReservedState, result *sqltypes.Result, err error) {
if tsv.config.EnableSettingsPool {
result, err = tsv.executeWithSettings(ctx, target, preQueries, sql, bindVariables, transactionID, options)
// If there is an error and the error message is about allowing query in reserved connection only,
// then we do not return an error from here and continue to use the reserved connection path.
// This is specially for get_lock function call from vtgate that needs a reserved connection.
if err == nil || !strings.Contains(err.Error(), "not allowed without reserved connection") {
return state, result, err
}
}
state.TabletAlias = tsv.alias
allowOnShutdown := false
@ -1204,7 +1247,7 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
return state, nil, err
}
result, err = tsv.Execute(ctx, target, sql, bindVariables, state.ReservedID, state.ReservedID, options)
result, err = tsv.execute(ctx, target, sql, bindVariables, state.ReservedID, state.ReservedID, nil, options)
return state, result, err
}
@ -1219,6 +1262,10 @@ func (tsv *TabletServer) ReserveStreamExecute(
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (state queryservice.ReservedState, err error) {
if tsv.config.EnableSettingsPool {
return state, tsv.streamExecute(ctx, target, sql, bindVariables, transactionID, 0, preQueries, options, callback)
}
state.TabletAlias = tsv.alias
allowOnShutdown := false
@ -1249,7 +1296,7 @@ func (tsv *TabletServer) ReserveStreamExecute(
return state, err
}
err = tsv.StreamExecute(ctx, target, sql, bindVariables, state.ReservedID, state.ReservedID, options, callback)
err = tsv.streamExecute(ctx, target, sql, bindVariables, state.ReservedID, state.ReservedID, nil, options, callback)
return state, err
}
@ -1277,6 +1324,42 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr
)
}
func (tsv *TabletServer) executeWithSettings(ctx context.Context, target *querypb.Target, settings []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
span, ctx := trace.NewSpan(ctx, "TabletServer.ExecuteWithSettings")
trace.AnnotateSQL(span, sqlparser.Preview(sql))
defer span.Finish()
return tsv.execute(ctx, target, sql, bindVariables, transactionID, 0, settings, options)
}
func (tsv *TabletServer) beginExecuteWithSettings(ctx context.Context, target *querypb.Target, settings []string, savepointQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
txState, err := tsv.begin(ctx, target, savepointQueries, 0, settings, options)
if err != nil {
return txToReserveState(txState), nil, err
}
result, err := tsv.execute(ctx, target, sql, bindVariables, txState.TransactionID, 0, settings, options)
return txToReserveState(txState), result, err
}
func (tsv *TabletServer) beginStreamExecuteWithSettings(ctx context.Context, target *querypb.Target, settings []string, savepointQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) (queryservice.ReservedTransactionState, error) {
txState, err := tsv.begin(ctx, target, savepointQueries, 0, settings, options)
if err != nil {
return txToReserveState(txState), err
}
err = tsv.streamExecute(ctx, target, sql, bindVariables, txState.TransactionID, 0, settings, options, callback)
return txToReserveState(txState), err
}
func txToReserveState(state queryservice.TransactionState) queryservice.ReservedTransactionState {
return queryservice.ReservedTransactionState{
TabletAlias: state.TabletAlias,
TransactionID: state.TransactionID,
SessionStateChanges: state.SessionStateChanges,
}
}
// execRequest performs verifications, sets up the necessary environments
// and calls the supplied function for executing the request.
func (tsv *TabletServer) execRequest(

Просмотреть файл

@ -336,7 +336,7 @@ func (throttler *Throttler) readSelfMySQLThrottleMetric() *mysql.MySQLThrottleMe
Err: nil,
}
ctx := context.Background()
conn, err := throttler.pool.Get(ctx)
conn, err := throttler.pool.Get(ctx, nil)
if err != nil {
metric.Err = err
return metric

Просмотреть файл

@ -262,7 +262,7 @@ func (tpc *TwoPC) DeleteRedo(ctx context.Context, conn *StatefulConnection, dtid
// ReadAllRedo returns all the prepared transactions from the redo logs.
func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.PreparedTx, err error) {
conn, err := tpc.readPool.Get(ctx)
conn, err := tpc.readPool.Get(ctx, nil)
if err != nil {
return nil, nil, err
}
@ -306,7 +306,7 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa
// CountUnresolvedRedo returns the number of prepared transactions that are still unresolved.
func (tpc *TwoPC) CountUnresolvedRedo(ctx context.Context, unresolvedTime time.Time) (int64, error) {
conn, err := tpc.readPool.Get(ctx)
conn, err := tpc.readPool.Get(ctx, nil)
if err != nil {
return 0, err
}
@ -391,7 +391,7 @@ func (tpc *TwoPC) DeleteTransaction(ctx context.Context, conn *StatefulConnectio
// ReadTransaction returns the metadata for the transaction.
func (tpc *TwoPC) ReadTransaction(ctx context.Context, dtid string) (*querypb.TransactionMetadata, error) {
conn, err := tpc.readPool.Get(ctx)
conn, err := tpc.readPool.Get(ctx, nil)
if err != nil {
return nil, err
}
@ -441,7 +441,7 @@ func (tpc *TwoPC) ReadTransaction(ctx context.Context, dtid string) (*querypb.Tr
// ReadAbandoned returns the list of abandoned transactions
// and their associated start time.
func (tpc *TwoPC) ReadAbandoned(ctx context.Context, abandonTime time.Time) (map[string]time.Time, error) {
conn, err := tpc.readPool.Get(ctx)
conn, err := tpc.readPool.Get(ctx, nil)
if err != nil {
return nil, err
}
@ -467,7 +467,7 @@ func (tpc *TwoPC) ReadAbandoned(ctx context.Context, abandonTime time.Time) (map
// ReadAllTransactions returns info about all distributed transactions.
func (tpc *TwoPC) ReadAllTransactions(ctx context.Context) ([]*tx.DistributedTx, error) {
conn, err := tpc.readPool.Get(ctx)
conn, err := tpc.readPool.Get(ctx, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -39,7 +39,7 @@ func TestReadAllRedo(t *testing.T) {
tpc := tsv.te.twoPC
ctx := context.Background()
conn, err := tsv.qe.conns.Get(ctx)
conn, err := tsv.qe.conns.Get(ctx, nil)
if err != nil {
t.Fatal(err)
}
@ -243,7 +243,7 @@ func TestReadAllTransactions(t *testing.T) {
tpc := tsv.te.twoPC
ctx := context.Background()
conn, err := tsv.qe.conns.Get(ctx)
conn, err := tsv.qe.conns.Get(ctx, nil)
if err != nil {
t.Fatal(err)
}

Просмотреть файл

@ -22,23 +22,22 @@ import (
"sync"
"time"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/dtids"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txlimiter"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
type txEngineState int
@ -223,16 +222,22 @@ func (te *TxEngine) isTxPoolAvailable(addToWaitGroup func(int)) error {
// statement(s) used to execute the begin (if any).
//
// Subsequent statements can access the connection through the transaction id.
func (te *TxEngine) Begin(ctx context.Context, preQueries []string, reservedID int64, options *querypb.ExecuteOptions) (int64, string, string, error) {
func (te *TxEngine) Begin(ctx context.Context, savepointQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions) (int64, string, string, error) {
span, ctx := trace.NewSpan(ctx, "TxEngine.Begin")
defer span.Finish()
// if the connection is already reserved then, we should not apply the settings.
if reservedID != 0 && len(settings) > 0 {
return 0, "", "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] should not mix reserved connection and connection with settings")
}
err := te.isTxPoolAvailable(te.beginRequests.Add)
if err != nil {
return 0, "", "", err
}
defer te.beginRequests.Done()
conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, preQueries)
conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, savepointQueries, settings)
if err != nil {
return 0, "", "", err
}
@ -389,7 +394,7 @@ outer:
if txid > maxid {
maxid = txid
}
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
allErr.RecordError(err)
continue
@ -504,7 +509,7 @@ func (te *TxEngine) stopWatchdog() {
}
// ReserveBegin creates a reserved connection, and in it opens a transaction
func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string, postBeginQueries []string) (int64, string, error) {
func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string, savepointQueries []string) (int64, string, error) {
span, ctx := trace.NewSpan(ctx, "TxEngine.ReserveBegin")
defer span.Finish()
err := te.isTxPoolAvailable(te.beginRequests.Add)
@ -518,7 +523,7 @@ func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOp
return 0, "", err
}
defer conn.UnlockUpdateTime()
_, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, postBeginQueries)
_, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, savepointQueries)
if err != nil {
conn.Close()
conn.Release(tx.ConnInitFail)
@ -561,7 +566,7 @@ func (te *TxEngine) Reserve(ctx context.Context, options *querypb.ExecuteOptions
// Reserve creates a reserved connection and returns the id to it
func (te *TxEngine) reserve(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string) (*StatefulConnection, error) {
conn, err := te.txPool.scp.NewConn(ctx, options)
conn, err := te.txPool.scp.NewConn(ctx, options, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -58,11 +58,11 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with timeout wait.
te.AcceptReadWrite()
c, beginSQL, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, beginSQL, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
require.Equal(t, "begin", beginSQL)
c.Unlock()
c, beginSQL, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, beginSQL, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
require.Equal(t, "begin", beginSQL)
c.Unlock()
@ -74,7 +74,7 @@ func TestTxEngineClose(t *testing.T) {
// Immediate close.
te.AcceptReadOnly()
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
t.Fatal(err)
}
@ -86,7 +86,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period.
te.shutdownGracePeriod = 25 * time.Millisecond
te.AcceptReadWrite()
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
c.Unlock()
start = time.Now()
@ -97,7 +97,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period, but pool gets empty early.
te.shutdownGracePeriod = 25 * time.Millisecond
te.AcceptReadWrite()
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
c.Unlock()
go func() {
@ -113,7 +113,7 @@ func TestTxEngineClose(t *testing.T) {
// Immediate close, but connection is in use.
te.AcceptReadOnly()
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
go func() {
time.Sleep(100 * time.Millisecond)
@ -153,7 +153,7 @@ func TestTxEngineBegin(t *testing.T) {
for _, exec := range []func() (int64, string, error){
func() (int64, string, error) {
tx, _, schemaStateChanges, err := te.Begin(ctx, nil, 0, &querypb.ExecuteOptions{})
tx, _, schemaStateChanges, err := te.Begin(ctx, nil, 0, nil, &querypb.ExecuteOptions{})
return tx, schemaStateChanges, err
},
func() (int64, string, error) {
@ -204,7 +204,7 @@ func TestTxEngineRenewFails(t *testing.T) {
conn.Unlock() // but we keep holding on to it... sneaky....
// this next bit sets up the scp so our renew will fail
conn2, err := te.txPool.scp.NewConn(ctx, options)
conn2, err := te.txPool.scp.NewConn(ctx, options, nil)
require.NoError(t, err)
defer conn2.Release(tx.TxCommit)
te.txPool.scp.lastID.Set(conn2.ConnID - 1)
@ -551,7 +551,7 @@ func startTransaction(te *TxEngine, writeTransaction bool) error {
} else {
options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY
}
_, _, _, err := te.Begin(context.Background(), nil, 0, options)
_, _, _, err := te.Begin(context.Background(), nil, 0, nil, options)
return err
}
@ -583,7 +583,7 @@ func TestTxEngineFailReserve(t *testing.T) {
_, err = te.Reserve(ctx, options, nonExistingID, nil)
assert.EqualError(t, err, "transaction 42: not found")
txID, _, _, err := te.Begin(ctx, nil, 0, options)
txID, _, _, err := te.Begin(ctx, nil, 0, nil, options)
require.NoError(t, err)
conn, err := te.txPool.GetAndLock(txID, "for test")
require.NoError(t, err)

Просмотреть файл

@ -117,7 +117,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
txe.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
txe.te.preparedPool.SetFailed(dtid)
conn, _, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
@ -260,7 +260,7 @@ func (txe *TxExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, pre
}
func (txe *TxExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
return err
}

Просмотреть файл

@ -22,19 +22,16 @@ import (
"time"
"vitess.io/vitess/go/pools"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txlimiter"
querypb "vitess.io/vitess/go/vt/proto/query"
@ -222,7 +219,7 @@ func (tp *TxPool) Rollback(ctx context.Context, txConn *StatefulConnection) erro
// the statements (if any) executed to initiate the transaction. In autocommit
// mode the statement will be "".
// The connection returned is locked for the callee and its responsibility is to unlock the connection.
func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, reservedID int64, preQueries []string) (*StatefulConnection, string, string, error) {
func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, reservedID int64, savepointQueries []string, settings []string) (*StatefulConnection, string, string, error) {
span, ctx := trace.NewSpan(ctx, "TxPool.Begin")
defer span.Finish()
@ -239,7 +236,7 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
if !tp.limiter.Get(immediateCaller, effectiveCaller) {
return nil, "", "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded")
}
conn, err = tp.createConn(ctx, options)
conn, err = tp.createConn(ctx, options, settings)
defer func() {
if err != nil {
// The transaction limiter frees transactions on rollback or commit. If we fail to create the transaction,
@ -251,7 +248,7 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
if err != nil {
return nil, "", "", err
}
sql, sessionStateChanges, err := tp.begin(ctx, options, readOnly, conn, preQueries)
sql, sessionStateChanges, err := tp.begin(ctx, options, readOnly, conn, savepointQueries)
if err != nil {
conn.Close()
conn.Release(tx.ConnInitFail)
@ -260,10 +257,10 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
return conn, sql, sessionStateChanges, nil
}
func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, conn *StatefulConnection, preQueries []string) (string, string, error) {
func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, conn *StatefulConnection, savepointQueries []string) (string, string, error) {
immediateCaller := callerid.ImmediateCallerIDFromContext(ctx)
effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)
beginQueries, autocommit, sessionStateChanges, err := createTransaction(ctx, options, conn, readOnly, preQueries)
beginQueries, autocommit, sessionStateChanges, err := createTransaction(ctx, options, conn, readOnly, savepointQueries)
if err != nil {
return "", "", err
}
@ -273,8 +270,8 @@ func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, re
return beginQueries, sessionStateChanges, nil
}
func (tp *TxPool) createConn(ctx context.Context, options *querypb.ExecuteOptions) (*StatefulConnection, error) {
conn, err := tp.scp.NewConn(ctx, options)
func (tp *TxPool) createConn(ctx context.Context, options *querypb.ExecuteOptions, settings []string) (*StatefulConnection, error) {
conn, err := tp.scp.NewConn(ctx, options, settings)
if err != nil {
errCode := vterrors.Code(err)
switch err {
@ -290,7 +287,7 @@ func (tp *TxPool) createConn(ctx context.Context, options *querypb.ExecuteOption
return conn, nil
}
func createTransaction(ctx context.Context, options *querypb.ExecuteOptions, conn *StatefulConnection, readOnly bool, preQueries []string) (string, bool, string, error) {
func createTransaction(ctx context.Context, options *querypb.ExecuteOptions, conn *StatefulConnection, readOnly bool, savepointQueries []string) (string, bool, string, error) {
beginQueries := ""
autocommitTransaction := false
@ -332,8 +329,8 @@ func createTransaction(ctx context.Context, options *querypb.ExecuteOptions, con
return "", false, "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "don't know how to open a transaction of this type: %v", options.GetTransactionIsolation())
}
for _, preQuery := range preQueries {
if _, err := conn.Exec(ctx, preQuery, 1, false); err != nil {
for _, savepoint := range savepointQueries {
if _, err := conn.Exec(ctx, savepoint, 1, false); err != nil {
return "", false, "", err
}
}

Просмотреть файл

@ -44,7 +44,7 @@ func TestTxPoolExecuteCommit(t *testing.T) {
sql := "select 'this is a query'"
// begin a transaction and then return the connection
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
id := conn.ReservedID()
@ -76,7 +76,7 @@ func TestTxPoolExecuteRollback(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
defer conn.Release(tx.TxRollback)
@ -94,7 +94,7 @@ func TestTxPoolExecuteRollbackOnClosedConn(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
defer conn.Release(tx.TxRollback)
@ -112,9 +112,9 @@ func TestTxPoolRollbackNonBusy(t *testing.T) {
defer closer()
// start two transactions, and mark one of them as unused
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
conn2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
conn2.Unlock() // this marks conn2 as NonBusy
@ -138,7 +138,7 @@ func TestTxPoolTransactionIsolation(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
c2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil)
c2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil, nil)
require.NoError(t, err)
c2.Release(tx.TxClose)
@ -153,7 +153,7 @@ func TestTxPoolAutocommit(t *testing.T) {
// to mysql.
// This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal
// because is not in the list of expected queries (i.e db.AddQuery hasn't been called).
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}, false, 0, nil)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}, false, 0, nil, nil)
require.NoError(t, err)
// run a query to see it in the query log
@ -182,7 +182,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) {
err := db.WaitForClose(2 * time.Second)
require.NoError(t, err)
txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err, "Begin should have succeeded after the retry in DBConn.Exec()")
txConn.Release(tx.TxCommit)
}
@ -201,7 +201,7 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool) {
// reused by subsequent transactions.
db.AddQuery("begin", &sqltypes.Result{})
db.AddQuery("rollback", &sqltypes.Result{})
txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
txConn.Release(tx.TxCommit)
@ -221,7 +221,7 @@ func TestTxPoolBeginWithError(t *testing.T) {
}
ctxWithCallerID := callerid.NewContext(ctx, ef, im)
_, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil)
_, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "error: rejected")
require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error")
@ -247,7 +247,7 @@ func TestTxPoolBeginWithPreQueryError(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
db.AddRejectedQuery("pre_query", errRejected)
_, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, []string{"pre_query"})
_, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, []string{"pre_query"}, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "error: rejected")
require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error")
@ -261,7 +261,7 @@ func TestTxPoolCancelledContextError(t *testing.T) {
cancel()
// when
_, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
_, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
// then
require.Error(t, err)
@ -280,12 +280,12 @@ func TestTxPoolWaitTimeoutError(t *testing.T) {
defer closer()
// lock the only connection in the pool.
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
defer conn.Unlock()
// try locking one more connection.
_, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
_, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
// then
require.Error(t, err)
@ -302,7 +302,7 @@ func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) {
defer closer()
db.AddRejectedQuery("rollback", errRejected)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
_, err = conn1.Exec(ctx, sql, 1, true)
@ -319,7 +319,7 @@ func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) {
func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
db, txPool, _, _ := setup(t)
defer db.Close()
conn1, _, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
id := conn1.ReservedID()
conn1.Unlock()
txPool.Close()
@ -341,7 +341,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
txPool, _ = newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn1, _, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
id = conn1.ReservedID()
_, err := txPool.Commit(ctx, conn1)
require.NoError(t, err)
@ -355,7 +355,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer txPool.Close()
conn1, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err, "unable to start transaction: %v", err)
conn1.Unlock()
id = conn1.ReservedID()
@ -371,7 +371,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) {
startingStray := txPool.env.Stats().InternalErrors.Counts()["StrayTransactions"]
// Start stray transaction.
conn, _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
conn.Unlock()
@ -400,7 +400,7 @@ func TestTxTimeoutKillsTransactions(t *testing.T) {
ctxWithCallerID := callerid.NewContext(ctx, ef, im)
// Start transaction.
conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil)
require.NoError(t, err)
conn.Unlock()