From 90494660ce3ae67fd5924d6e4ae0ab12d10cb011 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 12 Apr 2017 23:23:14 -0700 Subject: [PATCH] vttablet: rbr support Addresses #2530 This change makes VTTablet recognize that MySQL is running in RBR mode. If this mode is detected, VTTablet will not rewrite INSERT...ON DUPLICATE KEY constructs. It will just pass them through instead. This allows MySQL to be run with CLIENT_FOUND_ROWS option turned on, and the returned RowsAffected will be the same as MySQL. Additionally, if this mode is detected, VTTablet will also skip adding update stream comments to other DMLs because they're unnecessary. --- go/vt/dbconnpool/connection.go | 32 ---- .../vttablet/tabletserver/connpool/dbconn.go | 63 ++++++- go/vt/vttablet/tabletserver/query_engine.go | 23 ++- go/vt/vttablet/tabletserver/query_executor.go | 18 +- .../tabletserver/query_executor_test.go | 155 +++++++++++++++++- .../schema/schematest/schematest.go | 12 ++ .../tabletserver/tabletserver_test.go | 12 ++ 7 files changed, 265 insertions(+), 50 deletions(-) diff --git a/go/vt/dbconnpool/connection.go b/go/vt/dbconnpool/connection.go index cb2b5e4149..c398506eef 100644 --- a/go/vt/dbconnpool/connection.go +++ b/go/vt/dbconnpool/connection.go @@ -6,7 +6,6 @@ package dbconnpool import ( "fmt" - "strings" "time" "github.com/youtube/vitess/go/sqldb" @@ -105,37 +104,6 @@ func (dbc *DBConnection) ExecuteStreamFetch(query string, callback func(*sqltype return nil } -var ( - getModeSQL = "select @@global.sql_mode" - getAutocommit = "select @@autocommit" -) - -// VerifyMode is a helper method to verify mysql is running with -// sql_mode = STRICT_TRANS_TABLES and autocommit=ON. -func (dbc *DBConnection) VerifyMode() error { - qr, err := dbc.ExecuteFetch(getModeSQL, 2, false) - if err != nil { - return fmt.Errorf("could not verify mode: %v", err) - } - if len(qr.Rows) != 1 { - return fmt.Errorf("incorrect rowcount received for %s: %d", getModeSQL, len(qr.Rows)) - } - if !strings.Contains(qr.Rows[0][0].String(), "STRICT_TRANS_TABLES") { - return fmt.Errorf("require sql_mode to be STRICT_TRANS_TABLES: got %s", qr.Rows[0][0].String()) - } - qr, err = dbc.ExecuteFetch(getAutocommit, 2, false) - if err != nil { - return fmt.Errorf("could not verify mode: %v", err) - } - if len(qr.Rows) != 1 { - return fmt.Errorf("incorrect rowcount received for %s: %d", getAutocommit, len(qr.Rows)) - } - if !strings.Contains(qr.Rows[0][0].String(), "1") { - return fmt.Errorf("require autocommit to be 1: got %s", qr.Rows[0][0].String()) - } - return nil -} - // NewDBConnection returns a new DBConnection based on the ConnParams // and will use the provided stats to collect timing. func NewDBConnection(info *sqldb.ConnParams, mysqlStats *stats.Timings) (*DBConnection, error) { diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index ee3dac20a7..0409b906aa 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -6,6 +6,7 @@ package connpool import ( "fmt" + "strings" "sync" "time" @@ -21,6 +22,16 @@ import ( "golang.org/x/net/context" ) +// BinlogFormat is used for for specifying the binlog format. +type BinlogFormat int + +// The following constants specify the possible binlog format values. +const ( + BinlogFormatStatement = BinlogFormat(iota) + BinlogFormatRow + BinlogFormatMixed +) + // DBConn is a db connection for tabletserver. // It performs automatic reconnects as needed. // Its Execute function has a timeout that can kill @@ -157,9 +168,55 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(* return dbc.conn.ExecuteStreamFetch(query, callback, streamBufferSize) } -// VerifyMode returns an error if the connection mode is incorrect. -func (dbc *DBConn) VerifyMode() error { - return dbc.conn.VerifyMode() +var ( + getModeSQL = "select @@global.sql_mode" + getAutocommit = "select @@autocommit" + showBinlog = "show variables like 'binlog_format'" +) + +// VerifyMode is a helper method to verify mysql is running with +// sql_mode = STRICT_TRANS_TABLES and autocommit=ON. It also returns +// the current binlog format. +func (dbc *DBConn) VerifyMode() (BinlogFormat, error) { + qr, err := dbc.conn.ExecuteFetch(getModeSQL, 2, false) + if err != nil { + return 0, fmt.Errorf("could not verify mode: %v", err) + } + if len(qr.Rows) != 1 { + return 0, fmt.Errorf("incorrect rowcount received for %s: %d", getModeSQL, len(qr.Rows)) + } + if !strings.Contains(qr.Rows[0][0].String(), "STRICT_TRANS_TABLES") { + return 0, fmt.Errorf("require sql_mode to be STRICT_TRANS_TABLES: got %s", qr.Rows[0][0].String()) + } + qr, err = dbc.conn.ExecuteFetch(getAutocommit, 2, false) + if err != nil { + return 0, fmt.Errorf("could not verify mode: %v", err) + } + if len(qr.Rows) != 1 { + return 0, fmt.Errorf("incorrect rowcount received for %s: %d", getAutocommit, len(qr.Rows)) + } + if !strings.Contains(qr.Rows[0][0].String(), "1") { + return 0, fmt.Errorf("require autocommit to be 1: got %s", qr.Rows[0][0].String()) + } + qr, err = dbc.conn.ExecuteFetch(showBinlog, 10, false) + if err != nil { + return 0, fmt.Errorf("could not fetch binlog format: %v", err) + } + if len(qr.Rows) != 1 { + return 0, fmt.Errorf("incorrect rowcount received for %s: %d", showBinlog, len(qr.Rows)) + } + if len(qr.Rows[0]) != 2 { + return 0, fmt.Errorf("incorrect column count received for %s: %d", showBinlog, len(qr.Rows[0])) + } + switch qr.Rows[0][1].String() { + case "STATEMENT": + return BinlogFormatStatement, nil + case "ROW": + return BinlogFormatRow, nil + case "MIXED": + return BinlogFormatMixed, nil + } + return 0, fmt.Errorf("unexpected binlog format for %s: %s", showBinlog, qr.Rows[0][1].String()) } // Close closes the DBConn. diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 2a9fc44b8c..c5612fa2ca 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -121,6 +121,7 @@ type QueryEngine struct { // Vars strictMode sync2.AtomicBool + binlogFormat connpool.BinlogFormat autoCommit sync2.AtomicBool maxResultSize sync2.AtomicInt64 maxDMLRows sync2.AtomicInt64 @@ -232,19 +233,17 @@ func (qe *QueryEngine) Open(dbconfigs dbconfigs.DBConfigs) error { qe.dbconfigs = dbconfigs qe.conns.Open(&qe.dbconfigs.App, &qe.dbconfigs.Dba) - if qe.strictMode.Get() { - conn, err := qe.conns.Get(tabletenv.LocalContext()) - if err != nil { - qe.conns.Close() - return err - } - err = conn.VerifyMode() - conn.Recycle() + conn, err := qe.conns.Get(tabletenv.LocalContext()) + if err != nil { + qe.conns.Close() + return err + } + qe.binlogFormat, err = conn.VerifyMode() + conn.Recycle() - if err != nil { - qe.conns.Close() - return err - } + if err != nil { + qe.conns.Close() + return err } qe.streamConns.Open(&qe.dbconfigs.App, &qe.dbconfigs.Dba) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 6ecae3c5f8..bf0014b501 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -493,11 +493,21 @@ func (qre *QueryExecutor) execInsertSubquery(conn *TxConnection) (*sqltypes.Resu } func (qre *QueryExecutor) execInsertPKRows(conn *TxConnection, pkRows [][]sqltypes.Value) (*sqltypes.Result, error) { - bsc := buildStreamComment(qre.plan.Table, pkRows, nil) + var bsc []byte + // don't build comment for RBR. + if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow { + bsc = buildStreamComment(qre.plan.Table, pkRows, nil) + } return qre.txFetch(conn, qre.plan.OuterQuery, qre.bindVars, bsc, false, true) } func (qre *QueryExecutor) execUpsertPK(conn *TxConnection) (*sqltypes.Result, error) { + // For RBR, upserts are passed through. + if qre.tsv.qe.binlogFormat == connpool.BinlogFormatRow { + return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, false, true) + } + + // For statement or mixed mode, we have to split into two ops. pkRows, err := buildValueList(qre.plan.Table, qre.plan.PKValues, qre.bindVars) if err != nil { return nil, err @@ -568,7 +578,11 @@ func (qre *QueryExecutor) execDMLPKRows(conn *TxConnection, query *sqlparser.Par if secondaryList != nil { secondaryList = secondaryList[i:end] } - bsc := buildStreamComment(qre.plan.Table, pkRows, secondaryList) + var bsc []byte + // Don't build comment for RBR. + if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow { + bsc = buildStreamComment(qre.plan.Table, pkRows, secondaryList) + } qre.bindVars["#pk"] = sqlparser.TupleEqualityList{ Columns: qre.plan.Table.Indexes[0].Columns, Rows: pkRows, diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index d4ff45d00f..0f9d799090 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -23,10 +23,11 @@ import ( "github.com/youtube/vitess/go/vt/callinfo/fakecallinfo" "github.com/youtube/vitess/go/vt/tableacl" "github.com/youtube/vitess/go/vt/tableacl/simpleacl" + "github.com/youtube/vitess/go/vt/vterrors" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver/connpool" "github.com/youtube/vitess/go/vt/vttablet/tabletserver/planbuilder" "github.com/youtube/vitess/go/vt/vttablet/tabletserver/rules" "github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" - "github.com/youtube/vitess/go/vt/vterrors" querypb "github.com/youtube/vitess/go/vt/proto/query" tableaclpb "github.com/youtube/vitess/go/vt/proto/tableacl" @@ -293,6 +294,50 @@ func TestQueryExecutorPlanInsertSubQuery(t *testing.T) { } } +func TestQueryExecutorPlanInsertSubQueryRBR(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + query := "insert into test_table(pk) select pk from test_table where pk = 1 limit 1000" + want := &sqltypes.Result{} + db.AddQuery(query, want) + selectQuery := "select pk from test_table where pk = 1 limit 1000" + db.AddQuery(selectQuery, &sqltypes.Result{ + Fields: []*querypb.Field{{ + Name: "pk", + Type: sqltypes.Int32, + }}, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.MakeTrusted(sqltypes.Int32, []byte("2"))}, + }, + }) + + insertQuery := "insert into test_table(pk) values (2)" + + db.AddQuery(insertQuery, &sqltypes.Result{}) + ctx := context.Background() + tsv := newTestTabletServer(ctx, enableStrict, db) + txid := newTransaction(tsv) + qre := newTestQueryExecutor(ctx, tsv, query, txid) + tsv.qe.binlogFormat = connpool.BinlogFormatRow + + defer tsv.StopService() + defer testCommitHelper(t, tsv, qre) + checkPlanID(t, planbuilder.PlanInsertSubquery, qre.plan.PlanID) + got, err := qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + wantqueries := []string{"insert into test_table(pk) values (2)"} + gotqueries := fetchRecordedQueries(qre) + if !reflect.DeepEqual(gotqueries, wantqueries) { + t.Errorf("queries: %v, want %v", gotqueries, wantqueries) + } +} + func TestQueryExecutorPlanUpsertPk(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() @@ -378,6 +423,34 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) { } } +func TestQueryExecutorPlanUpsertPkRBR(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + query := "insert into test_table values (1) on duplicate key update val = 1" + db.AddQuery(query, &sqltypes.Result{}) + want := &sqltypes.Result{} + ctx := context.Background() + tsv := newTestTabletServer(ctx, enableStrict, db) + txid := newTransaction(tsv) + qre := newTestQueryExecutor(ctx, tsv, query, txid) + tsv.qe.binlogFormat = connpool.BinlogFormatRow + defer tsv.StopService() + defer testCommitHelper(t, tsv, qre) + checkPlanID(t, planbuilder.PlanUpsertPK, qre.plan.PlanID) + got, err := qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + wantqueries := []string{query} + gotqueries := fetchRecordedQueries(qre) + if !reflect.DeepEqual(gotqueries, wantqueries) { + t.Errorf("queries: %v, want %v", gotqueries, wantqueries) + } +} + func TestQueryExecutorPlanUpsertPkAutoCommit(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() @@ -462,6 +535,34 @@ func TestQueryExecutorPlanDmlPk(t *testing.T) { } } +func TestQueryExecutorPlanDmlPkRBR(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + query := "update test_table set name = 2 where pk in (1)" + want := &sqltypes.Result{} + db.AddQuery(query, want) + ctx := context.Background() + tsv := newTestTabletServer(ctx, enableStrict, db) + txid := newTransaction(tsv) + qre := newTestQueryExecutor(ctx, tsv, query, txid) + tsv.qe.binlogFormat = connpool.BinlogFormatRow + defer tsv.StopService() + defer testCommitHelper(t, tsv, qre) + checkPlanID(t, planbuilder.PlanDMLPK, qre.plan.PlanID) + got, err := qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + wantqueries := []string{query} + gotqueries := fetchRecordedQueries(qre) + if !reflect.DeepEqual(gotqueries, wantqueries) { + t.Errorf("queries: %v, want %v", gotqueries, wantqueries) + } +} + func TestQueryExecutorPlanDmlMessage(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() @@ -560,6 +661,46 @@ func TestQueryExecutorPlanDmlSubQuery(t *testing.T) { } } +func TestQueryExecutorPlanDmlSubQueryRBR(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + query := "update test_table set addr = 3 where name = 1 limit 1000" + expandedQuery := "select pk from test_table where name = 1 limit 1000 for update" + want := &sqltypes.Result{} + db.AddQuery(query, want) + db.AddQuery(expandedQuery, &sqltypes.Result{ + Fields: []*querypb.Field{ + {Type: sqltypes.Int32}, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.MakeTrusted(sqltypes.Int32, []byte("2"))}, + }, + }) + updateQuery := "update test_table set addr = 3 where pk in (2)" + db.AddQuery(updateQuery, want) + ctx := context.Background() + tsv := newTestTabletServer(ctx, enableStrict, db) + txid := newTransaction(tsv) + qre := newTestQueryExecutor(ctx, tsv, query, txid) + tsv.qe.binlogFormat = connpool.BinlogFormatRow + defer tsv.StopService() + defer testCommitHelper(t, tsv, qre) + checkPlanID(t, planbuilder.PlanDMLSubquery, qre.plan.PlanID) + got, err := qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + wantqueries := []string{updateQuery} + gotqueries := fetchRecordedQueries(qre) + if !reflect.DeepEqual(gotqueries, wantqueries) { + t.Errorf("queries: %v, want %v", gotqueries, wantqueries) + } +} + func TestQueryExecutorPlanDmlSubQueryAutoCommit(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() @@ -1418,6 +1559,18 @@ func getQueryExecutorSupportedQueries() map[string]*sqltypes.Result { {sqltypes.MakeString([]byte("1"))}, }, }, + "show variables like 'binlog_format'": { + Fields: []*querypb.Field{{ + Type: sqltypes.VarChar, + }, { + Type: sqltypes.VarChar, + }}, + RowsAffected: 1, + Rows: [][]sqltypes.Value{{ + sqltypes.MakeString([]byte("binlog_format")), + sqltypes.MakeString([]byte("STATEMENT")), + }}, + }, mysqlconn.BaseShowTables: { Fields: mysqlconn.BaseShowTablesFields, RowsAffected: 3, diff --git a/go/vt/vttablet/tabletserver/schema/schematest/schematest.go b/go/vt/vttablet/tabletserver/schema/schematest/schematest.go index ddf0191eed..f121116d73 100644 --- a/go/vt/vttablet/tabletserver/schema/schematest/schematest.go +++ b/go/vt/vttablet/tabletserver/schema/schematest/schematest.go @@ -45,6 +45,18 @@ func Queries() map[string]*sqltypes.Result { {sqltypes.MakeString([]byte("1"))}, }, }, + "show variables like 'binlog_format'": { + Fields: []*querypb.Field{{ + Type: sqltypes.VarChar, + }, { + Type: sqltypes.VarChar, + }}, + RowsAffected: 1, + Rows: [][]sqltypes.Value{{ + sqltypes.MakeString([]byte("binlog_format")), + sqltypes.MakeString([]byte("STATEMENT")), + }}, + }, mysqlconn.BaseShowTables: { Fields: mysqlconn.BaseShowTablesFields, RowsAffected: 3, diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 4e92371a6e..8b55243433 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2392,6 +2392,18 @@ func getSupportedQueries() map[string]*sqltypes.Result { {sqltypes.MakeString([]byte("1"))}, }, }, + "show variables like 'binlog_format'": { + Fields: []*querypb.Field{{ + Type: sqltypes.VarChar, + }, { + Type: sqltypes.VarChar, + }}, + RowsAffected: 1, + Rows: [][]sqltypes.Value{{ + sqltypes.MakeString([]byte("binlog_format")), + sqltypes.MakeString([]byte("STATEMENT")), + }}, + }, "select * from test_table where 1 != 1": { Fields: []*querypb.Field{{ Name: "pk",