2pc: tabletserver Prepare endtoend test (#2119)

* More changes to TabletServer transitions: Found more corner
cases where the new transactional requests can race against
state transitions. To simplify this, we allow such commands
only if the tabletserver is the master. Also, we wait for
all inflight transactional requests to complete before
rolling back transactions. This will prevent a new
transactional command from landing after a tx pool
had been fully rolled back.
* Some more clean-up on TabletServer: SplitQueryV2 also
uses execRequest now.
* Typos: aciton->action
This commit is contained in:
sougou 2016-10-06 15:10:32 -07:00 коммит произвёл GitHub
Родитель 041fca210b
Коммит a152b853fc
6 изменённых файлов: 273 добавлений и 104 удалений

Просмотреть файл

@ -14,6 +14,7 @@ import (
"github.com/youtube/vitess/go/vt/tabletserver/querytypes"
"golang.org/x/net/context"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
@ -65,6 +66,29 @@ func (client *QueryClient) Rollback() error {
return client.server.Rollback(client.ctx, &client.target, client.transactionID)
}
// Prepare executes a prepare on the current transaction.
func (client *QueryClient) Prepare(dtid string) error {
defer func() { client.transactionID = 0 }()
return client.server.Prepare(client.ctx, &client.target, client.transactionID, dtid)
}
// CommitPrepared commits a prepared transaction.
func (client *QueryClient) CommitPrepared(dtid string) error {
return client.server.CommitPrepared(client.ctx, &client.target, dtid)
}
// RollbackPrepared rollsback a prepared transaction.
func (client *QueryClient) RollbackPrepared(dtid string, originalID int64) error {
return client.server.RollbackPrepared(client.ctx, &client.target, dtid, originalID)
}
// SetServingType is for testing transitions.
// It currently supports only master->replica and back.
func (client *QueryClient) SetServingType(tabletType topodatapb.TabletType) error {
_, err := client.server.SetServingType(tabletType, true, nil)
return err
}
// Execute executes a query.
func (client *QueryClient) Execute(query string, bindvars map[string]interface{}) (*sqltypes.Result, error) {
return client.server.Execute(

Просмотреть файл

@ -12,6 +12,8 @@ import (
"time"
"github.com/youtube/vitess/go/vt/tabletserver/endtoend/framework"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
func TestCommit(t *testing.T) {
@ -389,3 +391,126 @@ func TestForUpdate(t *testing.T) {
}
}
}
func TestPrepareRollback(t *testing.T) {
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval=4", nil)
query := "insert into vitess_test (intval, floatval, charval, binval) " +
"values(4, null, null, null)"
err := client.Begin()
if err != nil {
t.Error(err)
return
}
_, err = client.Execute(query, nil)
if err != nil {
t.Error(err)
return
}
err = client.Prepare("aa")
if err != nil {
client.RollbackPrepared("aa", 0)
t.Error(err)
return
}
err = client.RollbackPrepared("aa", 0)
if err != nil {
t.Error(err)
return
}
qr, err := client.Execute("select * from vitess_test", nil)
if err != nil {
t.Error(err)
return
}
if qr.RowsAffected != 3 {
t.Errorf("rows affected: %d, want 3", qr.RowsAffected)
}
}
func TestPrepareCommit(t *testing.T) {
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval=4", nil)
query := "insert into vitess_test (intval, floatval, charval, binval) " +
"values(4, null, null, null)"
err := client.Begin()
if err != nil {
t.Error(err)
return
}
_, err = client.Execute(query, nil)
if err != nil {
t.Error(err)
return
}
err = client.Prepare("aa")
if err != nil {
client.RollbackPrepared("aa", 0)
t.Error(err)
return
}
err = client.CommitPrepared("aa")
if err != nil {
t.Error(err)
return
}
qr, err := client.Execute("select * from vitess_test", nil)
if err != nil {
t.Error(err)
return
}
if qr.RowsAffected != 4 {
t.Errorf("rows affected: %d, want 4", qr.RowsAffected)
}
}
func TestPrepareReparentCommit(t *testing.T) {
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval=4", nil)
query := "insert into vitess_test (intval, floatval, charval, binval) " +
"values(4, null, null, null)"
err := client.Begin()
if err != nil {
t.Error(err)
return
}
_, err = client.Execute(query, nil)
if err != nil {
t.Error(err)
return
}
err = client.Prepare("aa")
if err != nil {
client.RollbackPrepared("aa", 0)
t.Error(err)
return
}
// Rollback all transactions
err = client.SetServingType(topodatapb.TabletType_REPLICA)
if err != nil {
t.Error(err)
return
}
// This should resurrect the prepared transaction.
err = client.SetServingType(topodatapb.TabletType_MASTER)
if err != nil {
t.Error(err)
return
}
err = client.CommitPrepared("aa")
if err != nil {
t.Error(err)
return
}
qr, err := client.Execute("select * from vitess_test", nil)
if err != nil {
t.Error(err)
return
}
if qr.RowsAffected != 4 {
t.Errorf("rows affected: %d, want 4", qr.RowsAffected)
}
}

Просмотреть файл

@ -54,11 +54,12 @@ const (
StateServing
// StateTransitioning is a transient state indicating that
// the tabletserver is tranisitioning to a new state.
// In order to achieve clean transitions, no requests are
// allowed during this state.
StateTransitioning
// StateShuttingDown is a transient state indicating that
// the tabletserver is shutting down. This state differs from
// StateTransitioning because we allow queries for transactions
// that are still in flight.
// StateShuttingDown indicates that the tabletserver
// is shutting down. In this state, we wait for outstanding
// requests and transactions to conclude.
StateShuttingDown
)
@ -94,13 +95,13 @@ type TabletServer struct {
// for health checks. This does not affect how queries are served.
// target specifies the primary target type, and also allow specifies
// secondary types that should be additionally allowed.
mu sync.Mutex
state int64
lameduck sync2.AtomicInt32
target querypb.Target
alsoAllow []topodatapb.TabletType
requests sync.WaitGroup
begins sync.WaitGroup
mu sync.Mutex
state int64
lameduck sync2.AtomicInt32
target querypb.Target
alsoAllow []topodatapb.TabletType
requests sync.WaitGroup
txRequests sync.WaitGroup
// The following variables should be initialized only once
// before starting the tabletserver.
@ -416,6 +417,9 @@ func (tsv *TabletServer) serveNewType() (err error) {
log.Errorf("Could not prepare transactions: %v", err)
}
} else {
// Wait for in-flight transactional requests to complete
// before rolling back everything.
tsv.txRequests.Wait()
tsv.qe.RollbackTransactions()
tsv.startReplicationStreamer()
}
@ -456,8 +460,8 @@ func (tsv *TabletServer) StopService() {
}
func (tsv *TabletServer) waitForShutdown() {
// Wait till begins have completed before waiting on tx pool.
tsv.begins.Wait()
// Wait till txRequests have completed before waiting on tx pool.
tsv.txRequests.Wait()
tsv.qe.WaitForTxEmpty()
tsv.qe.streamQList.TerminateAll()
tsv.updateStreamList.Stop()
@ -629,7 +633,7 @@ func (tsv *TabletServer) replicationStreamer(ctx context.Context) {
// Begin starts a new transaction. This is allowed only if the state is StateServing.
func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target) (transactionID int64, err error) {
err = tsv.execRequest(
ctx,
ctx, tsv.BeginTimeout.Get(),
"Begin", "begin", nil,
target, true, false,
func(ctx context.Context, logStats *LogStats) error {
@ -645,9 +649,9 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target) (tra
// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (err error) {
return tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"Commit", "commit", nil,
target, false, true,
target, true, true,
func(ctx context.Context, logStats *LogStats) error {
defer tsv.qe.queryServiceStats.QueryStats.Record("COMMIT", time.Now())
logStats.TransactionID = transactionID
@ -659,9 +663,9 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra
// Rollback rollsback the specified transaction.
func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, transactionID int64) (err error) {
return tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"Rollback", "rollback", nil,
target, false, true,
target, true, true,
func(ctx context.Context, logStats *LogStats) error {
defer tsv.qe.queryServiceStats.QueryStats.Record("ROLLBACK", time.Now())
logStats.TransactionID = transactionID
@ -673,9 +677,9 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t
// Prepare prepares the specified transaction.
func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
return tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"Prepare", "prepare", nil,
target, false, true,
target, true, true,
func(ctx context.Context, logStats *LogStats) error {
txe := &TxExecutor{
ctx: ctx,
@ -690,9 +694,9 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr
// CommitPrepared commits the prepared transaction.
func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error) {
return tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"CommitPrepared", "commit_prepared", nil,
target, false, true,
target, true, true,
func(ctx context.Context, logStats *LogStats) error {
txe := &TxExecutor{
ctx: ctx,
@ -707,9 +711,9 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar
// RollbackPrepared commits the prepared transaction.
func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error) {
return tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"RollbackPrepared", "rollback_prepared", nil,
target, false, true,
target, true, true,
func(ctx context.Context, logStats *LogStats) error {
txe := &TxExecutor{
ctx: ctx,
@ -725,7 +729,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T
func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, transactionID int64, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
allowShutdown := (transactionID != 0)
err = tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"Execute", sql, bindVariables,
target, false, allowShutdown,
func(ctx context.Context, logStats *LogStats) error {
@ -809,7 +813,7 @@ func (tsv *TabletServer) computeExtras(options *querypb.ExecuteOptions) *querypb
// The subsequent QueryResult will have Rows set (and Fields nil).
func (tsv *TabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, options *querypb.ExecuteOptions, sendReply func(*sqltypes.Result) error) (err error) {
err = tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"StreamExecute", sql, bindVariables,
target, false, false,
func(ctx context.Context, logStats *LogStats) error {
@ -913,7 +917,7 @@ func (tsv *TabletServer) BeginExecuteBatch(ctx context.Context, target *querypb.
// SplitQuery V2.
func (tsv *TabletServer) SplitQuery(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64) (splits []querytypes.QuerySplit, err error) {
err = tsv.execRequest(
ctx,
ctx, tsv.QueryTimeout.Get(),
"SplitQuery", sql, bindVariables,
target, false, false,
func(ctx context.Context, logStats *LogStats) error {
@ -967,89 +971,79 @@ func (tsv *TabletServer) SplitQueryV2(
numRowsPerQueryPart int64,
algorithm querypb.SplitQueryRequest_Algorithm,
) (splits []querytypes.QuerySplit, err error) {
logStats := newLogStats("SplitQuery", ctx)
logStats.OriginalSQL = sql
logStats.BindVariables = bindVariables
defer tsv.handleError(sql, bindVariables, &err, logStats)
if err = tsv.startRequest(target, false, false); err != nil {
return nil, err
}
// We should not set a timeout for SplitQueryV2. So, we can't use execRequest.
err = tsv.execRequest(
ctx, 24*365*10*time.Hour, /* practically forever */
"SplitQuery", sql, bindVariables,
target, false, false,
func(ctx context.Context, logStats *LogStats) error {
// SplitQuery using the Full Scan algorithm can take a while and
// we don't expect too many of these queries to run concurrently.
ciSplitColumns := make([]sqlparser.ColIdent, 0, len(splitColumns))
for _, s := range splitColumns {
ciSplitColumns = append(ciSplitColumns, sqlparser.NewColIdent(s))
}
// SplitQuery using the Full Scan algorithm can take a while and
// we don't expect too many of these queries to run concurrently.
defer tsv.endRequest(false)
ciSplitColumns := make([]sqlparser.ColIdent, 0, len(splitColumns))
for _, s := range splitColumns {
ciSplitColumns = append(ciSplitColumns, sqlparser.NewColIdent(s))
}
if err := validateSplitQueryParameters(
target,
sql,
bindVariables,
splitColumns,
splitCount,
numRowsPerQueryPart,
algorithm,
); err != nil {
return nil, tsv.handleErrorNoPanic(sql, bindVariables, err, logStats)
}
schema := getSchemaForSplitQuery(tsv.qe.schemaInfo)
splitParams, err := createSplitParams(
sql, bindVariables, ciSplitColumns, splitCount, numRowsPerQueryPart, schema)
if err != nil {
return nil, tsv.handleErrorNoPanic(sql, bindVariables, err, logStats)
}
defer func(start time.Time) {
splitTableName := splitParams.GetSplitTableName()
addUserTableQueryStats(
tsv.qe.queryServiceStats, ctx, splitTableName, "SplitQuery", int64(time.Now().Sub(start)))
}(time.Now())
sqlExecuter, err := newSplitQuerySQLExecuter(ctx, logStats, tsv.qe)
if err != nil {
return nil, tsv.handleErrorNoPanic(sql, bindVariables, err, logStats)
}
defer sqlExecuter.done()
algorithmObject, err := createSplitQueryAlgorithmObject(algorithm, splitParams, sqlExecuter)
if err != nil {
return nil, tsv.handleErrorNoPanic(sql, bindVariables, err, logStats)
}
result, err := splitquery.NewSplitter(splitParams, algorithmObject).Split()
if err != nil {
err = splitQueryToTabletError(err)
return nil, tsv.handleErrorNoPanic(sql, bindVariables, err, logStats)
}
return result, nil
if err := validateSplitQueryParameters(
target,
sql,
bindVariables,
splitColumns,
splitCount,
numRowsPerQueryPart,
algorithm,
); err != nil {
return err
}
schema := getSchemaForSplitQuery(tsv.qe.schemaInfo)
splitParams, err := createSplitParams(
sql, bindVariables, ciSplitColumns, splitCount, numRowsPerQueryPart, schema)
if err != nil {
return err
}
defer func(start time.Time) {
splitTableName := splitParams.GetSplitTableName()
addUserTableQueryStats(
tsv.qe.queryServiceStats, ctx, splitTableName, "SplitQuery", int64(time.Now().Sub(start)))
}(time.Now())
sqlExecuter, err := newSplitQuerySQLExecuter(ctx, logStats, tsv.qe)
if err != nil {
return err
}
defer sqlExecuter.done()
algorithmObject, err := createSplitQueryAlgorithmObject(algorithm, splitParams, sqlExecuter)
if err != nil {
return err
}
splits, err = splitquery.NewSplitter(splitParams, algorithmObject).Split()
if err != nil {
return splitQueryToTabletError(err)
}
return nil
},
)
return splits, err
}
// execRequest performs verfications, sets up the necessary environments
// and calls the supplied function for executing the request.
func (tsv *TabletServer) execRequest(
ctx context.Context,
ctx context.Context, timeout time.Duration,
requestName, sql string, bindVariables map[string]interface{},
target *querypb.Target, isBegin, allowShutdown bool,
target *querypb.Target, isTx, allowShutdown bool,
exec func(ctx context.Context, logStats *LogStats) error,
) (err error) {
logStats := newLogStats(requestName, ctx)
logStats.OriginalSQL = sql
logStats.BindVariables = bindVariables
defer tsv.handleError(sql, bindVariables, &err, logStats)
if err = tsv.startRequest(target, isBegin, allowShutdown); err != nil {
if err = tsv.startRequest(target, isTx, allowShutdown); err != nil {
return err
}
var timeout time.Duration
if isBegin {
timeout = tsv.BeginTimeout.Get()
} else {
timeout = tsv.QueryTimeout.Get()
}
ctx, cancel := withTimeout(ctx, timeout)
defer func(start time.Time) {
defer func() {
cancel()
tsv.endRequest(isBegin)
}(time.Now())
tsv.endRequest(isTx)
}()
err = exec(ctx, logStats)
if err != nil {
@ -1401,14 +1395,17 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
// the request (a waitgroup) as started. Every startRequest requires one
// and only one corresponding endRequest. When the service shuts down,
// StopService will wait on this waitgroup to ensure that there are
// no requests in flight.
func (tsv *TabletServer) startRequest(target *querypb.Target, isBegin, allowShutdown bool) (err error) {
// no requests in flight. For transactional requests like begin, etc.,
// isTx must be set to true, which increments an additional waitgroup.
// During state transitions, this waitgroup will be checked to make
// sure that no such statements are in-flight while we resolve the tx pool.
func (tsv *TabletServer) startRequest(target *querypb.Target, isTx, allowShutdown bool) (err error) {
tsv.mu.Lock()
defer tsv.mu.Unlock()
if tsv.state == StateServing {
goto verifyTarget
}
if (isBegin || allowShutdown) && tsv.state == StateShuttingDown {
if allowShutdown && tsv.state == StateShuttingDown {
goto verifyTarget
}
return NewTabletError(vtrpcpb.ErrorCode_QUERY_NOT_SERVED, "operation not allowed in state %s", stateName[tsv.state])
@ -1422,6 +1419,9 @@ verifyTarget:
if target.Shard != tsv.target.Shard {
return NewTabletError(vtrpcpb.ErrorCode_QUERY_NOT_SERVED, "Invalid shard %v", target.Shard)
}
if isTx && tsv.target.TabletType != topodatapb.TabletType_MASTER {
return NewTabletError(vtrpcpb.ErrorCode_QUERY_NOT_SERVED, "transactional statement disallowed on non-master tablet: %v", tsv.target.TabletType)
}
if target.TabletType != tsv.target.TabletType {
for _, otherType := range tsv.alsoAllow {
if target.TabletType == otherType {
@ -1439,17 +1439,17 @@ ok:
tsv.requests.Add(1)
// If it's a begin, we should make the shutdown code
// wait for the call to end before it waits for tx empty.
if isBegin {
tsv.begins.Add(1)
if isTx {
tsv.txRequests.Add(1)
}
return nil
}
// endRequest unregisters the current request (a waitgroup) as done.
func (tsv *TabletServer) endRequest(isBegin bool) {
func (tsv *TabletServer) endRequest(isTx bool) {
tsv.requests.Done()
if isBegin {
tsv.begins.Done()
if isTx {
tsv.txRequests.Done()
}
}

Просмотреть файл

@ -573,6 +573,26 @@ func TestTabletServerTarget(t *testing.T) {
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}
// Disallow tx statements if non-master.
tsv.SetServingType(topodatapb.TabletType_REPLICA, true, nil)
_, err = tsv.Begin(ctx, &target1)
want = "transactional statement disallowed on non-master tablet"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}
err = tsv.Commit(ctx, &target1, 1)
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}
// Disallow all if service is stopped.
tsv.StopService()
_, err = tsv.Execute(ctx, &target1, "select * from test_table limit 1000", nil, 0, nil)
want = "operation not allowed in state NOT_SERVING"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}
}
func TestTabletServerStopWithPrepare(t *testing.T) {
@ -743,7 +763,7 @@ func TestTabletServerBeginFail(t *testing.T) {
}
}
func TestTabletServerCommitTransaciton(t *testing.T) {
func TestTabletServerCommitTransaction(t *testing.T) {
db := setUpTabletServerTest()
testUtils := newTestUtils()
// sql that will be executed in this test

Просмотреть файл

@ -59,7 +59,7 @@ const (
sqlInsertRedoStmt = "insert into `%s`.redo_log_statement(dtid, id, statement) values %a"
sqlDeleteRedoTx = "delete from `%s`.redo_log_transaction where dtid = %a"
sqlDeleteRedoStmt = "delete from `%s`.redo_log_statement where dtid = %a"
sqlReadPrepared = "select s.dtid, s.id, s.statement from `%s`.redo_log_transaction t join `%s`.redo_log_statement s on t.dtid = s.dtid where t.resolution = 'Prepared' order by s.dtid, s.id"
sqlReadPrepared = "select s.dtid, s.id, s.statement from `%s`.redo_log_transaction t join `%s`.redo_log_statement s on t.dtid = s.dtid where t.state = 'Prepared' order by s.dtid, s.id"
)
// TwoPC performs 2PC metadata management (MM) functions.

Просмотреть файл

@ -21,7 +21,7 @@ var (
actionTimeout = flag.Duration("action_timeout", wrangler.DefaultActionTimeout, "time to wait for an action before resorting to force")
)
// ActionResult contains the result of an action. If Error, the aciton failed.
// ActionResult contains the result of an action. If Error, the action failed.
type ActionResult struct {
Name string
Parameters string