Adding context, removing unused params.

This commit is contained in:
Alain Jobart 2014-01-28 15:48:40 -08:00
Родитель 99dad4d1ff
Коммит 428e929fdb
6 изменённых файлов: 48 добавлений и 27 удалений

Просмотреть файл

@ -22,7 +22,7 @@ func init() {
type PoolConnection interface {
ExecuteFetch(query string, maxrows int, wantfields bool) (*proto.QueryResult, error)
ExecuteStreamFetch(query string, callback func(interface{}) error, streamBufferSize int) error
ExecuteStreamFetch(query string, callback func(*proto.QueryResult) error, streamBufferSize int) error
VerifyStrict() bool
Id() int64
Close()
@ -61,7 +61,7 @@ func (dbc *DBConnection) ExecuteFetch(query string, maxrows int, wantfields bool
return &qr, nil
}
func (conn *DBConnection) ExecuteStreamFetch(query string, callback func(interface{}) error, streamBufferSize int) error {
func (conn *DBConnection) ExecuteStreamFetch(query string, callback func(*proto.QueryResult) error, streamBufferSize int) error {
start := time.Now()
err := conn.Connection.ExecuteStreamFetch(query)

Просмотреть файл

@ -23,27 +23,47 @@ func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo
}
func (sq *SqlQuery) Begin(context *rpcproto.Context, session *proto.Session, txInfo *proto.TransactionInfo) error {
return sq.server.Begin(context, session, txInfo)
return sq.server.Begin(&tabletserver.Context{
RemoteAddr: context.RemoteAddr,
Username: context.Username,
}, session, txInfo)
}
func (sq *SqlQuery) Commit(context *rpcproto.Context, session *proto.Session, noOutput *string) error {
return sq.server.Commit(context, session, noOutput)
return sq.server.Commit(&tabletserver.Context{
RemoteAddr: context.RemoteAddr,
Username: context.Username,
}, session)
}
func (sq *SqlQuery) Rollback(context *rpcproto.Context, session *proto.Session, noOutput *string) error {
return sq.server.Rollback(context, session, noOutput)
return sq.server.Rollback(&tabletserver.Context{
RemoteAddr: context.RemoteAddr,
Username: context.Username,
}, session)
}
func (sq *SqlQuery) Execute(context *rpcproto.Context, query *proto.Query, reply *mproto.QueryResult) error {
return sq.server.Execute(context, query, reply)
return sq.server.Execute(&tabletserver.Context{
RemoteAddr: context.RemoteAddr,
Username: context.Username,
}, query, reply)
}
func (sq *SqlQuery) StreamExecute(context *rpcproto.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
return sq.server.StreamExecute(context, query, sendReply)
return sq.server.StreamExecute(&tabletserver.Context{
RemoteAddr: context.RemoteAddr,
Username: context.Username,
}, query, func(reply *mproto.QueryResult) error {
return sendReply(reply)
})
}
func (sq *SqlQuery) ExecuteBatch(context *rpcproto.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error {
return sq.server.ExecuteBatch(context, queryList, reply)
return sq.server.ExecuteBatch(&tabletserver.Context{
RemoteAddr: context.RemoteAddr,
Username: context.Username,
}, queryList, reply)
}
func init() {

Просмотреть файл

@ -291,7 +291,7 @@ func (qe *QueryEngine) Execute(logStats *sqlQueryStats, query *proto.Query) (rep
// the first QueryResult will have Fields set (and Rows nil)
// the subsequent QueryResult will have Rows set (and Fields nil)
func (qe *QueryEngine) StreamExecute(logStats *sqlQueryStats, query *proto.Query, sendReply func(reply interface{}) error) {
func (qe *QueryEngine) StreamExecute(logStats *sqlQueryStats, query *proto.Query, sendReply func(*mproto.QueryResult) error) {
qe.mu.RLock()
defer qe.mu.RUnlock()
@ -734,7 +734,7 @@ func (qe *QueryEngine) fullFetch(logStats *sqlQueryStats, conn PoolConnection, p
return result
}
func (qe *QueryEngine) fullStreamFetch(logStats *sqlQueryStats, conn PoolConnection, parsed_query *sqlparser.ParsedQuery, bindVars map[string]interface{}, listVars []sqltypes.Value, buildStreamComment []byte, callback func(interface{}) error) {
func (qe *QueryEngine) fullStreamFetch(logStats *sqlQueryStats, conn PoolConnection, parsed_query *sqlparser.ParsedQuery, bindVars map[string]interface{}, listVars []sqltypes.Value, buildStreamComment []byte, callback func(*mproto.QueryResult) error) {
sql := qe.generateFinalSql(parsed_query, bindVars, listVars, buildStreamComment)
qe.executeStreamSql(logStats, conn, sql, callback)
}
@ -775,7 +775,7 @@ func (qe *QueryEngine) executeSql(logStats *sqlQueryStats, conn PoolConnection,
return result, nil
}
func (qe *QueryEngine) executeStreamSql(logStats *sqlQueryStats, conn PoolConnection, sql string, callback func(interface{}) error) {
func (qe *QueryEngine) executeStreamSql(logStats *sqlQueryStats, conn PoolConnection, sql string, callback func(*mproto.QueryResult) error) {
logStats.QuerySources |= QUERY_SOURCE_MYSQL
logStats.NumberOfQueries += 1
logStats.AddRewrittenSql(sql)

Просмотреть файл

@ -12,7 +12,6 @@ import (
log "github.com/golang/glog"
mproto "github.com/youtube/vitess/go/mysql/proto"
rpcproto "github.com/youtube/vitess/go/rpcwrap/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
@ -195,7 +194,7 @@ func GetQueryRules() (qrs *QueryRules) {
// the unhealthiness otherwise.
func IsHealthy() error {
return SqlQueryRpcService.Execute(
new(rpcproto.Context),
new(Context),
&proto.Query{Sql: "select 1 from dual", SessionId: SqlQueryRpcService.sessionId},
new(mproto.QueryResult),
)

Просмотреть файл

@ -15,7 +15,6 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/mysql"
mproto "github.com/youtube/vitess/go/mysql/proto"
rpcproto "github.com/youtube/vitess/go/rpcwrap/proto"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/tb"
@ -48,6 +47,11 @@ var stateName = map[int64]string{
//-----------------------------------------------
// RPC API
type Context struct {
RemoteAddr string
Username string
}
type SqlQuery struct {
// We use a hybrid locking scheme to control state transitions. This is
// optimal for frequent reads and infrequent state changes.
@ -210,7 +214,7 @@ func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo
return nil
}
func (sq *SqlQuery) Begin(context *rpcproto.Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
func (sq *SqlQuery) Begin(context *Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
logStats := newSqlQueryStats("Begin", context)
logStats.OriginalSql = "begin"
defer handleError(&err, logStats)
@ -220,7 +224,7 @@ func (sq *SqlQuery) Begin(context *rpcproto.Context, session *proto.Session, txI
return nil
}
func (sq *SqlQuery) Commit(context *rpcproto.Context, session *proto.Session, noOutput *string) (err error) {
func (sq *SqlQuery) Commit(context *Context, session *proto.Session) (err error) {
logStats := newSqlQueryStats("Commit", context)
logStats.OriginalSql = "commit"
defer handleError(&err, logStats)
@ -230,7 +234,7 @@ func (sq *SqlQuery) Commit(context *rpcproto.Context, session *proto.Session, no
return nil
}
func (sq *SqlQuery) Rollback(context *rpcproto.Context, session *proto.Session, noOutput *string) (err error) {
func (sq *SqlQuery) Rollback(context *Context, session *proto.Session) (err error) {
logStats := newSqlQueryStats("Rollback", context)
logStats.OriginalSql = "rollback"
defer handleError(&err, logStats)
@ -291,7 +295,7 @@ func handleExecError(query *proto.Query, err *error, logStats *sqlQueryStats) {
}
}
func (sq *SqlQuery) Execute(context *rpcproto.Context, query *proto.Query, reply *mproto.QueryResult) (err error) {
func (sq *SqlQuery) Execute(context *Context, query *proto.Query, reply *mproto.QueryResult) (err error) {
logStats := newSqlQueryStats("Execute", context)
defer handleExecError(query, &err, logStats)
@ -305,7 +309,7 @@ func (sq *SqlQuery) Execute(context *rpcproto.Context, query *proto.Query, reply
// the first QueryResult will have Fields set (and Rows nil)
// the subsequent QueryResult will have Rows set (and Fields nil)
func (sq *SqlQuery) StreamExecute(context *rpcproto.Context, query *proto.Query, sendReply func(reply interface{}) error) (err error) {
func (sq *SqlQuery) StreamExecute(context *Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) (err error) {
logStats := newSqlQueryStats("StreamExecute", context)
defer handleExecError(query, &err, logStats)
@ -319,14 +323,13 @@ func (sq *SqlQuery) StreamExecute(context *rpcproto.Context, query *proto.Query,
return nil
}
func (sq *SqlQuery) ExecuteBatch(context *rpcproto.Context, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
func (sq *SqlQuery) ExecuteBatch(context *Context, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
defer handleError(&err, nil)
if len(queryList.Queries) == 0 {
panic(NewTabletError(FAIL, "Empty query list"))
}
sq.checkState(queryList.SessionId, false)
begin_called := false
var noOutput string
session := proto.Session{
TransactionId: queryList.TransactionId,
SessionId: queryList.SessionId,
@ -350,7 +353,7 @@ func (sq *SqlQuery) ExecuteBatch(context *rpcproto.Context, queryList *proto.Que
if !begin_called {
panic(NewTabletError(FAIL, "Cannot commit without begin"))
}
if err = sq.Commit(context, &session, &noOutput); err != nil {
if err = sq.Commit(context, &session); err != nil {
return err
}
session.TransactionId = 0
@ -366,7 +369,7 @@ func (sq *SqlQuery) ExecuteBatch(context *rpcproto.Context, queryList *proto.Que
var localReply mproto.QueryResult
if err = sq.Execute(context, &query, &localReply); err != nil {
if begin_called {
sq.Rollback(context, &session, &noOutput)
sq.Rollback(context, &session)
}
return err
}
@ -374,7 +377,7 @@ func (sq *SqlQuery) ExecuteBatch(context *rpcproto.Context, queryList *proto.Que
}
}
if begin_called {
sq.Rollback(context, &session, &noOutput)
sq.Rollback(context, &session)
panic(NewTabletError(FAIL, "begin called with no commit"))
}
return nil

Просмотреть файл

@ -12,7 +12,6 @@ import (
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/rpcwrap/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/streamlog"
)
@ -43,10 +42,10 @@ type sqlQueryStats struct {
CacheInvalidations int64
QuerySources byte
Rows [][]sqltypes.Value
context *proto.Context
context *Context
}
func newSqlQueryStats(methodName string, context *proto.Context) *sqlQueryStats {
func newSqlQueryStats(methodName string, context *Context) *sqlQueryStats {
s := &sqlQueryStats{Method: methodName, StartTime: time.Now(), context: context}
return s
}