Merge pull request #4739 from tinyspeck/autocommit-batch-execute

Single round trip commit on BatchExecute
This commit is contained in:
Michael Demmer 2019-03-22 08:47:38 -07:00 коммит произвёл GitHub
Родитель 7a201e3f04 20b59bb3ca
Коммит 56c2a938d0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 814 добавлений и 757 удалений

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1076,6 +1076,18 @@ func (tsv *TabletServer) ExecuteBatch(ctx context.Context, target *querypb.Targe
defer tsv.endRequest(false)
defer tsv.handlePanicAndSendLogStats("batch", nil, nil)
// When all these conditions are met, we send the queries directly
// to the MySQL without creating a transaction. This optimization
// yields better throughput.
// Setting ExecuteOptions_AUTOCOMMIT will get a connection out of the
// pool without actually begin/commit the transaction.
if (options == nil || options.TransactionIsolation == querypb.ExecuteOptions_DEFAULT) &&
tsv.qe.autoCommit.Get() &&
asTransaction &&
tsv.qe.passthroughDMLs.Get() {
options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT
}
if asTransaction {
transactionID, err = tsv.Begin(ctx, target, options)
if err != nil {
@ -1844,9 +1856,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
target := tsv.target
tsv.mu.Unlock()
shr := &querypb.StreamHealthResponse{
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
}

Просмотреть файл

@ -239,6 +239,8 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
return 0, err
}
autocommitTransaction := false
if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok {
if queries.setIsolationLevel != "" {
if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil {
@ -249,6 +251,8 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil {
return 0, err
}
} else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT {
autocommitTransaction = true
} else {
return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation())
}
@ -263,6 +267,7 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
axp,
immediateCaller,
effectiveCaller,
autocommitTransaction,
),
options.GetWorkload() != querypb.ExecuteOptions_DBA,
)
@ -312,6 +317,12 @@ func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptio
func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error {
defer conn.conclude(TxCommit, "transaction committed")
defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)()
if conn.Autocommit {
mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages)
return nil
}
if _, err := conn.Exec(ctx, "commit", 1, false); err != nil {
conn.Close()
return err
@ -379,9 +390,10 @@ type TxConnection struct {
LogToFile sync2.AtomicInt32
ImmediateCallerID *querypb.VTGateCallerID
EffectiveCallerID *vtrpcpb.CallerID
Autocommit bool
}
func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID) *TxConnection {
func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID, autocommit bool) *TxConnection {
return &TxConnection{
DBConn: conn,
TransactionID: transactionID,
@ -391,6 +403,7 @@ func newTxConnection(conn *connpool.DBConn, transactionID int64, pool *TxPool, i
ChangedMessages: make(map[string][]string),
ImmediateCallerID: immediate,
EffectiveCallerID: effective,
Autocommit: autocommit,
}
}

Просмотреть файл

@ -265,6 +265,27 @@ func TestTxPoolTransactionIsolation(t *testing.T) {
}
}
func TestTxPoolAutocommit(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
txPool := newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
ctx := context.Background()
// Start a transaction with autocommit. This will ensure that the executor does not send begin/commit statements
// to mysql.
// This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal
// because is not in the list of expected queries (i.e db.AddQuery hasn't been called).
txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT})
if err != nil {
t.Fatal(err)
}
err = txPool.Commit(ctx, txid, &fakeMessageCommitter{})
if err != nil {
t.Fatal(err)
}
}
// TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case
// where we see a transient errno 2006 e.g. because MySQL killed the
// db connection. DBConn.Exec() is going to reconnect and retry automatically

Просмотреть файл

@ -292,6 +292,10 @@ message ExecuteOptions {
// This is not an "official" transaction level but it will do a
// START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY
CONSISTENT_SNAPSHOT_READ_ONLY = 5;
// This not an "official" transaction level, it will send queries to mysql
// without wrapping them in a transaction
AUTOCOMMIT = 6;
}
TransactionIsolation transaction_isolation = 9;

Различия файлов скрыты, потому что одна или несколько строк слишком длинны