Merge pull request #2224 from sougou/2pc

2pc: commit failure handling
This commit is contained in:
sougou 2016-11-07 16:30:42 -08:00 коммит произвёл GitHub
Родитель 973b258381 323bcdfd04
Коммит 1f5ba0d1e4
12 изменённых файлов: 348 добавлений и 83 удалений

Просмотреть файл

@ -248,8 +248,12 @@ func (qe *QueryEngine) IsMySQLReachable() bool {
}
// PrepareFromRedo replays and prepares the transactions
// from the redo log. This is called when a tablet becomes
// from the redo log. It also loads previously failed transactions
// into the reserved list. This is called when a tablet becomes
// a master.
// TODO(sougou): Make this function set the lastId for tx pool to be
// greater than all those used by dtids. This will prevent dtid
// collisions.
func (qe *QueryEngine) PrepareFromRedo() error {
ctx := context.Background()
var allErr concurrency.AllErrorRecorder
@ -258,13 +262,13 @@ func (qe *QueryEngine) PrepareFromRedo() error {
return err
}
defer readConn.Recycle()
transactions, err := qe.twoPC.ReadPrepared(ctx, readConn)
prepared, failed, err := qe.twoPC.ReadAllRedo(ctx, readConn)
if err != nil {
return err
}
outer:
for dtid, tx := range transactions {
for dtid, tx := range prepared {
conn, err := qe.txPool.LocalBegin(ctx)
if err != nil {
allErr.RecordError(err)
@ -287,6 +291,9 @@ outer:
continue
}
}
for _, dtid := range failed {
qe.preparedPool.SetFailed(dtid)
}
return allErr.Error()
}
@ -301,7 +308,7 @@ func (qe *QueryEngine) RollbackTransactions() {
// this function. In case of any such change, this will
// have to be revisited.
qe.txPool.RollbackNonBusy(ctx)
for _, c := range qe.preparedPool.GetAll() {
for _, c := range qe.preparedPool.FetchAll() {
qe.txPool.LocalConclude(ctx, c)
}

Просмотреть файл

@ -1175,6 +1175,7 @@ func getQueryExecutorSupportedQueries() map[string]*sqltypes.Result {
sqlTurnoffBinlog: {},
fmt.Sprintf(sqlCreateSidecarDB, "_vt"): {},
fmt.Sprintf(sqlCreateTableRedoLogTransaction, "_vt"): {},
fmt.Sprintf(sqlAlterTableRedoLogTransaction, "_vt"): {},
fmt.Sprintf(sqlCreateTableRedoLogStatement, "_vt"): {},
fmt.Sprintf(sqlCreateTableTransaction, "_vt"): {},
fmt.Sprintf(sqlCreateTableParticipant, "_vt"): {},
@ -1390,5 +1391,6 @@ func getQueryExecutorSupportedQueries() map[string]*sqltypes.Result {
},
},
},
fmt.Sprintf(sqlReadAllRedo, "_vt", "_vt"): {},
}
}

Просмотреть файл

@ -73,14 +73,13 @@ func NewQueryServiceStats(statsPrefix string, enablePublishStats bool) *QuerySer
resultBuckets := []int64{0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10000}
queryStats := stats.NewTimings(queryStatsName)
return &QueryServiceStats{
MySQLStats: stats.NewTimings(mysqlStatsName),
QueryStats: queryStats,
WaitStats: stats.NewTimings(waitStatsName),
KillStats: stats.NewCounters(killStatsName, "Transactions", "Queries"),
InfoErrors: stats.NewCounters(infoErrorsName, "Retry", "Fatal", "DupKey"),
ErrorStats: stats.NewCounters(errorStatsName, "Fail", "TxPoolFull", "NotInTx", "Deadlock"),
InternalErrors: stats.NewCounters(internalErrorsName, "Task",
"Mismatch", "StrayTransactions", "Invalidation", "Panic", "HungQuery", "Schema"),
MySQLStats: stats.NewTimings(mysqlStatsName),
QueryStats: queryStats,
WaitStats: stats.NewTimings(waitStatsName),
KillStats: stats.NewCounters(killStatsName, "Transactions", "Queries"),
InfoErrors: stats.NewCounters(infoErrorsName, "Retry", "Fatal", "DupKey"),
ErrorStats: stats.NewCounters(errorStatsName, "Fail", "TxPoolFull", "NotInTx", "Deadlock"),
InternalErrors: stats.NewCounters(internalErrorsName, "Task", "StrayTransactions", "Panic", "HungQuery", "Schema", "TwopcCommit", "TwopcResurrection"),
UserTableQueryCount: stats.NewMultiCounters(
userTableQueryCountName, []string{"TableName", "CallerID", "Type"}),
UserTableQueryTimesNs: stats.NewMultiCounters(

Просмотреть файл

@ -412,7 +412,10 @@ func (tsv *TabletServer) serveNewType() (err error) {
if tsv.target.TabletType == topodatapb.TabletType_MASTER {
err = tsv.qe.PrepareFromRedo()
if err != nil {
// TODO(sougou): raise alarms.
// If this operation fails, we choose to raise an alert and
// continue anyway. Serving traffic is considered more important
// than blocking everything for the sake of a few transactions.
tsv.qe.queryServiceStats.InternalErrors.Add("TwopcResurrection", 1)
log.Errorf("Could not prepare transactions: %v", err)
}
} else {

Просмотреть файл

@ -692,16 +692,17 @@ func TestTabletServerReplicaToMaster(t *testing.T) {
tsv.SetServingType(topodatapb.TabletType_REPLICA, true, nil)
tpc := tsv.qe.twoPC
db.AddQuery(tpc.readPrepared, &sqltypes.Result{})
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{})
tsv.SetServingType(topodatapb.TabletType_MASTER, true, nil)
if len(tsv.qe.preparedPool.conns) != 0 {
t.Errorf("len(tsv.qe.preparedPool.conns): %d, want 0", len(tsv.qe.preparedPool.conns))
}
tsv.SetServingType(topodatapb.TabletType_REPLICA, true, nil)
db.AddQuery(tpc.readPrepared, &sqltypes.Result{
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */")),
}},
@ -718,15 +719,22 @@ func TestTabletServerReplicaToMaster(t *testing.T) {
tsv.SetServingType(topodatapb.TabletType_REPLICA, true, nil)
// Ensure we continue past errors.
db.AddQuery(tpc.readPrepared, &sqltypes.Result{
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("bogus")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("bogus")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */")),
}, {
sqltypes.MakeString([]byte("dtid1")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("unused")),
}},
})
tsv.SetServingType(topodatapb.TabletType_MASTER, true, nil)
@ -738,6 +746,10 @@ func TestTabletServerReplicaToMaster(t *testing.T) {
if !reflect.DeepEqual(got, want) {
t.Errorf("Prepared queries: %v, want %v", got, want)
}
wantFailed := map[string]error{"dtid1": errPrepFailed}
if !reflect.DeepEqual(tsv.qe.preparedPool.reserved, wantFailed) {
t.Errorf("Failed dtids: %v, want %v", tsv.qe.preparedPool.reserved, wantFailed)
}
tsv.SetServingType(topodatapb.TabletType_REPLICA, true, nil)
}
@ -1671,6 +1683,7 @@ func getSupportedQueries() map[string]*sqltypes.Result {
sqlTurnoffBinlog: {},
fmt.Sprintf(sqlCreateSidecarDB, "_vt"): {},
fmt.Sprintf(sqlCreateTableRedoLogTransaction, "_vt"): {},
fmt.Sprintf(sqlAlterTableRedoLogTransaction, "_vt"): {},
fmt.Sprintf(sqlCreateTableRedoLogStatement, "_vt"): {},
fmt.Sprintf(sqlCreateTableTransaction, "_vt"): {},
fmt.Sprintf(sqlCreateTableParticipant, "_vt"): {},
@ -1787,6 +1800,7 @@ func getSupportedQueries() map[string]*sqltypes.Result {
},
},
},
fmt.Sprintf(sqlReadAllRedo, "_vt", "_vt"): {},
}
}

Просмотреть файл

@ -36,6 +36,11 @@ const (
index state_time_idx(state, time_created)
) engine=InnoDB`
// Due to possible legacy issues, do not modify the create
// tablet statement. Alter it instead.
sqlAlterTableRedoLogTransaction = `alter table ` + "`%s`" + `.redo_log_transaction
modify state enum('Prepared', 'Failed')`
sqlCreateTableRedoLogStatement = `create table if not exists ` + "`%s`" + `.redo_log_statement(
dtid varbinary(512),
id bigint,
@ -58,15 +63,20 @@ const (
shard varchar(256),
primary key(dtid, id)
) engine=InnoDB`
sqlReadAllRedo = `select t.dtid, t.state, s.id, s.statement from ` + "`%s`" + `.redo_log_transaction t
join ` + "`%s`" + `.redo_log_statement s on t.dtid = s.dtid
where t.state = 'Prepared' order by t.dtid, s.id`
)
// TwoPC performs 2PC metadata management (MM) functions.
type TwoPC struct {
insertRedoTx *sqlparser.ParsedQuery
insertRedoStmt *sqlparser.ParsedQuery
updateRedoTx *sqlparser.ParsedQuery
deleteRedoTx *sqlparser.ParsedQuery
deleteRedoStmt *sqlparser.ParsedQuery
readPrepared string
readAllRedo string
insertTransaction *sqlparser.ParsedQuery
insertParticipants *sqlparser.ParsedQuery
@ -94,6 +104,7 @@ func (tpc *TwoPC) Open(sidecarDBName string, dbaparams *sqldb.ConnParams) {
sqlTurnoffBinlog,
fmt.Sprintf(sqlCreateSidecarDB, sidecarDBName),
fmt.Sprintf(sqlCreateTableRedoLogTransaction, sidecarDBName),
fmt.Sprintf(sqlAlterTableRedoLogTransaction, sidecarDBName),
fmt.Sprintf(sqlCreateTableRedoLogStatement, sidecarDBName),
fmt.Sprintf(sqlCreateTableTransaction, sidecarDBName),
fmt.Sprintf(sqlCreateTableParticipant, sidecarDBName),
@ -109,17 +120,16 @@ func (tpc *TwoPC) Open(sidecarDBName string, dbaparams *sqldb.ConnParams) {
tpc.insertRedoStmt = buildParsedQuery(
"insert into `%s`.redo_log_statement(dtid, id, statement) values %a",
sidecarDBName, ":vals")
tpc.updateRedoTx = buildParsedQuery(
"update `%s`.redo_log_transaction set state = %a where dtid = %a",
sidecarDBName, ":state", ":dtid")
tpc.deleteRedoTx = buildParsedQuery(
"delete from `%s`.redo_log_transaction where dtid = %a",
sidecarDBName, ":dtid")
tpc.deleteRedoStmt = buildParsedQuery(
"delete from `%s`.redo_log_statement where dtid = %a",
sidecarDBName, ":dtid")
tpc.readPrepared = fmt.Sprintf(
"select s.dtid, s.id, s.statement from `%s`.redo_log_transaction t "+
"join `%s`.redo_log_statement s on t.dtid = s.dtid "+
"where t.state = 'Prepared' order by s.dtid, s.id",
sidecarDBName, sidecarDBName)
tpc.readAllRedo = fmt.Sprintf(sqlReadAllRedo, sidecarDBName, sidecarDBName)
tpc.insertTransaction = buildParsedQuery(
"insert into `%s`.transaction(dtid, state, time_created, time_updated) values (%a, 'Prepare', %a, %a)",
@ -180,6 +190,16 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *TxConnection, dtid string,
return err
}
// UpdateRedo changes the state of the redo log for the dtid.
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *TxConnection, dtid, state string) error {
bindVars := map[string]interface{}{
"dtid": sqltypes.MakeTrusted(sqltypes.VarBinary, []byte(dtid)),
"state": sqltypes.MakeTrusted(sqltypes.VarBinary, []byte(state)),
}
_, err := tpc.exec(ctx, conn, tpc.updateRedoTx, bindVars)
return err
}
// DeleteRedo deletes the redo log for the dtid.
func (tpc *TwoPC) DeleteRedo(ctx context.Context, conn *TxConnection, dtid string) error {
bindVars := map[string]interface{}{
@ -193,32 +213,38 @@ func (tpc *TwoPC) DeleteRedo(ctx context.Context, conn *TxConnection, dtid strin
return err
}
// ReadPrepared returns all the prepared transactions from the redo logs.
func (tpc *TwoPC) ReadPrepared(ctx context.Context, conn *DBConn) (map[string][]string, error) {
qr, err := conn.Exec(ctx, tpc.readPrepared, 10000, false)
// ReadAllRedo returns all the prepared transactions from the redo logs.
func (tpc *TwoPC) ReadAllRedo(ctx context.Context, conn *DBConn) (prepared map[string][]string, failed []string, err error) {
qr, err := conn.Exec(ctx, tpc.readAllRedo, 10000, false)
if err != nil {
return nil, err
return nil, nil, err
}
transactions := make(map[string][]string)
var stmts []string
var dtid string
for i, row := range qr.Rows {
curdtid := row[0].String()
if i == 0 {
dtid = curdtid
}
if dtid == curdtid {
stmts = append(stmts, row[2].String())
// Do this as two loops for better readability.
// Load prepared transactions.
prepared = make(map[string][]string)
for _, row := range qr.Rows {
if row[1].String() != "Prepared" {
continue
}
transactions[dtid] = stmts
dtid = curdtid
stmts = []string{row[2].String()}
dtid := row[0].String()
prepared[dtid] = append(prepared[dtid], row[3].String())
}
if stmts != nil {
transactions[dtid] = stmts
// Load failed transactions.
lastdtid := ""
for _, row := range qr.Rows {
if row[1].String() != "Failed" {
continue
}
dtid := row[0].String()
if dtid == lastdtid {
continue
}
failed = append(failed, dtid)
lastdtid = dtid
}
return transactions, nil
return prepared, failed, nil
}
// CreateTransaction saves the metadata of a 2pc transaction as Prepared.

Просмотреть файл

@ -12,7 +12,7 @@ import (
"github.com/youtube/vitess/go/sqltypes"
)
func TestReadPrepared(t *testing.T) {
func TestReadAllRedo(t *testing.T) {
// Reuse code from tx_executor_test.
_, tsv, db := newTestTxExecutor()
defer tsv.StopService()
@ -25,68 +25,83 @@ func TestReadPrepared(t *testing.T) {
}
defer conn.Recycle()
db.AddQuery(tpc.readPrepared, &sqltypes.Result{})
got, err := tpc.ReadPrepared(ctx, conn)
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{})
prepared, failed, err := tpc.ReadAllRedo(ctx, conn)
if err != nil {
t.Fatal(err)
}
want := map[string][]string{}
if !reflect.DeepEqual(got, want) {
t.Errorf("ReadPrepared: %#v, want %#v", got, want)
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
}
db.AddQuery(tpc.readPrepared, &sqltypes.Result{
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt01")),
}},
})
got, err = tpc.ReadPrepared(ctx, conn)
prepared, failed, err = tpc.ReadAllRedo(ctx, conn)
if err != nil {
t.Fatal(err)
}
want = map[string][]string{"dtid0": {"stmt01"}}
if !reflect.DeepEqual(got, want) {
t.Errorf("ReadPrepared: %#v, want %#v", got, want)
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
}
db.AddQuery(tpc.readPrepared, &sqltypes.Result{
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt02")),
}},
})
got, err = tpc.ReadPrepared(ctx, conn)
prepared, failed, err = tpc.ReadAllRedo(ctx, conn)
if err != nil {
t.Fatal(err)
}
want = map[string][]string{"dtid0": {"stmt01", "stmt02"}}
if !reflect.DeepEqual(got, want) {
t.Errorf("ReadPrepared: %#v, want %#v", got, want)
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
}
db.AddQuery(tpc.readPrepared, &sqltypes.Result{
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt02")),
}, {
sqltypes.MakeString([]byte("dtid1")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt11")),
}},
})
got, err = tpc.ReadPrepared(ctx, conn)
prepared, failed, err = tpc.ReadAllRedo(ctx, conn)
if err != nil {
t.Fatal(err)
}
@ -94,7 +109,59 @@ func TestReadPrepared(t *testing.T) {
"dtid0": {"stmt01", "stmt02"},
"dtid1": {"stmt11"},
}
if !reflect.DeepEqual(got, want) {
t.Errorf("ReadPrepared: %#v, want %#v", got, want)
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
}
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt02")),
}, {
sqltypes.MakeString([]byte("dtid1")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt11")),
}, {
sqltypes.MakeString([]byte("dtid2")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt21")),
}, {
sqltypes.MakeString([]byte("dtid2")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt22")),
}, {
sqltypes.MakeString([]byte("dtid3")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("stmt31")),
}},
})
prepared, failed, err = tpc.ReadAllRedo(ctx, conn)
if err != nil {
t.Fatal(err)
}
want = map[string][]string{
"dtid0": {"stmt01", "stmt02"},
"dtid3": {"stmt31"},
}
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
}
wantFailed := []string{"dtid1", "dtid2"}
if !reflect.DeepEqual(failed, wantFailed) {
t.Errorf("ReadAllRedo failed): %#v, want %#v", failed, wantFailed)
}
}

Просмотреть файл

@ -9,6 +9,7 @@ import (
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/trace"
querypb "github.com/youtube/vitess/go/vt/proto/query"
@ -17,6 +18,7 @@ import (
// TxExecutor is used for executing a transactional request.
type TxExecutor struct {
// TODO(sougou): Parameterize this.
ctx context.Context
logStats *LogStats
qe *QueryEngine
@ -68,12 +70,13 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as defunct in the redo log. If the marking fails, a
// different error counter is incremented indicating a more
// severe condition.
// marked as failed in the redo log.
func (txe *TxExecutor) CommitPrepared(dtid string) error {
defer txe.qe.queryServiceStats.QueryStats.Record("COMMIT_PREPARED", time.Now())
conn := txe.qe.preparedPool.Get(dtid)
conn, err := txe.qe.preparedPool.FetchForCommit(dtid)
if err != nil {
return NewTabletError(vtrpcpb.ErrorCode_BAD_INPUT, "cannot commit dtid %s, state: %v", dtid, err)
}
if conn == nil {
return nil
}
@ -81,19 +84,47 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), txe.ctx)
defer txe.qe.txPool.LocalConclude(ctx, conn)
err := txe.qe.twoPC.DeleteRedo(ctx, conn, dtid)
err = txe.qe.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
// TODO(sougou): raise alarms & mark as defunct.
txe.markFailed(ctx, dtid)
return err
}
err = txe.qe.txPool.LocalCommit(ctx, conn)
if err != nil {
// TODO(sougou): raise alarms & mark as defunct.
txe.markFailed(ctx, dtid)
return err
}
txe.qe.preparedPool.Forget(dtid)
return nil
}
// markFailed does the necessary work to mark a CommitPrepared
// as failed. It marks the dtid as failed in the prepared pool,
// increments the InternalErros counter, and also changes the
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of TxExecutor's context.
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
txe.qe.queryServiceStats.InternalErrors.Add("TwopcCommit", 1)
txe.qe.preparedPool.SetFailed(dtid)
conn, err := txe.qe.txPool.LocalBegin(ctx)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer txe.qe.txPool.LocalConclude(ctx, conn)
if err = txe.qe.twoPC.UpdateRedo(ctx, conn, dtid, "Failed"); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}
if err = txe.qe.txPool.LocalCommit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}
// RollbackPrepared rolls back a prepared transaction. This function handles
// the case of an incomplete prepare.
//
@ -128,7 +159,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error {
err = txe.qe.txPool.LocalCommit(txe.ctx, conn)
returnConn:
if preparedConn := txe.qe.preparedPool.Get(dtid); preparedConn != nil {
if preparedConn := txe.qe.preparedPool.FetchForRollback(dtid); preparedConn != nil {
txe.qe.txPool.LocalConclude(txe.ctx, preparedConn)
}
if originalID != 0 {

Просмотреть файл

@ -150,10 +150,17 @@ func TestTxExecutorCommitRedoFail(t *testing.T) {
t.Error(err)
}
defer txe.RollbackPrepared("bb", 0)
db.AddQuery("update `_vt`.redo_log_transaction set state = 'Failed' where dtid = 'bb'", &sqltypes.Result{})
err = txe.CommitPrepared("bb")
want := "is not supported"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("Prepare err: %v, must contain %s", err, want)
t.Errorf("txe.CommitPrepared err: %v, must contain %s", err, want)
}
// A retry should fail differently.
err = txe.CommitPrepared("bb")
want = "cannot commit dtid bb, state: failed"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("txe.CommitPrepared err: %v, must contain %s", err, want)
}
}

Просмотреть файл

@ -10,12 +10,18 @@ import (
"sync"
)
var (
errPrepCommiting = errors.New("commiting")
errPrepFailed = errors.New("failed")
)
// TxPreparedPool manages connections for prepared transactions.
// The Prepare functionality and associated orchestration
// is done by TxPool.
type TxPreparedPool struct {
mu sync.Mutex
conns map[string]*TxConnection
reserved map[string]error
capacity int
}
@ -23,15 +29,19 @@ type TxPreparedPool struct {
func NewTxPreparedPool(capacity int) *TxPreparedPool {
return &TxPreparedPool{
conns: make(map[string]*TxConnection, capacity),
reserved: make(map[string]error),
capacity: capacity,
}
}
// Put adds the connection to the pool. It returns an error
// if the pool is full, and panics on duplicate key.
// if the pool is full or on duplicate key.
func (pp *TxPreparedPool) Put(c *TxConnection, dtid string) error {
pp.mu.Lock()
defer pp.mu.Unlock()
if _, ok := pp.reserved[dtid]; ok {
return errors.New("duplicate DTID in Prepare: " + dtid)
}
if _, ok := pp.conns[dtid]; ok {
return errors.New("duplicate DTID in Prepare: " + dtid)
}
@ -42,18 +52,62 @@ func (pp *TxPreparedPool) Put(c *TxConnection, dtid string) error {
return nil
}
// Get returns the connection and removes it from the pool.
// If the connection is not found, it returns nil.
func (pp *TxPreparedPool) Get(dtid string) *TxConnection {
// FetchForRollback returns the connection and removes it from the pool.
// If the connection is not found, it returns nil. If the dtid
// is in the reserved list, it means that an operator is trying
// to resolve a previously failed commit. So, it removes the entry
// and returns nil.
func (pp *TxPreparedPool) FetchForRollback(dtid string) *TxConnection {
pp.mu.Lock()
defer pp.mu.Unlock()
if _, ok := pp.reserved[dtid]; ok {
delete(pp.reserved, dtid)
return nil
}
c := pp.conns[dtid]
delete(pp.conns, dtid)
return c
}
// GetAll removes all connections and returns them as a list.
func (pp *TxPreparedPool) GetAll() []*TxConnection {
// FetchForCommit returns the connection for commit. Before returning,
// it remembers the dtid in its reserved list as "committing". If
// the dtid is already in the reserved list, it returns an error.
// If the commit is successful, the dtid can be removed from the
// reserved list by calling Forget. If the commit failed, SetFailed
// must be called. This will inform future retries that the previous
// commit failed.
func (pp *TxPreparedPool) FetchForCommit(dtid string) (*TxConnection, error) {
pp.mu.Lock()
defer pp.mu.Unlock()
if err, ok := pp.reserved[dtid]; ok {
return nil, err
}
c, ok := pp.conns[dtid]
if ok {
delete(pp.conns, dtid)
pp.reserved[dtid] = errPrepCommiting
}
return c, nil
}
// SetFailed marks the reserved dtid as failed.
// If there was no previous entry, one is created.
func (pp *TxPreparedPool) SetFailed(dtid string) {
pp.mu.Lock()
defer pp.mu.Unlock()
pp.reserved[dtid] = errPrepFailed
}
// Forget removes the dtid from the reserved list.
func (pp *TxPreparedPool) Forget(dtid string) {
pp.mu.Lock()
defer pp.mu.Unlock()
delete(pp.reserved, dtid)
}
// FetchAll removes all connections and returns them as a list.
// It also forgets all reserved dtids.
func (pp *TxPreparedPool) FetchAll() []*TxConnection {
pp.mu.Lock()
defer pp.mu.Unlock()
conns := make([]*TxConnection, 0, len(pp.conns))
@ -61,5 +115,6 @@ func (pp *TxPreparedPool) GetAll() []*TxConnection {
conns = append(conns, c)
}
pp.conns = make(map[string]*TxConnection, pp.capacity)
pp.reserved = make(map[string]error)
return conns
}

Просмотреть файл

@ -37,35 +37,91 @@ func TestPrepPut(t *testing.T) {
if err == nil || err.Error() != want {
t.Errorf("Put err: %v, want %s", err, want)
}
_, err = pp.FetchForCommit("aa")
err = pp.Put(nil, "aa")
want = "duplicate DTID in Prepare: aa"
if err == nil || err.Error() != want {
t.Errorf("Put err: %v, want %s", err, want)
}
pp.Forget("aa")
err = pp.Put(nil, "aa")
if err != nil {
t.Error(err)
}
}
func TestPrepGet(t *testing.T) {
func TestPrepFetchForRollback(t *testing.T) {
pp := NewTxPreparedPool(2)
conn := &TxConnection{}
pp.Put(conn, "aa")
got := pp.Get("bb")
got := pp.FetchForRollback("bb")
if got != nil {
t.Errorf("Get(bb): %v, want nil", got)
}
got = pp.Get("aa")
got = pp.FetchForRollback("aa")
if got != conn {
t.Errorf("pp.Get(aa): %p, want %p", got, conn)
}
got = pp.Get("aa")
got = pp.FetchForRollback("aa")
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
}
func TestPrepGetAll(t *testing.T) {
func TestPrepFetchForCommit(t *testing.T) {
pp := NewTxPreparedPool(2)
conn := &TxConnection{}
got, err := pp.FetchForCommit("aa")
if err != nil {
t.Error(err)
}
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
pp.Put(conn, "aa")
got, err = pp.FetchForCommit("aa")
if err != nil {
t.Error(err)
}
if got != conn {
t.Errorf("pp.Get(aa): %p, want %p", got, conn)
}
got, err = pp.FetchForCommit("aa")
want := "commiting"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
pp.SetFailed("aa")
got, err = pp.FetchForCommit("aa")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
pp.SetFailed("bb")
got, err = pp.FetchForCommit("bb")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
pp.Forget("aa")
got, err = pp.FetchForCommit("aa")
if err != nil {
t.Error(err)
}
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
}
func TestPrepFetchAll(t *testing.T) {
pp := NewTxPreparedPool(2)
conn1 := &TxConnection{}
conn2 := &TxConnection{}
pp.Put(conn1, "aa")
pp.Put(conn2, "bb")
got := pp.GetAll()
got := pp.FetchAll()
if len(got) != 2 {
t.Errorf("GetAll len: %d, want 2", len(got))
t.Errorf("FetchAll len: %d, want 2", len(got))
}
if len(pp.conns) != 0 {
t.Errorf("len(pp.conns): %d, want 0", len(pp.conns))

Просмотреть файл

@ -194,8 +194,6 @@ func (txc *TxConn) resumeCommit(ctx context.Context, target *querypb.Target, tra
}
func (txc *TxConn) generateDTID(mmShard *vtgatepb.Session_ShardSession) string {
// TODO(sougou): Change query_engine to start off transaction id counting
// above the highest number used by dtids. This will prevent collisions.
return fmt.Sprintf("%s:%s:0:%d", mmShard.Target.Keyspace, mmShard.Target.Shard, mmShard.TransactionId)
}