Addresses #2530

This change makes VTTablet recognize that MySQL is running in RBR
mode. If this mode is detected, VTTablet will not rewrite
INSERT...ON DUPLICATE KEY constructs. It will just pass them
through instead. This allows MySQL to be run with CLIENT_FOUND_ROWS
option turned on, and the returned RowsAffected will be the
same as MySQL.

Additionally, if this mode is detected, VTTablet will also
skip adding update stream comments to other DMLs because
they're unnecessary.
This commit is contained in:
Sugu Sougoumarane 2017-04-12 23:23:14 -07:00 коммит произвёл Sugu Sougoumarane
Родитель e54bb14bbf
Коммит 90494660ce
7 изменённых файлов: 265 добавлений и 50 удалений

Просмотреть файл

@ -6,7 +6,6 @@ package dbconnpool
import (
"fmt"
"strings"
"time"
"github.com/youtube/vitess/go/sqldb"
@ -105,37 +104,6 @@ func (dbc *DBConnection) ExecuteStreamFetch(query string, callback func(*sqltype
return nil
}
var (
getModeSQL = "select @@global.sql_mode"
getAutocommit = "select @@autocommit"
)
// VerifyMode is a helper method to verify mysql is running with
// sql_mode = STRICT_TRANS_TABLES and autocommit=ON.
func (dbc *DBConnection) VerifyMode() error {
qr, err := dbc.ExecuteFetch(getModeSQL, 2, false)
if err != nil {
return fmt.Errorf("could not verify mode: %v", err)
}
if len(qr.Rows) != 1 {
return fmt.Errorf("incorrect rowcount received for %s: %d", getModeSQL, len(qr.Rows))
}
if !strings.Contains(qr.Rows[0][0].String(), "STRICT_TRANS_TABLES") {
return fmt.Errorf("require sql_mode to be STRICT_TRANS_TABLES: got %s", qr.Rows[0][0].String())
}
qr, err = dbc.ExecuteFetch(getAutocommit, 2, false)
if err != nil {
return fmt.Errorf("could not verify mode: %v", err)
}
if len(qr.Rows) != 1 {
return fmt.Errorf("incorrect rowcount received for %s: %d", getAutocommit, len(qr.Rows))
}
if !strings.Contains(qr.Rows[0][0].String(), "1") {
return fmt.Errorf("require autocommit to be 1: got %s", qr.Rows[0][0].String())
}
return nil
}
// NewDBConnection returns a new DBConnection based on the ConnParams
// and will use the provided stats to collect timing.
func NewDBConnection(info *sqldb.ConnParams, mysqlStats *stats.Timings) (*DBConnection, error) {

Просмотреть файл

@ -6,6 +6,7 @@ package connpool
import (
"fmt"
"strings"
"sync"
"time"
@ -21,6 +22,16 @@ import (
"golang.org/x/net/context"
)
// BinlogFormat is used for for specifying the binlog format.
type BinlogFormat int
// The following constants specify the possible binlog format values.
const (
BinlogFormatStatement = BinlogFormat(iota)
BinlogFormatRow
BinlogFormatMixed
)
// DBConn is a db connection for tabletserver.
// It performs automatic reconnects as needed.
// Its Execute function has a timeout that can kill
@ -157,9 +168,55 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*
return dbc.conn.ExecuteStreamFetch(query, callback, streamBufferSize)
}
// VerifyMode returns an error if the connection mode is incorrect.
func (dbc *DBConn) VerifyMode() error {
return dbc.conn.VerifyMode()
var (
getModeSQL = "select @@global.sql_mode"
getAutocommit = "select @@autocommit"
showBinlog = "show variables like 'binlog_format'"
)
// VerifyMode is a helper method to verify mysql is running with
// sql_mode = STRICT_TRANS_TABLES and autocommit=ON. It also returns
// the current binlog format.
func (dbc *DBConn) VerifyMode() (BinlogFormat, error) {
qr, err := dbc.conn.ExecuteFetch(getModeSQL, 2, false)
if err != nil {
return 0, fmt.Errorf("could not verify mode: %v", err)
}
if len(qr.Rows) != 1 {
return 0, fmt.Errorf("incorrect rowcount received for %s: %d", getModeSQL, len(qr.Rows))
}
if !strings.Contains(qr.Rows[0][0].String(), "STRICT_TRANS_TABLES") {
return 0, fmt.Errorf("require sql_mode to be STRICT_TRANS_TABLES: got %s", qr.Rows[0][0].String())
}
qr, err = dbc.conn.ExecuteFetch(getAutocommit, 2, false)
if err != nil {
return 0, fmt.Errorf("could not verify mode: %v", err)
}
if len(qr.Rows) != 1 {
return 0, fmt.Errorf("incorrect rowcount received for %s: %d", getAutocommit, len(qr.Rows))
}
if !strings.Contains(qr.Rows[0][0].String(), "1") {
return 0, fmt.Errorf("require autocommit to be 1: got %s", qr.Rows[0][0].String())
}
qr, err = dbc.conn.ExecuteFetch(showBinlog, 10, false)
if err != nil {
return 0, fmt.Errorf("could not fetch binlog format: %v", err)
}
if len(qr.Rows) != 1 {
return 0, fmt.Errorf("incorrect rowcount received for %s: %d", showBinlog, len(qr.Rows))
}
if len(qr.Rows[0]) != 2 {
return 0, fmt.Errorf("incorrect column count received for %s: %d", showBinlog, len(qr.Rows[0]))
}
switch qr.Rows[0][1].String() {
case "STATEMENT":
return BinlogFormatStatement, nil
case "ROW":
return BinlogFormatRow, nil
case "MIXED":
return BinlogFormatMixed, nil
}
return 0, fmt.Errorf("unexpected binlog format for %s: %s", showBinlog, qr.Rows[0][1].String())
}
// Close closes the DBConn.

Просмотреть файл

@ -121,6 +121,7 @@ type QueryEngine struct {
// Vars
strictMode sync2.AtomicBool
binlogFormat connpool.BinlogFormat
autoCommit sync2.AtomicBool
maxResultSize sync2.AtomicInt64
maxDMLRows sync2.AtomicInt64
@ -232,19 +233,17 @@ func (qe *QueryEngine) Open(dbconfigs dbconfigs.DBConfigs) error {
qe.dbconfigs = dbconfigs
qe.conns.Open(&qe.dbconfigs.App, &qe.dbconfigs.Dba)
if qe.strictMode.Get() {
conn, err := qe.conns.Get(tabletenv.LocalContext())
if err != nil {
qe.conns.Close()
return err
}
err = conn.VerifyMode()
conn.Recycle()
conn, err := qe.conns.Get(tabletenv.LocalContext())
if err != nil {
qe.conns.Close()
return err
}
qe.binlogFormat, err = conn.VerifyMode()
conn.Recycle()
if err != nil {
qe.conns.Close()
return err
}
if err != nil {
qe.conns.Close()
return err
}
qe.streamConns.Open(&qe.dbconfigs.App, &qe.dbconfigs.Dba)

Просмотреть файл

@ -493,11 +493,21 @@ func (qre *QueryExecutor) execInsertSubquery(conn *TxConnection) (*sqltypes.Resu
}
func (qre *QueryExecutor) execInsertPKRows(conn *TxConnection, pkRows [][]sqltypes.Value) (*sqltypes.Result, error) {
bsc := buildStreamComment(qre.plan.Table, pkRows, nil)
var bsc []byte
// don't build comment for RBR.
if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow {
bsc = buildStreamComment(qre.plan.Table, pkRows, nil)
}
return qre.txFetch(conn, qre.plan.OuterQuery, qre.bindVars, bsc, false, true)
}
func (qre *QueryExecutor) execUpsertPK(conn *TxConnection) (*sqltypes.Result, error) {
// For RBR, upserts are passed through.
if qre.tsv.qe.binlogFormat == connpool.BinlogFormatRow {
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, false, true)
}
// For statement or mixed mode, we have to split into two ops.
pkRows, err := buildValueList(qre.plan.Table, qre.plan.PKValues, qre.bindVars)
if err != nil {
return nil, err
@ -568,7 +578,11 @@ func (qre *QueryExecutor) execDMLPKRows(conn *TxConnection, query *sqlparser.Par
if secondaryList != nil {
secondaryList = secondaryList[i:end]
}
bsc := buildStreamComment(qre.plan.Table, pkRows, secondaryList)
var bsc []byte
// Don't build comment for RBR.
if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow {
bsc = buildStreamComment(qre.plan.Table, pkRows, secondaryList)
}
qre.bindVars["#pk"] = sqlparser.TupleEqualityList{
Columns: qre.plan.Table.Indexes[0].Columns,
Rows: pkRows,

Просмотреть файл

@ -23,10 +23,11 @@ import (
"github.com/youtube/vitess/go/vt/callinfo/fakecallinfo"
"github.com/youtube/vitess/go/vt/tableacl"
"github.com/youtube/vitess/go/vt/tableacl/simpleacl"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/connpool"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/planbuilder"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/rules"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv"
"github.com/youtube/vitess/go/vt/vterrors"
querypb "github.com/youtube/vitess/go/vt/proto/query"
tableaclpb "github.com/youtube/vitess/go/vt/proto/tableacl"
@ -293,6 +294,50 @@ func TestQueryExecutorPlanInsertSubQuery(t *testing.T) {
}
}
func TestQueryExecutorPlanInsertSubQueryRBR(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "insert into test_table(pk) select pk from test_table where pk = 1 limit 1000"
want := &sqltypes.Result{}
db.AddQuery(query, want)
selectQuery := "select pk from test_table where pk = 1 limit 1000"
db.AddQuery(selectQuery, &sqltypes.Result{
Fields: []*querypb.Field{{
Name: "pk",
Type: sqltypes.Int32,
}},
RowsAffected: 1,
Rows: [][]sqltypes.Value{
{sqltypes.MakeTrusted(sqltypes.Int32, []byte("2"))},
},
})
insertQuery := "insert into test_table(pk) values (2)"
db.AddQuery(insertQuery, &sqltypes.Result{})
ctx := context.Background()
tsv := newTestTabletServer(ctx, enableStrict, db)
txid := newTransaction(tsv)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanInsertSubquery, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries := []string{"insert into test_table(pk) values (2)"}
gotqueries := fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}
}
func TestQueryExecutorPlanUpsertPk(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
@ -378,6 +423,34 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) {
}
}
func TestQueryExecutorPlanUpsertPkRBR(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "insert into test_table values (1) on duplicate key update val = 1"
db.AddQuery(query, &sqltypes.Result{})
want := &sqltypes.Result{}
ctx := context.Background()
tsv := newTestTabletServer(ctx, enableStrict, db)
txid := newTransaction(tsv)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanUpsertPK, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries := []string{query}
gotqueries := fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}
}
func TestQueryExecutorPlanUpsertPkAutoCommit(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
@ -462,6 +535,34 @@ func TestQueryExecutorPlanDmlPk(t *testing.T) {
}
}
func TestQueryExecutorPlanDmlPkRBR(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "update test_table set name = 2 where pk in (1)"
want := &sqltypes.Result{}
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, enableStrict, db)
txid := newTransaction(tsv)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanDMLPK, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries := []string{query}
gotqueries := fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}
}
func TestQueryExecutorPlanDmlMessage(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
@ -560,6 +661,46 @@ func TestQueryExecutorPlanDmlSubQuery(t *testing.T) {
}
}
func TestQueryExecutorPlanDmlSubQueryRBR(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "update test_table set addr = 3 where name = 1 limit 1000"
expandedQuery := "select pk from test_table where name = 1 limit 1000 for update"
want := &sqltypes.Result{}
db.AddQuery(query, want)
db.AddQuery(expandedQuery, &sqltypes.Result{
Fields: []*querypb.Field{
{Type: sqltypes.Int32},
},
RowsAffected: 1,
Rows: [][]sqltypes.Value{
{sqltypes.MakeTrusted(sqltypes.Int32, []byte("2"))},
},
})
updateQuery := "update test_table set addr = 3 where pk in (2)"
db.AddQuery(updateQuery, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, enableStrict, db)
txid := newTransaction(tsv)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanDMLSubquery, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries := []string{updateQuery}
gotqueries := fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}
}
func TestQueryExecutorPlanDmlSubQueryAutoCommit(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
@ -1418,6 +1559,18 @@ func getQueryExecutorSupportedQueries() map[string]*sqltypes.Result {
{sqltypes.MakeString([]byte("1"))},
},
},
"show variables like 'binlog_format'": {
Fields: []*querypb.Field{{
Type: sqltypes.VarChar,
}, {
Type: sqltypes.VarChar,
}},
RowsAffected: 1,
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("binlog_format")),
sqltypes.MakeString([]byte("STATEMENT")),
}},
},
mysqlconn.BaseShowTables: {
Fields: mysqlconn.BaseShowTablesFields,
RowsAffected: 3,

Просмотреть файл

@ -45,6 +45,18 @@ func Queries() map[string]*sqltypes.Result {
{sqltypes.MakeString([]byte("1"))},
},
},
"show variables like 'binlog_format'": {
Fields: []*querypb.Field{{
Type: sqltypes.VarChar,
}, {
Type: sqltypes.VarChar,
}},
RowsAffected: 1,
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("binlog_format")),
sqltypes.MakeString([]byte("STATEMENT")),
}},
},
mysqlconn.BaseShowTables: {
Fields: mysqlconn.BaseShowTablesFields,
RowsAffected: 3,

Просмотреть файл

@ -2392,6 +2392,18 @@ func getSupportedQueries() map[string]*sqltypes.Result {
{sqltypes.MakeString([]byte("1"))},
},
},
"show variables like 'binlog_format'": {
Fields: []*querypb.Field{{
Type: sqltypes.VarChar,
}, {
Type: sqltypes.VarChar,
}},
RowsAffected: 1,
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("binlog_format")),
sqltypes.MakeString([]byte("STATEMENT")),
}},
},
"select * from test_table where 1 != 1": {
Fields: []*querypb.Field{{
Name: "pk",