tabletserver: honor streaming context deadlines

Changed streaming queries to honor deadlines, if one was supplied.
Also, fixed some lint errors.
This commit is contained in:
Sugu Sougoumarane 2015-03-15 20:00:19 -07:00
Родитель 82163af96b
Коммит f88cee6257
5 изменённых файлов: 109 добавлений и 43 удалений

Просмотреть файл

@ -6,6 +6,7 @@ import (
"time"
"github.com/youtube/vitess/go/mysql"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/stats"
"golang.org/x/net/context"
)
@ -27,9 +28,13 @@ var (
// TestConnectivity is a manual test that requires code changes.
// You first need to bring up a mysql instance to match the connection params.
// Setup: mysqlctl -tablet_uid=62347 -mysql_port=15001 init.
// Connect: mysql -S vt_0000062347/mysql.sock -u vt_dba
// Initialize: create database sougou; use sougou; create table a(id int, primary key(id));
// Run the test normally once. Then you add a 20s sleep in dbconn.execOnce
// and run it again. You also have to check the code coverage to see that all critical
// paths were covered.
// Shutdown: mysqlctl -tablet_uid=62347 -mysql_port=15001 teardown
// TODO(sougou): Figure out a way to automate this.
func TestConnectivity(t *testing.T) {
t.Skip("manual test")
@ -39,11 +44,13 @@ func TestConnectivity(t *testing.T) {
mysqlStats = stats.NewTimings("TestMySQLStats")
pool := NewConnPool("p1", 1, 30*time.Second)
pool.Open(appParams, dbaParams)
conn, err := pool.Get(ctx)
if err != nil {
t.Error(err)
}
conn.Kill()
newctx, cancel := withTimeout(ctx, 2*time.Second)
_, err = conn.Exec(newctx, "select * from a", 1000, true)
cancel()
@ -51,6 +58,7 @@ func TestConnectivity(t *testing.T) {
t.Error(err)
}
conn.Close()
newctx, cancel = withTimeout(ctx, 2*time.Second)
_, err = conn.Exec(newctx, "select * from a", 1000, true)
cancel()
@ -60,6 +68,7 @@ func TestConnectivity(t *testing.T) {
t.Errorf("got: %v, want nil or %s", err, timedout)
}
conn.Recycle()
conn, err = pool.Get(ctx)
if err != nil {
t.Error(err)
@ -79,5 +88,28 @@ func TestConnectivity(t *testing.T) {
conn.Kill()
<-ch
conn.Recycle()
conn, err = pool.Get(ctx)
if err != nil {
t.Error(err)
}
ch = make(chan bool)
go func() {
newctx, cancel = withTimeout(ctx, 2*time.Millisecond)
err = conn.Stream(newctx, "select sleep(1) from dual", func(*mproto.QueryResult) error {
return nil
}, 4096)
cancel()
lostConn := "Lost connection to MySQL server during query (errno 2013) during query: select sleep(1) from dual"
if err == nil || err.Error() != lostConn {
t.Errorf("got: %v, want %s", err, lostConn)
}
ch <- true
}()
time.Sleep(40 * time.Millisecond)
conn.Kill()
<-ch
conn.Recycle()
pool.Close()
}

Просмотреть файл

@ -81,10 +81,23 @@ func (dbc *DBConn) execOnce(ctx context.Context, query string, maxrows int, want
return dbc.conn.ExecuteFetch(query, maxrows, wantfields)
}
// ExecOnce executes the specified query, but does not retry on connection errors.
func (dbc *DBConn) ExecOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*mproto.QueryResult, error) {
return dbc.execOnce(ctx, query, maxrows, wantfields)
}
// Stream executes the query and streams the results.
func (dbc *DBConn) Stream(query string, callback func(*mproto.QueryResult) error, streamBufferSize int) error {
func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*mproto.QueryResult) error, streamBufferSize int) error {
dbc.current.Set(query)
defer dbc.current.Set("")
done, err := dbc.setDeadline(ctx)
if err != nil {
return err
}
if done != nil {
defer close(done)
}
return dbc.conn.ExecuteStreamFetch(query, callback, streamBufferSize)
}

Просмотреть файл

@ -109,7 +109,7 @@ func (rqc *RequestContext) execSQLNoPanic(conn PoolConn, sql string, wantfields
func (rqc *RequestContext) execStreamSQL(conn *DBConn, sql string, callback func(*mproto.QueryResult) error) {
start := time.Now()
err := conn.Stream(sql, callback, int(rqc.qe.streamBufferSize.Get()))
err := conn.Stream(rqc.ctx, sql, callback, int(rqc.qe.streamBufferSize.Get()))
rqc.logStats.AddRewrittenSql(sql, start)
if err != nil {
panic(NewTabletErrorSql(ErrFail, err))

Просмотреть файл

@ -399,11 +399,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR
if err = sq.startRequest(query.SessionId, false, false); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
defer func() {
cancel()
sq.endRequest()
}()
defer sq.endRequest()
if query.BindVariables == nil {
query.BindVariables = make(map[string]interface{})

Просмотреть файл

@ -33,25 +33,21 @@ SafeFunctions() return os.Error instead of throwing exceptions
// The log format can be inferred by looking at TxConnection.Format.
var TxLogger = streamlog.New("TxLog", 10)
var (
BEGIN = "begin"
COMMIT = "commit"
ROLLBACK = "rollback"
)
// These consts identify how a transaction was resolved.
const (
TX_CLOSE = "close"
TX_COMMIT = "commit"
TX_ROLLBACK = "rollback"
TX_KILL = "kill"
TxClose = "close"
TxCommit = "commit"
TxRollback = "rollback"
TxKill = "kill"
)
const txLogInterval = time.Duration(1 * time.Minute)
// TxPool is the transaction pool for the query service.
type TxPool struct {
pool *ConnPool
activePool *pools.Numbered
lastId sync2.AtomicInt64
lastID sync2.AtomicInt64
timeout sync2.AtomicDuration
poolTimeout sync2.AtomicDuration
ticks *timer.Timer
@ -62,11 +58,12 @@ type TxPool struct {
lastLog time.Time
}
// NewTxPool creates a new TxPool. It's not operational until it's Open'd.
func NewTxPool(name string, capacity int, timeout, poolTimeout, idleTimeout time.Duration) *TxPool {
axp := &TxPool{
pool: NewConnPool(name, capacity, idleTimeout),
activePool: pools.NewNumbered(),
lastId: sync2.AtomicInt64(time.Now().UnixNano()),
lastID: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
poolTimeout: sync2.AtomicDuration(poolTimeout),
ticks: timer.NewTimer(timeout / 10),
@ -79,12 +76,15 @@ func NewTxPool(name string, capacity int, timeout, poolTimeout, idleTimeout time
return axp
}
// Open makes the TxPool operational. This also starts the transaction killer
// that will kill long-running transactions.
func (axp *TxPool) Open(appParams, dbaParams *mysql.ConnectionParams) {
log.Infof("Starting transaction id: %d", axp.lastId)
log.Infof("Starting transaction id: %d", axp.lastID)
axp.pool.Open(appParams, dbaParams)
axp.ticks.Start(func() { axp.TransactionKiller() })
axp.ticks.Start(func() { axp.transactionKiller() })
}
// Close closes the TxPool. A closed pool can be reopened.
func (axp *TxPool) Close() {
axp.ticks.Stop()
for _, v := range axp.activePool.GetOutdated(time.Duration(0), "for closing") {
@ -92,26 +92,29 @@ func (axp *TxPool) Close() {
log.Warningf("killing transaction for shutdown: %s", conn.Format(nil))
internalErrors.Add("StrayTransactions", 1)
conn.Close()
conn.discard(TX_CLOSE)
conn.discard(TxClose)
}
axp.pool.Close()
}
// WaitForEmpty waits until all active transactions are completed.
func (axp *TxPool) WaitForEmpty() {
axp.activePool.WaitForEmpty()
}
func (axp *TxPool) TransactionKiller() {
func (axp *TxPool) transactionKiller() {
defer logError()
for _, v := range axp.activePool.GetOutdated(time.Duration(axp.Timeout()), "for rollback") {
conn := v.(*TxConnection)
log.Warningf("killing transaction (exceeded timeout: %v): %s", axp.Timeout(), conn.Format(nil))
killStats.Add("Transactions", 1)
conn.Close()
conn.discard(TX_KILL)
conn.discard(TxKill)
}
}
// Begin begins a transaction, and returns the associated transaction id.
// Subsequent statements can access the connection through the transaction id.
func (axp *TxPool) Begin(ctx context.Context) int64 {
conn, err := axp.pool.Get(ctx)
if err != nil {
@ -124,45 +127,50 @@ func (axp *TxPool) Begin(ctx context.Context) int64 {
}
panic(NewTabletErrorSql(ErrFatal, err))
}
if _, err := conn.Exec(ctx, BEGIN, 1, false); err != nil {
if _, err := conn.Exec(ctx, "begin", 1, false); err != nil {
conn.Recycle()
panic(NewTabletErrorSql(ErrFail, err))
}
transactionId := axp.lastId.Add(1)
axp.activePool.Register(transactionId, newTxConnection(conn, transactionId, axp))
return transactionId
transactionID := axp.lastID.Add(1)
axp.activePool.Register(transactionID, newTxConnection(conn, transactionID, axp))
return transactionID
}
func (axp *TxPool) SafeCommit(ctx context.Context, transactionId int64) (invalidList map[string]DirtyKeys, err error) {
// SafeCommit commits the specified transaction. Unlike other functions, it
// returns an error on failure instead of panic. The connection becomes free
// and can be reused in the future.
func (axp *TxPool) SafeCommit(ctx context.Context, transactionID int64) (invalidList map[string]DirtyKeys, err error) {
defer handleError(&err, nil)
conn := axp.Get(transactionId)
defer conn.discard(TX_COMMIT)
conn := axp.Get(transactionID)
defer conn.discard(TxCommit)
// Assign this upfront to make sure we always return the invalidList.
invalidList = conn.dirtyTables
axp.txStats.Add("Completed", time.Now().Sub(conn.StartTime))
if _, fetchErr := conn.Exec(ctx, COMMIT, 1, false); fetchErr != nil {
if _, fetchErr := conn.Exec(ctx, "commit", 1, false); fetchErr != nil {
conn.Close()
err = NewTabletErrorSql(ErrFail, fetchErr)
}
return
}
func (axp *TxPool) Rollback(ctx context.Context, transactionId int64) {
conn := axp.Get(transactionId)
defer conn.discard(TX_ROLLBACK)
// Rollback rolls back the specified transaction.
func (axp *TxPool) Rollback(ctx context.Context, transactionID int64) {
conn := axp.Get(transactionID)
defer conn.discard(TxRollback)
axp.txStats.Add("Aborted", time.Now().Sub(conn.StartTime))
if _, err := conn.Exec(ctx, ROLLBACK, 1, false); err != nil {
if _, err := conn.Exec(ctx, "rollback", 1, false); err != nil {
conn.Close()
panic(NewTabletErrorSql(ErrFail, err))
}
}
// Get fetches the connection associated to the transactionID.
// You must call Recycle on TxConnection once done.
func (axp *TxPool) Get(transactionId int64) (conn *TxConnection) {
v, err := axp.activePool.Get(transactionId, "for query")
func (axp *TxPool) Get(transactionID int64) (conn *TxConnection) {
v, err := axp.activePool.Get(transactionID, "for query")
if err != nil {
panic(NewTabletError(ErrNotInTx, "Transaction %d: %v", transactionId, err))
panic(NewTabletError(ErrNotInTx, "Transaction %d: %v", transactionID, err))
}
return v.(*TxConnection)
}
@ -182,19 +190,27 @@ func (axp *TxPool) LogActive() {
}
}
// Timeout returns the transaction timeout.
func (axp *TxPool) Timeout() time.Duration {
return axp.timeout.Get()
}
// SetTimeout sets the transaction timeout.
func (axp *TxPool) SetTimeout(timeout time.Duration) {
axp.timeout.Set(timeout)
axp.ticks.SetInterval(timeout / 10)
}
// SetPoolTimeout sets the wait time for the tx pool.
// TODO(sougou): move this to SqlQuery.
func (axp *TxPool) SetPoolTimeout(timeout time.Duration) {
axp.poolTimeout.Set(timeout)
}
// TxConnection is meant for executing transactions. It keeps track
// of dirty keys for rowcache invalidation. It can return itself to
// the tx pool correctly. It also does not retry statements if there
// are failures.
type TxConnection struct {
*DBConn
TransactionID int64
@ -208,10 +224,10 @@ type TxConnection struct {
LogToFile sync2.AtomicInt32
}
func newTxConnection(conn *DBConn, transactionId int64, pool *TxPool) *TxConnection {
func newTxConnection(conn *DBConn, transactionID int64, pool *TxPool) *TxConnection {
return &TxConnection{
DBConn: conn,
TransactionID: transactionId,
TransactionID: transactionID,
pool: pool,
StartTime: time.Now(),
dirtyTables: make(map[string]DirtyKeys),
@ -219,6 +235,8 @@ func newTxConnection(conn *DBConn, transactionId int64, pool *TxPool) *TxConnect
}
}
// DirtyKeys returns the list of rowcache keys that became dirty
// during the transaction.
func (txc *TxConnection) DirtyKeys(tableName string) DirtyKeys {
if list, ok := txc.dirtyTables[tableName]; ok {
return list
@ -228,8 +246,9 @@ func (txc *TxConnection) DirtyKeys(tableName string) DirtyKeys {
return list
}
// Exec executes the statement for the current transaction.
func (txc *TxConnection) Exec(ctx context.Context, query string, maxrows int, wantfields bool) (*proto.QueryResult, error) {
r, err := txc.DBConn.execOnce(ctx, query, maxrows, wantfields)
r, err := txc.DBConn.ExecOnce(ctx, query, maxrows, wantfields)
if err != nil {
if IsConnErr(err) {
go checkMySQL()
@ -240,14 +259,17 @@ func (txc *TxConnection) Exec(ctx context.Context, query string, maxrows int, wa
return r, nil
}
// Recycle returns the connection to the pool. The transaction remains
// active.
func (txc *TxConnection) Recycle() {
if txc.IsClosed() {
txc.discard(TX_CLOSE)
txc.discard(TxClose)
} else {
txc.pool.activePool.Put(txc.TransactionID)
}
}
// RecordQuery records the query against this transaction.
func (txc *TxConnection) RecordQuery(query string) {
txc.Queries = append(txc.Queries, query)
}
@ -265,6 +287,7 @@ func (txc *TxConnection) discard(conclusion string) {
TxLogger.Send(txc)
}
// Format returns a printable version of the connection info.
func (txc *TxConnection) Format(params url.Values) string {
return fmt.Sprintf(
"%v\t%v\t%v\t%.6f\t%v\t%v\t\n",
@ -277,6 +300,8 @@ func (txc *TxConnection) Format(params url.Values) string {
)
}
// DirtyKeys provides a cache-like interface, where
// it just adds keys to its likst as Delete gets called.
type DirtyKeys map[string]bool
// Delete just keeps track of what needs to be deleted