* 2pc: Repair UI

* 2pc: json format for /twopcz

* code review comments
This commit is contained in:
sougou 2016-12-09 10:14:07 -08:00 коммит произвёл GitHub
Родитель 3985ebb172
Коммит 83dcb80c59
10 изменённых файлов: 638 добавлений и 77 удалений

Просмотреть файл

@ -65,6 +65,7 @@ var (
<a href="/debug/consolidations">Consolidations</a></br>
<a href="/querylogz">Current&nbsp;Query&nbsp;Log</a></br>
<a href="/txlogz">Current&nbsp;Transaction&nbsp;Log</a></br>
<a href="/twopcz">In-flight&nbsp;2PC&nbsp;Transactions</a></br>
</td>
<td width="25%" border="">
<a href="/healthz">Health Check</a></br>

Просмотреть файл

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/vt/tabletserver/endtoend/framework"
querypb "github.com/youtube/vitess/go/vt/proto/query"
@ -710,3 +711,73 @@ func TestWatchdog(t *testing.T) {
case <-time.After(2 * time.Second):
}
}
func TestManualTwopcz(t *testing.T) {
// This is a manual test. Uncomment the Skip to perform this test.
// The test will print the twopcz URL. Navigate to that location
// and perform all the operations allowed. They should all succeed
// and cause the transactions to be resolved.
t.Skip()
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval=4", nil)
conn, err := mysql.Connect(connParams)
if err != nil {
t.Error(err)
return
}
defer conn.Close()
// Successful prepare.
err = client.Begin()
if err != nil {
t.Error(err)
return
}
_, err = client.Execute("insert into vitess_test (intval, floatval, charval, binval) values(4, null, null, null)", nil)
_, err = client.Execute("insert into vitess_test (intval, floatval, charval, binval) values(5, null, null, null)", nil)
if err != nil {
t.Error(err)
return
}
err = client.Prepare("dtidsuccess")
defer client.RollbackPrepared("dtidsuccess", 0)
if err != nil {
t.Error(err)
return
}
// Failed transaction.
err = client.Begin()
if err != nil {
t.Error(err)
return
}
_, err = client.Execute("insert into vitess_test (intval, floatval, charval, binval) values(6, null, null, null)", nil)
_, err = client.Execute("insert into vitess_test (intval, floatval, charval, binval) values(7, null, null, null)", nil)
if err != nil {
t.Error(err)
return
}
err = client.Prepare("dtidfail")
defer client.RollbackPrepared("dtidfail", 0)
if err != nil {
t.Error(err)
return
}
conn.ExecuteFetch("update _vt.redo_log_transaction set state = 'Failed' where dtid = 'dtidfail'", 10, false)
conn.ExecuteFetch("commit", 10, false)
// Distributed transaction.
err = client.CreateTransaction("distributed", []*querypb.Target{{
Keyspace: "k1",
Shard: "s1",
}, {
Keyspace: "k2",
Shard: "s2",
}})
defer client.ConcludeTransaction("distributed")
fmt.Printf("%s/twopcz\n", framework.ServerAddress)
fmt.Print("Sleeping for 30 seconds\n")
time.Sleep(30 * time.Second)
}

Просмотреть файл

@ -79,7 +79,7 @@ func NewQueryServiceStats(statsPrefix string, enablePublishStats bool) *QuerySer
KillStats: stats.NewCounters(killStatsName, "Transactions", "Queries"),
InfoErrors: stats.NewCounters(infoErrorsName, "Retry", "DupKey"),
ErrorStats: stats.NewCounters(errorStatsName, "Fail", "TxPoolFull", "NotInTx", "Deadlock", "Fatal"),
InternalErrors: stats.NewCounters(internalErrorsName, "Task", "StrayTransactions", "Panic", "HungQuery", "Schema", "TwopcCommit", "TwopcResurrection"),
InternalErrors: stats.NewCounters(internalErrorsName, "Task", "StrayTransactions", "Panic", "HungQuery", "Schema", "TwopcCommit", "TwopcResurrection", "WatchdogFail"),
UserTableQueryCount: stats.NewMultiCounters(
userTableQueryCountName, []string{"TableName", "CallerID", "Type"}),
UserTableQueryTimesNs: stats.NewMultiCounters(

Просмотреть файл

@ -209,6 +209,7 @@ func (tsv *TabletServer) Register() {
tsv.registerQueryzHandler()
tsv.registerSchemazHandler()
tsv.registerStreamQueryzHandlers()
tsv.registerTwopczHandler()
}
// RegisterQueryRuleSource registers ruleSource for setting query rules.
@ -1559,6 +1560,18 @@ func (tsv *TabletServer) registerSchemazHandler() {
})
}
func (tsv *TabletServer) registerTwopczHandler() {
http.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
txe := &TxExecutor{
ctx: ctx,
logStats: NewLogStats(ctx, "twopcz"),
te: tsv.te,
}
twopczHandler(txe, w, r)
})
}
// SetPoolSize changes the pool size to the specified value.
func (tsv *TabletServer) SetPoolSize(val int) {
tsv.qe.connPool.SetCapacity(val)

Просмотреть файл

@ -65,9 +65,15 @@ const (
primary key(dtid, id)
) engine=InnoDB`
sqlReadAllRedo = `select t.dtid, t.state, s.id, s.statement from ` + "`%s`" + `.redo_log_transaction t
sqlReadAllRedo = `select t.dtid, t.state, t.time_created, s.statement
from ` + "`%s`" + `.redo_log_transaction t
join ` + "`%s`" + `.redo_log_statement s on t.dtid = s.dtid
where t.state = 'Prepared' order by t.dtid, s.id`
order by t.dtid, s.id`
sqlReadAllTransactions = `select t.dtid, t.state, t.time_created, p.keyspace, p.shard
from ` + "`%s`" + `.transaction t
join ` + "`%s`" + `.participant p on t.dtid = p.dtid
order by t.dtid, p.id`
)
// TwoPC performs 2PC metadata management (MM) functions.
@ -81,14 +87,15 @@ type TwoPC struct {
deleteRedoStmt *sqlparser.ParsedQuery
readAllRedo string
insertTransaction *sqlparser.ParsedQuery
insertParticipants *sqlparser.ParsedQuery
transition *sqlparser.ParsedQuery
deleteTransaction *sqlparser.ParsedQuery
deleteParticipants *sqlparser.ParsedQuery
readTransaction *sqlparser.ParsedQuery
readParticipants *sqlparser.ParsedQuery
readAbandoned *sqlparser.ParsedQuery
insertTransaction *sqlparser.ParsedQuery
insertParticipants *sqlparser.ParsedQuery
transition *sqlparser.ParsedQuery
deleteTransaction *sqlparser.ParsedQuery
deleteParticipants *sqlparser.ParsedQuery
readTransaction *sqlparser.ParsedQuery
readParticipants *sqlparser.ParsedQuery
readAbandoned *sqlparser.ParsedQuery
readAllTransactions string
}
// NewTwoPC creates a TwoPC variable.
@ -159,6 +166,7 @@ func (tpc *TwoPC) Init(sidecarDBName string, dbaparams *sqldb.ConnParams) error
tpc.readAbandoned = buildParsedQuery(
"select dtid, time_created from `%s`.transaction where time_created < %a",
sidecarDBName, ":time_created")
tpc.readAllTransactions = fmt.Sprintf(sqlReadAllTransactions, sidecarDBName, sidecarDBName)
return nil
}
@ -227,8 +235,15 @@ func (tpc *TwoPC) DeleteRedo(ctx context.Context, conn *TxConnection, dtid strin
return err
}
// PreparedTx represents a displayable version of a prepared transaction.
type PreparedTx struct {
Dtid string
Queries []string
Time time.Time
}
// ReadAllRedo returns all the prepared transactions from the redo logs.
func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared map[string][]string, failed []string, err error) {
func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*PreparedTx, err error) {
conn, err := tpc.readPool.Get(ctx)
if err != nil {
return nil, nil, err
@ -240,29 +255,24 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared map[string][]string
return nil, nil, err
}
// Do this as two loops for better readability.
// Load prepared transactions.
prepared = make(map[string][]string)
var curTx *PreparedTx
for _, row := range qr.Rows {
if row[1].String() != "Prepared" {
continue
}
dtid := row[0].String()
prepared[dtid] = append(prepared[dtid], row[3].String())
}
// Load failed transactions.
lastdtid := ""
for _, row := range qr.Rows {
if row[1].String() != "Failed" {
continue
if curTx == nil || dtid != curTx.Dtid {
// Initialize the new element.
v, _ := strconv.ParseInt(row[2].String(), 10, 64)
curTx = &PreparedTx{
Dtid: dtid,
Time: time.Unix(0, v),
}
switch row[1].String() {
case "Prepared":
prepared = append(prepared, curTx)
default:
failed = append(failed, curTx)
}
}
dtid := row[0].String()
if dtid == lastdtid {
continue
}
failed = append(failed, dtid)
lastdtid = dtid
curTx.Queries = append(curTx.Queries, row[3].String())
}
return prepared, failed, nil
}
@ -406,6 +416,50 @@ func (tpc *TwoPC) ReadAbandoned(ctx context.Context, abandonTime time.Time) (map
return txs, nil
}
// DistributedTx is similar to querypb.TransactionMetadata, but
// is display friendly.
type DistributedTx struct {
Dtid string
State string
Created time.Time
Participants []querypb.Target
}
// ReadAllTransactions returns info about all distributed transactions.
func (tpc *TwoPC) ReadAllTransactions(ctx context.Context) ([]*DistributedTx, error) {
conn, err := tpc.readPool.Get(ctx)
if err != nil {
return nil, err
}
defer conn.Recycle()
qr, err := conn.Exec(ctx, tpc.readAllTransactions, 10000, false)
if err != nil {
return nil, err
}
var curTx *DistributedTx
var distributed []*DistributedTx
for _, row := range qr.Rows {
dtid := row[0].String()
if curTx == nil || dtid != curTx.Dtid {
// Initialize the new element.
v, _ := strconv.ParseInt(row[2].String(), 10, 64)
curTx = &DistributedTx{
Dtid: dtid,
State: row[1].String(),
Created: time.Unix(0, v),
}
distributed = append(distributed, curTx)
}
curTx.Participants = append(curTx.Participants, querypb.Target{
Keyspace: row[3].String(),
Shard: row[4].String(),
})
}
return distributed, nil
}
func (tpc *TwoPC) exec(ctx context.Context, conn *TxConnection, pq *sqlparser.ParsedQuery, bindVars map[string]interface{}) (*sqltypes.Result, error) {
b, err := pq.GenerateQuery(bindVars)
if err != nil {

Просмотреть файл

@ -6,10 +6,14 @@ package tabletserver
import (
"context"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
func TestReadAllRedo(t *testing.T) {
@ -30,19 +34,19 @@ func TestReadAllRedo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want := map[string][]string{}
var want []*PreparedTx
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want))
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
t.Errorf("ReadAllRedo (failed): %v, must be empty", jsonStr(failed))
}
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt01")),
}},
})
@ -50,24 +54,28 @@ func TestReadAllRedo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want = map[string][]string{"dtid0": {"stmt01"}}
want = []*PreparedTx{{
Dtid: "dtid0",
Queries: []string{"stmt01"},
Time: time.Unix(0, 1),
}}
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want))
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
t.Errorf("ReadAllRedo (failed): %v, must be empty", jsonStr(failed))
}
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt02")),
}},
})
@ -75,29 +83,33 @@ func TestReadAllRedo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want = map[string][]string{"dtid0": {"stmt01", "stmt02"}}
want = []*PreparedTx{{
Dtid: "dtid0",
Queries: []string{"stmt01", "stmt02"},
Time: time.Unix(0, 1),
}}
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want))
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
t.Errorf("ReadAllRedo (failed): %v, must be empty", jsonStr(failed))
}
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt02")),
}, {
sqltypes.MakeString([]byte("dtid1")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt11")),
}},
})
@ -105,47 +117,52 @@ func TestReadAllRedo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want = map[string][]string{
"dtid0": {"stmt01", "stmt02"},
"dtid1": {"stmt11"},
}
want = []*PreparedTx{{
Dtid: "dtid0",
Queries: []string{"stmt01", "stmt02"},
Time: time.Unix(0, 1),
}, {
Dtid: "dtid1",
Queries: []string{"stmt11"},
Time: time.Unix(0, 1),
}}
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want))
}
if len(failed) != 0 {
t.Errorf("ReadAllRedo (failed): %v, must be empty", failed)
t.Errorf("ReadAllRedo (failed): %v, must be empty", jsonStr(failed))
}
db.AddQuery(tpc.readAllRedo, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt02")),
}, {
sqltypes.MakeString([]byte("dtid1")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt11")),
}, {
sqltypes.MakeString([]byte("dtid2")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt21")),
}, {
sqltypes.MakeString([]byte("dtid2")),
sqltypes.MakeString([]byte("Failed")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt22")),
}, {
sqltypes.MakeString([]byte("dtid3")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("stmt31")),
}},
})
@ -153,15 +170,166 @@ func TestReadAllRedo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want = map[string][]string{
"dtid0": {"stmt01", "stmt02"},
"dtid3": {"stmt31"},
}
want = []*PreparedTx{{
Dtid: "dtid0",
Queries: []string{"stmt01", "stmt02"},
Time: time.Unix(0, 1),
}, {
Dtid: "dtid3",
Queries: []string{"stmt31"},
Time: time.Unix(0, 1),
}}
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %#v, want %#v", prepared, want)
t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want))
}
wantFailed := []string{"dtid1", "dtid2"}
wantFailed := []*PreparedTx{{
Dtid: "dtid1",
Queries: []string{"stmt11"},
Time: time.Unix(0, 1),
}, {
Dtid: "dtid2",
Queries: []string{"stmt21", "stmt22"},
Time: time.Unix(0, 1),
}}
if !reflect.DeepEqual(failed, wantFailed) {
t.Errorf("ReadAllRedo failed): %#v, want %#v", failed, wantFailed)
t.Errorf("ReadAllRedo failed): %s, want %s", jsonStr(failed), jsonStr(wantFailed))
}
}
func TestReadAllTransactions(t *testing.T) {
_, tsv, db := newTestTxExecutor()
defer tsv.StopService()
tpc := tsv.te.twoPC
ctx := context.Background()
conn, err := tsv.qe.connPool.Get(ctx)
if err != nil {
t.Fatal(err)
}
defer conn.Recycle()
db.AddQuery(tpc.readAllTransactions, &sqltypes.Result{})
distributed, err := tpc.ReadAllTransactions(ctx)
if err != nil {
t.Fatal(err)
}
var want []*DistributedTx
if !reflect.DeepEqual(distributed, want) {
t.Errorf("ReadAllTransactions: %s, want %s", jsonStr(distributed), jsonStr(want))
}
db.AddQuery(tpc.readAllTransactions, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks01")),
sqltypes.MakeString([]byte("shard01")),
}},
})
distributed, err = tpc.ReadAllTransactions(ctx)
if err != nil {
t.Fatal(err)
}
want = []*DistributedTx{{
Dtid: "dtid0",
State: "Prepared",
Created: time.Unix(0, 1),
Participants: []querypb.Target{{
Keyspace: "ks01",
Shard: "shard01",
}},
}}
if !reflect.DeepEqual(distributed, want) {
t.Errorf("ReadAllTransactions:\n%s, want\n%s", jsonStr(distributed), jsonStr(want))
}
db.AddQuery(tpc.readAllTransactions, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks01")),
sqltypes.MakeString([]byte("shard01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks02")),
sqltypes.MakeString([]byte("shard02")),
}},
})
distributed, err = tpc.ReadAllTransactions(ctx)
if err != nil {
t.Fatal(err)
}
want = []*DistributedTx{{
Dtid: "dtid0",
State: "Prepared",
Created: time.Unix(0, 1),
Participants: []querypb.Target{{
Keyspace: "ks01",
Shard: "shard01",
}, {
Keyspace: "ks02",
Shard: "shard02",
}},
}}
if !reflect.DeepEqual(distributed, want) {
t.Errorf("ReadAllTransactions:\n%s, want\n%s", jsonStr(distributed), jsonStr(want))
}
db.AddQuery(tpc.readAllTransactions, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks01")),
sqltypes.MakeString([]byte("shard01")),
}, {
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks02")),
sqltypes.MakeString([]byte("shard02")),
}, {
sqltypes.MakeString([]byte("dtid1")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks11")),
sqltypes.MakeString([]byte("shard11")),
}},
})
distributed, err = tpc.ReadAllTransactions(ctx)
if err != nil {
t.Fatal(err)
}
want = []*DistributedTx{{
Dtid: "dtid0",
State: "Prepared",
Created: time.Unix(0, 1),
Participants: []querypb.Target{{
Keyspace: "ks01",
Shard: "shard01",
}, {
Keyspace: "ks02",
Shard: "shard02",
}},
}, {
Dtid: "dtid1",
State: "Prepared",
Created: time.Unix(0, 1),
Participants: []querypb.Target{{
Keyspace: "ks11",
Shard: "shard11",
}},
}}
if !reflect.DeepEqual(distributed, want) {
t.Errorf("ReadAllTransactions:\n%s, want\n%s", jsonStr(distributed), jsonStr(want))
}
}
func jsonStr(v interface{}) string {
out, _ := json.Marshal(v)
return string(out)
}

Просмотреть файл

@ -0,0 +1,201 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletserver
import (
"encoding/json"
"fmt"
"html/template"
"net/http"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/acl"
)
var (
gridTable = []byte(`<!DOCTYPE html>
<style type="text/css">
table.gridtable {
font-family: verdana,arial,sans-serif;
font-size: 11px;
border-width: 1px;
border-collapse: collapse; table-layout:fixed; overflow: hidden;
}
table.gridtable th {
border-width: 1px;
padding: 8px;
border-style: solid;
background-color: #dedede;
white-space: nowrap;
}
table.gridtable td {
border-width: 1px;
padding: 5px;
border-style: solid;
}
table.gridtable th {
padding-left: 2em;
padding-right: 2em;
}
</style>
`)
startTable = []byte(`
<table class="gridtable">
`)
endTable = []byte(`
</table>
`)
failedzHeader = []byte(`
<h3>Failed Transactions</h3>
<thead><tr>
<th>DTID</th>
<th>Queries</th>
<th>Time</th>
<th>Action</th>
</tr></thead>
`)
failedzRow = template.Must(template.New("failedz").Parse(`
<tr>
<td>{{.Dtid}}</td>
<td>{{range .Queries}}{{.}}<br>{{end}}</td>
<td>{{.Time}}</td>
<td><form>
<input type="hidden" name="dtid" value="{{.Dtid}}"></input>
<input type="submit" name="Action" value="Discard"></input>
</form></td>
</tr>
`))
preparedzHeader = []byte(`
<h3>Prepared Transactions</h3>
<thead><tr>
<th>DTID</th>
<th>Queries</th>
<th>Time</th>
<th>Action</th>
</tr></thead>
`)
preparedzRow = template.Must(template.New("preparedz").Parse(`
<tr>
<td>{{.Dtid}}</td>
<td>{{range .Queries}}{{.}}<br>{{end}}</td>
<td>{{.Time}}</td>
<td><form>
<input type="hidden" name="dtid" value="{{.Dtid}}"></input>
<input type="submit" name="Action" value="Rollback"></input>
<input type="submit" name="Action" value="Commit"></input>
</form></td>
</tr>
`))
distributedzHeader = []byte(`
<h3>Distributed Transactions</h3>
<thead><tr>
<th>DTID</th>
<th>State</th>
<th>Time</th>
<th>Participants</th>
<th>Action</th>
</tr></thead>
`)
distributedzRow = template.Must(template.New("distributedz").Parse(`
<tr>
<td>{{.Dtid}}</td>
<td>{{.State}}</td>
<td>{{.Created}}</td>
<td>{{range .Participants}}{{.Keyspace}}:{{.Shard}}<br>{{end}}</td>
<td><form>
<input type="hidden" name="dtid" value="{{.Dtid}}"></input>
<input type="submit" name="Action" value="Conclude"></input>
</form></td>
</tr>
`))
)
func twopczHandler(txe *TxExecutor, w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
var err error
dtid := r.FormValue("dtid")
action := r.FormValue("Action")
switch action {
case "Discard", "Rollback":
err = txe.RollbackPrepared(dtid, 0)
case "Commit":
err = txe.CommitPrepared(dtid)
case "Conclude":
err = txe.ConcludeTransaction(dtid)
}
var msg string
if action != "" {
if err != nil {
msg = err.Error()
msg = fmt.Sprintf("%s(%s): %v", r.FormValue("Action"), dtid, err)
} else {
msg = fmt.Sprintf("%s(%s): completed.", r.FormValue("Action"), dtid)
}
}
distributed, prepared, failed, err := txe.ReadTwopcInflight()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
format := r.FormValue("format")
if format == "json" {
w.Header().Set("Content-Type", "application/json")
js, err := json.Marshal(struct {
Distributed []*DistributedTx
Prepared, Failed []*PreparedTx
}{
Distributed: distributed,
Prepared: prepared,
Failed: failed,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
return
}
w.Write(gridTable)
w.Write([]byte("<h2>WARNING: Actions on this page can jeopardize data integrity.</h2>\n"))
if msg != "" {
w.Write([]byte(fmt.Sprintf("%s\n", msg)))
}
w.Write(startTable)
w.Write(failedzHeader)
for _, row := range failed {
if err := failedzRow.Execute(w, row); err != nil {
log.Errorf("queryz: couldn't execute template: %v", err)
}
}
w.Write(endTable)
w.Write(startTable)
w.Write(preparedzHeader)
for _, row := range prepared {
if err := preparedzRow.Execute(w, row); err != nil {
log.Errorf("queryz: couldn't execute template: %v", err)
}
}
w.Write(endTable)
w.Write(startTable)
w.Write(distributedzHeader)
for _, row := range distributed {
if err := distributedzRow.Execute(w, row); err != nil {
log.Errorf("queryz: couldn't execute template: %v", err)
}
}
w.Write(endTable)
}

Просмотреть файл

@ -185,8 +185,8 @@ func (te *TxEngine) prepareFromRedo() error {
maxid := int64(0)
outer:
for dtid, tx := range prepared {
txid, err := dtids.TransactionID(dtid)
for _, tx := range prepared {
txid, err := dtids.TransactionID(tx.Dtid)
if err != nil {
log.Errorf("Error extracting transaction ID from ditd: %v", err)
}
@ -198,7 +198,7 @@ outer:
allErr.RecordError(err)
continue
}
for _, stmt := range tx {
for _, stmt := range tx.Queries {
conn.RecordQuery(stmt)
_, err := conn.Exec(ctx, stmt, 1, false)
if err != nil {
@ -209,21 +209,21 @@ outer:
}
// We should not use the external Prepare because
// we don't want to write again to the redo log.
err = te.preparedPool.Put(conn, dtid)
err = te.preparedPool.Put(conn, tx.Dtid)
if err != nil {
allErr.RecordError(err)
continue
}
}
for _, dtid := range failed {
txid, err := dtids.TransactionID(dtid)
for _, tx := range failed {
txid, err := dtids.TransactionID(tx.Dtid)
if err != nil {
log.Errorf("Error extracting transaction ID from ditd: %v", err)
}
if txid > maxid {
maxid = txid
}
te.preparedPool.SetFailed(dtid)
te.preparedPool.SetFailed(tx.Dtid)
}
te.txPool.AdjustLastID(maxid)
log.Infof("Prepared %d transactions, and registered %d failures.", len(prepared), len(failed))
@ -254,7 +254,7 @@ func (te *TxEngine) startWatchdog() {
defer cancel()
txs, err := te.twoPC.ReadAbandoned(ctx, time.Now().Add(-te.abandonAge))
if err != nil {
// TODO(sougou): increment error counter.
te.queryServiceStats.InternalErrors.Add("WatchdogFail", 1)
log.Errorf("Error reading transactions for 2pc watchdog: %v", err)
return
}
@ -264,8 +264,8 @@ func (te *TxEngine) startWatchdog() {
coordConn, err := vtgateconn.Dial(ctx, te.coordinatorAddress, te.abandonAge/4, "")
if err != nil {
// TODO(sougou): increment error counter.
log.Errorf("error connecting to coordinator '%v': %v", te.coordinatorAddress, err)
te.queryServiceStats.InternalErrors.Add("WatchdogFail", 1)
log.Errorf("Error connecting to coordinator '%v': %v", te.coordinatorAddress, err)
return
}
defer coordConn.Close()
@ -276,7 +276,7 @@ func (te *TxEngine) startWatchdog() {
go func(dtid string) {
defer wg.Done()
if err := coordConn.ResolveTransaction(ctx, dtid); err != nil {
// TODO(sougou): increment error counter.
te.queryServiceStats.InternalErrors.Add("WatchdogFail", 1)
log.Errorf("Error notifying for dtid %s: %v", dtid, err)
}
}(tx)

Просмотреть файл

@ -279,3 +279,19 @@ func (txe *TxExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadat
}
return txe.te.twoPC.ReadTransaction(txe.ctx, dtid)
}
// ReadTwopcInflight returns info about all in-flight 2pc transactions.
func (txe *TxExecutor) ReadTwopcInflight() (distributed []*DistributedTx, prepared, failed []*PreparedTx, err error) {
if !txe.te.twopcEnabled {
return nil, nil, nil, NewTabletError(vtrpcpb.ErrorCode_BAD_INPUT, "2pc is not enabled")
}
prepared, failed, err = txe.te.twoPC.ReadAllRedo(txe.ctx)
if err != nil {
return nil, nil, nil, NewTabletError(vtrpcpb.ErrorCode_INTERNAL_ERROR, "Could not read redo: %v", err)
}
distributed, err = txe.te.twoPC.ReadAllTransactions(txe.ctx)
if err != nil {
return nil, nil, nil, NewTabletError(vtrpcpb.ErrorCode_INTERNAL_ERROR, "Could not read redo: %v", err)
}
return distributed, prepared, failed, nil
}

Просмотреть файл

@ -365,6 +365,37 @@ func TestExecutorReadTransaction(t *testing.T) {
}
}
func TestExecutorReadAllTransactions(t *testing.T) {
txe, tsv, db := newTestTxExecutor()
defer tsv.StopService()
db.AddQuery(txe.te.twoPC.readAllTransactions, &sqltypes.Result{
Rows: [][]sqltypes.Value{{
sqltypes.MakeString([]byte("dtid0")),
sqltypes.MakeString([]byte("Prepared")),
sqltypes.MakeString([]byte("1")),
sqltypes.MakeString([]byte("ks01")),
sqltypes.MakeString([]byte("shard01")),
}},
})
got, _, _, err := txe.ReadTwopcInflight()
if err != nil {
t.Error(err)
}
want := []*DistributedTx{{
Dtid: "dtid0",
State: "Prepared",
Created: time.Unix(0, 1),
Participants: []querypb.Target{{
Keyspace: "ks01",
Shard: "shard01",
}},
}}
if !reflect.DeepEqual(got, want) {
t.Errorf("ReadAllTransactions: %v, want %v", got, want)
}
}
// These vars and types are used only for TestExecutorResolveTransaction
var dtidCh = make(chan string)
@ -439,6 +470,12 @@ func TestNoTwopc(t *testing.T) {
_, err := txe.ReadTransaction("aa")
return err
},
}, {
desc: "ReadAllTransactions",
fun: func() error {
_, _, _, err := txe.ReadTwopcInflight()
return err
},
}}
want := "error: 2pc is not enabled"