зеркало из https://github.com/github/vitess-gh.git
Passing target more on the sqlquery side.
This commit is contained in:
Родитель
70b7dcd823
Коммит
9f9230ccd4
|
@ -52,7 +52,7 @@ func (sq *SqlQuery) GetSessionId2(sessionIdReq *proto.GetSessionIdRequest, sessi
|
|||
// Begin is exposing tabletserver.SqlQuery.Begin
|
||||
func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), session, txInfo)
|
||||
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), nil, session, txInfo)
|
||||
tabletserver.AddTabletErrorToTransactionInfo(tErr, txInfo)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -73,7 +73,7 @@ func (sq *SqlQuery) Begin2(ctx context.Context, beginRequest *proto.BeginRequest
|
|||
callerid.GoRPCEffectiveCallerID(beginRequest.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(beginRequest.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), session, txInfo)
|
||||
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(beginRequest.Target), session, txInfo)
|
||||
// Convert from TxInfo => beginResponse for the output
|
||||
beginResponse.TransactionId = txInfo.TransactionId
|
||||
tabletserver.AddTabletErrorToBeginResponse(tErr, beginResponse)
|
||||
|
@ -86,7 +86,7 @@ func (sq *SqlQuery) Begin2(ctx context.Context, beginRequest *proto.BeginRequest
|
|||
// Commit is exposing tabletserver.SqlQuery.Commit
|
||||
func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session, noOutput *rpc.Unused) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
return sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), session)
|
||||
return sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), nil, session)
|
||||
}
|
||||
|
||||
// Commit2 should not be used by anything other than tests.
|
||||
|
@ -102,7 +102,7 @@ func (sq *SqlQuery) Commit2(ctx context.Context, commitRequest *proto.CommitRequ
|
|||
callerid.GoRPCEffectiveCallerID(commitRequest.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(commitRequest.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), session)
|
||||
tErr := sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(commitRequest.Target), session)
|
||||
tabletserver.AddTabletErrorToCommitResponse(tErr, commitResponse)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -113,7 +113,7 @@ func (sq *SqlQuery) Commit2(ctx context.Context, commitRequest *proto.CommitRequ
|
|||
// Rollback is exposing tabletserver.SqlQuery.Rollback
|
||||
func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session, noOutput *rpc.Unused) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
return sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), session)
|
||||
return sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), nil, session)
|
||||
}
|
||||
|
||||
// Rollback2 should not be used by anything other than tests.
|
||||
|
@ -129,7 +129,7 @@ func (sq *SqlQuery) Rollback2(ctx context.Context, rollbackRequest *proto.Rollba
|
|||
callerid.GoRPCEffectiveCallerID(rollbackRequest.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(rollbackRequest.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), session)
|
||||
tErr := sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(rollbackRequest.Target), session)
|
||||
tabletserver.AddTabletErrorToRollbackResponse(tErr, rollbackResponse)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -140,7 +140,7 @@ func (sq *SqlQuery) Rollback2(ctx context.Context, rollbackRequest *proto.Rollba
|
|||
// Execute is exposing tabletserver.SqlQuery.Execute
|
||||
func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), query, reply)
|
||||
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), nil, query, reply)
|
||||
tabletserver.AddTabletErrorToQueryResult(tErr, reply)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -157,7 +157,7 @@ func (sq *SqlQuery) Execute2(ctx context.Context, executeRequest *proto.ExecuteR
|
|||
callerid.GoRPCEffectiveCallerID(executeRequest.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(executeRequest.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), &executeRequest.QueryRequest, reply)
|
||||
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), nil, &executeRequest.QueryRequest, reply)
|
||||
tabletserver.AddTabletErrorToQueryResult(tErr, reply)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -168,7 +168,7 @@ func (sq *SqlQuery) Execute2(ctx context.Context, executeRequest *proto.ExecuteR
|
|||
// StreamExecute is exposing tabletserver.SqlQuery.StreamExecute
|
||||
func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
return sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), query, func(reply *mproto.QueryResult) error {
|
||||
return sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), nil, query, func(reply *mproto.QueryResult) error {
|
||||
return sendReply(reply)
|
||||
})
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ func (sq *SqlQuery) StreamExecute2(ctx context.Context, req *proto.StreamExecute
|
|||
callerid.GoRPCEffectiveCallerID(req.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(req.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), req.Query, func(reply *mproto.QueryResult) error {
|
||||
tErr := sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(req.Target), req.Query, func(reply *mproto.QueryResult) error {
|
||||
return sendReply(reply)
|
||||
})
|
||||
if tErr == nil {
|
||||
|
@ -208,7 +208,7 @@ func (sq *SqlQuery) StreamExecute2(ctx context.Context, req *proto.StreamExecute
|
|||
// ExecuteBatch is exposing tabletserver.SqlQuery.ExecuteBatch
|
||||
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
|
||||
defer sq.server.HandlePanic(&err)
|
||||
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), queryList, reply)
|
||||
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), nil, queryList, reply)
|
||||
tabletserver.AddTabletErrorToQueryResultList(tErr, reply)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -228,7 +228,7 @@ func (sq *SqlQuery) ExecuteBatch2(ctx context.Context, req *proto.ExecuteBatchRe
|
|||
callerid.GoRPCEffectiveCallerID(req.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(req.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), &req.QueryBatch, reply)
|
||||
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), nil, &req.QueryBatch, reply)
|
||||
tabletserver.AddTabletErrorToQueryResultList(tErr, reply)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
@ -243,7 +243,7 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
|
|||
callerid.GoRPCEffectiveCallerID(req.EffectiveCallerID),
|
||||
callerid.GoRPCImmediateCallerID(req.ImmediateCallerID),
|
||||
)
|
||||
tErr := sq.server.SplitQuery(callinfo.RPCWrapCallInfo(ctx), req, reply)
|
||||
tErr := sq.server.SplitQuery(callinfo.RPCWrapCallInfo(ctx), nil, req, reply)
|
||||
tabletserver.AddTabletErrorToSplitQueryResult(tErr, reply)
|
||||
if *tabletserver.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
|
|
|
@ -52,7 +52,7 @@ func (q *query) Execute(ctx context.Context, request *pb.ExecuteRequest) (respon
|
|||
request.GetImmediateCallerId(),
|
||||
)
|
||||
reply := new(mproto.QueryResult)
|
||||
execErr := q.server.Execute(ctx, &proto.Query{
|
||||
execErr := q.server.Execute(ctx, request.Target, &proto.Query{
|
||||
Sql: string(request.Query.Sql),
|
||||
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
|
||||
SessionId: request.SessionId,
|
||||
|
@ -76,7 +76,7 @@ func (q *query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques
|
|||
request.GetImmediateCallerId(),
|
||||
)
|
||||
reply := new(proto.QueryResultList)
|
||||
execErr := q.server.ExecuteBatch(ctx, &proto.QueryList{
|
||||
execErr := q.server.ExecuteBatch(ctx, request.Target, &proto.QueryList{
|
||||
Queries: proto.Proto3ToBoundQueryList(request.Queries),
|
||||
SessionId: request.SessionId,
|
||||
AsTransaction: request.AsTransaction,
|
||||
|
@ -99,7 +99,7 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
|
|||
request.GetEffectiveCallerId(),
|
||||
request.GetImmediateCallerId(),
|
||||
)
|
||||
seErr := q.server.StreamExecute(ctx, &proto.Query{
|
||||
seErr := q.server.StreamExecute(ctx, request.Target, &proto.Query{
|
||||
Sql: string(request.Query.Sql),
|
||||
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
|
||||
SessionId: request.SessionId,
|
||||
|
@ -127,7 +127,7 @@ func (q *query) Begin(ctx context.Context, request *pb.BeginRequest) (response *
|
|||
request.GetImmediateCallerId(),
|
||||
)
|
||||
txInfo := new(proto.TransactionInfo)
|
||||
if beginErr := q.server.Begin(ctx, &proto.Session{
|
||||
if beginErr := q.server.Begin(ctx, request.Target, &proto.Session{
|
||||
SessionId: request.SessionId,
|
||||
}, txInfo); beginErr != nil {
|
||||
return &pb.BeginResponse{
|
||||
|
@ -147,7 +147,7 @@ func (q *query) Commit(ctx context.Context, request *pb.CommitRequest) (response
|
|||
request.GetEffectiveCallerId(),
|
||||
request.GetImmediateCallerId(),
|
||||
)
|
||||
commitErr := q.server.Commit(ctx, &proto.Session{
|
||||
commitErr := q.server.Commit(ctx, request.Target, &proto.Session{
|
||||
SessionId: request.SessionId,
|
||||
TransactionId: request.TransactionId,
|
||||
})
|
||||
|
@ -163,7 +163,7 @@ func (q *query) Rollback(ctx context.Context, request *pb.RollbackRequest) (resp
|
|||
request.GetEffectiveCallerId(),
|
||||
request.GetImmediateCallerId(),
|
||||
)
|
||||
rollbackErr := q.server.Rollback(ctx, &proto.Session{
|
||||
rollbackErr := q.server.Rollback(ctx, request.Target, &proto.Session{
|
||||
SessionId: request.SessionId,
|
||||
TransactionId: request.TransactionId,
|
||||
})
|
||||
|
@ -181,7 +181,7 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (
|
|||
request.GetImmediateCallerId(),
|
||||
)
|
||||
reply := &proto.SplitQueryResult{}
|
||||
if sqErr := q.server.SplitQuery(ctx, &proto.SplitQueryRequest{
|
||||
if sqErr := q.server.SplitQuery(ctx, request.Target, &proto.SplitQueryRequest{
|
||||
Query: *proto.Proto3ToBoundQuery(request.Query),
|
||||
SplitColumn: request.SplitColumn,
|
||||
SplitCount: int(request.SplitCount),
|
||||
|
|
|
@ -8,8 +8,21 @@ import (
|
|||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// TargetToProto3 transform the bson RPC target to proto3
|
||||
func TargetToProto3(target *Target) *pb.Target {
|
||||
if target == nil {
|
||||
return nil
|
||||
}
|
||||
return &pb.Target{
|
||||
Keyspace: target.Keyspace,
|
||||
Shard: target.Shard,
|
||||
TabletType: pbt.TabletType(target.TabletType),
|
||||
}
|
||||
}
|
||||
|
||||
// BoundQueryToProto3 converts internal types to proto3 BoundQuery
|
||||
func BoundQueryToProto3(sql string, bindVars map[string]interface{}) *pb.BoundQuery {
|
||||
result := &pb.BoundQuery{
|
||||
|
|
|
@ -1288,7 +1288,7 @@ func newTransaction(sqlQuery *SqlQuery) int64 {
|
|||
TransactionId: 0,
|
||||
}
|
||||
txInfo := proto.TransactionInfo{TransactionId: 0}
|
||||
err := sqlQuery.Begin(context.Background(), &session, &txInfo)
|
||||
err := sqlQuery.Begin(context.Background(), sqlQuery.target, &session, &txInfo)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to start a transaction: %v", err))
|
||||
}
|
||||
|
@ -1313,7 +1313,7 @@ func testCommitHelper(t *testing.T, sqlQuery *SqlQuery, queryExecutor *QueryExec
|
|||
SessionId: sqlQuery.sessionID,
|
||||
TransactionId: queryExecutor.transactionID,
|
||||
}
|
||||
if err := sqlQuery.Commit(queryExecutor.ctx, &session); err != nil {
|
||||
if err := sqlQuery.Commit(queryExecutor.ctx, sqlQuery.target, &session); err != nil {
|
||||
t.Fatalf("failed to commit transaction: %d, err: %v", queryExecutor.transactionID, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -386,6 +386,7 @@ func (rqsc *realQueryServiceControl) BroadcastHealth(terTimestamp int64, stats *
|
|||
func (rqsc *realQueryServiceControl) IsHealthy() error {
|
||||
return rqsc.sqlQueryRPCService.Execute(
|
||||
context.Background(),
|
||||
nil,
|
||||
&proto.Query{
|
||||
Sql: "select 1 from dual",
|
||||
SessionId: rqsc.sqlQueryRPCService.sessionID,
|
||||
|
|
|
@ -22,17 +22,17 @@ type QueryService interface {
|
|||
GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error
|
||||
|
||||
// Transaction management
|
||||
Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) error
|
||||
Commit(ctx context.Context, session *proto.Session) error
|
||||
Rollback(ctx context.Context, session *proto.Session) error
|
||||
Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) error
|
||||
Commit(ctx context.Context, target *pb.Target, session *proto.Session) error
|
||||
Rollback(ctx context.Context, target *pb.Target, session *proto.Session) error
|
||||
|
||||
// Query execution
|
||||
Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) error
|
||||
StreamExecute(ctx context.Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) error
|
||||
ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error
|
||||
Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) error
|
||||
StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(*mproto.QueryResult) error) error
|
||||
ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) error
|
||||
|
||||
// Map reduce helper
|
||||
SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error
|
||||
SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error
|
||||
|
||||
// StreamHealthRegister registers a listener for StreamHealth
|
||||
StreamHealthRegister(chan<- *pb.StreamHealthResponse) (int, error)
|
||||
|
|
|
@ -282,12 +282,12 @@ func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo
|
|||
}
|
||||
|
||||
// Begin starts a new transaction. This is allowed only if the state is StateServing.
|
||||
func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
|
||||
func (sq *SqlQuery) Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
|
||||
logStats := newSqlQueryStats("Begin", ctx)
|
||||
logStats.OriginalSql = "begin"
|
||||
defer handleError(&err, logStats, sq.qe.queryServiceStats)
|
||||
|
||||
if err = sq.startRequest(nil, session.SessionId, false, false); err != nil {
|
||||
if err = sq.startRequest(target, session.SessionId, false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := withTimeout(ctx, sq.qe.txPool.PoolTimeout())
|
||||
|
@ -303,13 +303,13 @@ func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *p
|
|||
}
|
||||
|
||||
// Commit commits the specified transaction.
|
||||
func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err error) {
|
||||
func (sq *SqlQuery) Commit(ctx context.Context, target *pb.Target, session *proto.Session) (err error) {
|
||||
logStats := newSqlQueryStats("Commit", ctx)
|
||||
logStats.OriginalSql = "commit"
|
||||
logStats.TransactionID = session.TransactionId
|
||||
defer handleError(&err, logStats, sq.qe.queryServiceStats)
|
||||
|
||||
if err = sq.startRequest(nil, session.SessionId, false, true); err != nil {
|
||||
if err = sq.startRequest(target, session.SessionId, false, true); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
|
||||
|
@ -324,13 +324,13 @@ func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err err
|
|||
}
|
||||
|
||||
// Rollback rollsback the specified transaction.
|
||||
func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session) (err error) {
|
||||
func (sq *SqlQuery) Rollback(ctx context.Context, target *pb.Target, session *proto.Session) (err error) {
|
||||
logStats := newSqlQueryStats("Rollback", ctx)
|
||||
logStats.OriginalSql = "rollback"
|
||||
logStats.TransactionID = session.TransactionId
|
||||
defer handleError(&err, logStats, sq.qe.queryServiceStats)
|
||||
|
||||
if err = sq.startRequest(nil, session.SessionId, false, true); err != nil {
|
||||
if err = sq.startRequest(target, session.SessionId, false, true); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
|
||||
|
@ -382,12 +382,12 @@ func (sq *SqlQuery) handleExecErrorNoPanic(query *proto.Query, err interface{},
|
|||
}
|
||||
|
||||
// Execute executes the query and returns the result as response.
|
||||
func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) (err error) {
|
||||
func (sq *SqlQuery) Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) (err error) {
|
||||
logStats := newSqlQueryStats("Execute", ctx)
|
||||
defer sq.handleExecError(query, &err, logStats)
|
||||
|
||||
allowShutdown := (query.TransactionId != 0)
|
||||
if err = sq.startRequest(nil, query.SessionId, false, allowShutdown); err != nil {
|
||||
if err = sq.startRequest(target, query.SessionId, false, allowShutdown); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
|
||||
|
@ -420,7 +420,7 @@ func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mpro
|
|||
// StreamExecute executes the query and streams the result.
|
||||
// The first QueryResult will have Fields set (and Rows nil).
|
||||
// The subsequent QueryResult will have Rows set (and Fields nil).
|
||||
func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) (err error) {
|
||||
func (sq *SqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(*mproto.QueryResult) error) (err error) {
|
||||
// check cases we don't handle yet
|
||||
if query.TransactionId != 0 {
|
||||
return NewTabletError(ErrFail, "Transactions not supported with streaming")
|
||||
|
@ -429,7 +429,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR
|
|||
logStats := newSqlQueryStats("StreamExecute", ctx)
|
||||
defer sq.handleExecError(query, &err, logStats)
|
||||
|
||||
if err = sq.startRequest(nil, query.SessionId, false, false); err != nil {
|
||||
if err = sq.startRequest(target, query.SessionId, false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
defer sq.endRequest()
|
||||
|
@ -458,7 +458,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR
|
|||
// ExecuteBatch can be called for an existing transaction, or it can be called with
|
||||
// the AsTransaction flag which will execute all statements inside an independent
|
||||
// transaction. If AsTransaction is true, TransactionId must be 0.
|
||||
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
|
||||
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
|
||||
if len(queryList.Queries) == 0 {
|
||||
return NewTabletError(ErrFail, "Empty query list")
|
||||
}
|
||||
|
@ -467,7 +467,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
|
|||
}
|
||||
|
||||
allowShutdown := (queryList.TransactionId != 0)
|
||||
if err = sq.startRequest(nil, queryList.SessionId, false, allowShutdown); err != nil {
|
||||
if err = sq.startRequest(target, queryList.SessionId, false, allowShutdown); err != nil {
|
||||
return err
|
||||
}
|
||||
defer sq.endRequest()
|
||||
|
@ -479,7 +479,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
|
|||
}
|
||||
if queryList.AsTransaction {
|
||||
var txInfo proto.TransactionInfo
|
||||
if err = sq.Begin(ctx, &session, &txInfo); err != nil {
|
||||
if err = sq.Begin(ctx, target, &session, &txInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
session.TransactionId = txInfo.TransactionId
|
||||
|
@ -487,7 +487,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
|
|||
// that there was an error, roll it back.
|
||||
defer func() {
|
||||
if session.TransactionId != 0 {
|
||||
sq.Rollback(ctx, &session)
|
||||
sq.Rollback(ctx, target, &session)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -500,13 +500,13 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
|
|||
SessionId: session.SessionId,
|
||||
}
|
||||
var localReply mproto.QueryResult
|
||||
if err = sq.Execute(ctx, &query, &localReply); err != nil {
|
||||
if err = sq.Execute(ctx, target, &query, &localReply); err != nil {
|
||||
return err
|
||||
}
|
||||
reply.List = append(reply.List, localReply)
|
||||
}
|
||||
if queryList.AsTransaction {
|
||||
if err = sq.Commit(ctx, &session); err != nil {
|
||||
if err = sq.Commit(ctx, target, &session); err != nil {
|
||||
session.TransactionId = 0
|
||||
return err
|
||||
}
|
||||
|
@ -516,10 +516,10 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
|
|||
}
|
||||
|
||||
// SplitQuery splits a BoundQuery into smaller queries that return a subset of rows from the original query.
|
||||
func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
|
||||
func (sq *SqlQuery) SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
|
||||
logStats := newSqlQueryStats("SplitQuery", ctx)
|
||||
defer handleError(&err, logStats, sq.qe.queryServiceStats)
|
||||
if err = sq.startRequest(nil, req.SessionID, false, false); err != nil {
|
||||
if err = sq.startRequest(target, req.SessionID, false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
|
||||
|
|
|
@ -212,15 +212,15 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
|
|||
TransactionId: 0,
|
||||
}
|
||||
txInfo := proto.TransactionInfo{TransactionId: 0}
|
||||
if err = sqlQuery.Begin(ctx, &session, &txInfo); err == nil {
|
||||
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err == nil {
|
||||
t.Fatalf("call SqlQuery.Begin should fail because of an invalid session id: 0")
|
||||
}
|
||||
|
||||
if err = sqlQuery.Commit(ctx, &session); err == nil {
|
||||
if err = sqlQuery.Commit(ctx, nil, &session); err == nil {
|
||||
t.Fatalf("call SqlQuery.Commit should fail because of an invalid session id: 0")
|
||||
}
|
||||
|
||||
if err = sqlQuery.Rollback(ctx, &session); err == nil {
|
||||
if err = sqlQuery.Rollback(ctx, nil, &session); err == nil {
|
||||
t.Fatalf("call SqlQuery.Rollback should fail because of an invalid session id: 0")
|
||||
}
|
||||
|
||||
|
@ -231,12 +231,12 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
|
|||
TransactionId: session.TransactionId,
|
||||
}
|
||||
reply := mproto.QueryResult{}
|
||||
if err := sqlQuery.Execute(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.Execute(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("call SqlQuery.Execute should fail because of an invalid session id: 0")
|
||||
}
|
||||
|
||||
streamSendReply := func(*mproto.QueryResult) error { return nil }
|
||||
if err = sqlQuery.StreamExecute(ctx, &query, streamSendReply); err == nil {
|
||||
if err = sqlQuery.StreamExecute(ctx, nil, &query, streamSendReply); err == nil {
|
||||
t.Fatalf("call SqlQuery.StreamExecute should fail because of an invalid session id: 0")
|
||||
}
|
||||
|
||||
|
@ -256,7 +256,7 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
|
|||
mproto.QueryResult{},
|
||||
},
|
||||
}
|
||||
if err = sqlQuery.ExecuteBatch(ctx, &batchQuery, &batchReply); err == nil {
|
||||
if err = sqlQuery.ExecuteBatch(ctx, nil, &batchQuery, &batchReply); err == nil {
|
||||
t.Fatalf("call SqlQuery.ExecuteBatch should fail because of an invalid session id: 0")
|
||||
}
|
||||
|
||||
|
@ -280,7 +280,7 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
if err = sqlQuery.SplitQuery(ctx, &splitQuery, &splitQueryReply); err == nil {
|
||||
if err = sqlQuery.SplitQuery(ctx, nil, &splitQuery, &splitQueryReply); err == nil {
|
||||
t.Fatalf("call SqlQuery.SplitQuery should fail because of an invalid session id: 0")
|
||||
}
|
||||
}
|
||||
|
@ -311,7 +311,7 @@ func TestSqlQueryCommitTransaciton(t *testing.T) {
|
|||
TransactionId: 0,
|
||||
}
|
||||
txInfo := proto.TransactionInfo{TransactionId: 0}
|
||||
if err = sqlQuery.Begin(ctx, &session, &txInfo); err != nil {
|
||||
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err != nil {
|
||||
t.Fatalf("call SqlQuery.Begin failed")
|
||||
}
|
||||
session.TransactionId = txInfo.TransactionId
|
||||
|
@ -322,10 +322,10 @@ func TestSqlQueryCommitTransaciton(t *testing.T) {
|
|||
TransactionId: session.TransactionId,
|
||||
}
|
||||
reply := mproto.QueryResult{}
|
||||
if err := sqlQuery.Execute(ctx, &query, &reply); err != nil {
|
||||
if err := sqlQuery.Execute(ctx, nil, &query, &reply); err != nil {
|
||||
t.Fatalf("failed to execute query: %s", query.Sql)
|
||||
}
|
||||
if err := sqlQuery.Commit(ctx, &session); err != nil {
|
||||
if err := sqlQuery.Commit(ctx, nil, &session); err != nil {
|
||||
t.Fatalf("call SqlQuery.Commit failed")
|
||||
}
|
||||
}
|
||||
|
@ -356,7 +356,7 @@ func TestSqlQueryRollback(t *testing.T) {
|
|||
TransactionId: 0,
|
||||
}
|
||||
txInfo := proto.TransactionInfo{TransactionId: 0}
|
||||
if err = sqlQuery.Begin(ctx, &session, &txInfo); err != nil {
|
||||
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err != nil {
|
||||
t.Fatalf("call SqlQuery.Begin failed")
|
||||
}
|
||||
session.TransactionId = txInfo.TransactionId
|
||||
|
@ -367,10 +367,10 @@ func TestSqlQueryRollback(t *testing.T) {
|
|||
TransactionId: session.TransactionId,
|
||||
}
|
||||
reply := mproto.QueryResult{}
|
||||
if err := sqlQuery.Execute(ctx, &query, &reply); err != nil {
|
||||
if err := sqlQuery.Execute(ctx, nil, &query, &reply); err != nil {
|
||||
t.Fatalf("failed to execute query: %s", query.Sql)
|
||||
}
|
||||
if err := sqlQuery.Rollback(ctx, &session); err != nil {
|
||||
if err := sqlQuery.Rollback(ctx, nil, &session); err != nil {
|
||||
t.Fatalf("call SqlQuery.Rollback failed")
|
||||
}
|
||||
}
|
||||
|
@ -402,7 +402,7 @@ func TestSqlQueryStreamExecute(t *testing.T) {
|
|||
TransactionId: 0,
|
||||
}
|
||||
txInfo := proto.TransactionInfo{TransactionId: 0}
|
||||
if err = sqlQuery.Begin(ctx, &session, &txInfo); err != nil {
|
||||
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err != nil {
|
||||
t.Fatalf("call SqlQuery.Begin failed")
|
||||
}
|
||||
session.TransactionId = txInfo.TransactionId
|
||||
|
@ -413,14 +413,14 @@ func TestSqlQueryStreamExecute(t *testing.T) {
|
|||
TransactionId: session.TransactionId,
|
||||
}
|
||||
sendReply := func(*mproto.QueryResult) error { return nil }
|
||||
if err := sqlQuery.StreamExecute(ctx, &query, sendReply); err == nil {
|
||||
if err := sqlQuery.StreamExecute(ctx, nil, &query, sendReply); err == nil {
|
||||
t.Fatalf("SqlQuery.StreamExecute should fail: %s", query.Sql)
|
||||
}
|
||||
if err := sqlQuery.Rollback(ctx, &session); err != nil {
|
||||
if err := sqlQuery.Rollback(ctx, nil, &session); err != nil {
|
||||
t.Fatalf("call SqlQuery.Rollback failed")
|
||||
}
|
||||
query.TransactionId = 0
|
||||
if err := sqlQuery.StreamExecute(ctx, &query, sendReply); err != nil {
|
||||
if err := sqlQuery.StreamExecute(ctx, nil, &query, sendReply); err != nil {
|
||||
t.Fatalf("SqlQuery.StreamExecute should success: %s, but get error: %v",
|
||||
query.Sql, err)
|
||||
}
|
||||
|
@ -462,7 +462,7 @@ func TestSqlQueryExecuteBatch(t *testing.T) {
|
|||
mproto.QueryResult{},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err != nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err != nil {
|
||||
t.Fatalf("SqlQuery.ExecuteBatch should success: %v, but get error: %v",
|
||||
query, err)
|
||||
}
|
||||
|
@ -488,7 +488,7 @@ func TestSqlQueryExecuteBatchFailEmptyQueryList(t *testing.T) {
|
|||
reply := proto.QueryResultList{
|
||||
List: []mproto.QueryResult{},
|
||||
}
|
||||
err = sqlQuery.ExecuteBatch(ctx, &query, &reply)
|
||||
err = sqlQuery.ExecuteBatch(ctx, nil, &query, &reply)
|
||||
verifyTabletError(t, err, ErrFail)
|
||||
}
|
||||
|
||||
|
@ -519,7 +519,7 @@ func TestSqlQueryExecuteBatchFailAsTransaction(t *testing.T) {
|
|||
reply := proto.QueryResultList{
|
||||
List: []mproto.QueryResult{},
|
||||
}
|
||||
err = sqlQuery.ExecuteBatch(ctx, &query, &reply)
|
||||
err = sqlQuery.ExecuteBatch(ctx, nil, &query, &reply)
|
||||
verifyTabletError(t, err, ErrFail)
|
||||
}
|
||||
|
||||
|
@ -552,7 +552,7 @@ func TestSqlQueryExecuteBatchBeginFail(t *testing.T) {
|
|||
mproto.QueryResult{},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.ExecuteBatch should fail")
|
||||
}
|
||||
}
|
||||
|
@ -591,7 +591,7 @@ func TestSqlQueryExecuteBatchCommitFail(t *testing.T) {
|
|||
mproto.QueryResult{},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.ExecuteBatch should fail")
|
||||
}
|
||||
}
|
||||
|
@ -642,7 +642,7 @@ func TestSqlQueryExecuteBatchSqlExecFailInTransaction(t *testing.T) {
|
|||
t.Fatalf("rollback should not be executed.")
|
||||
}
|
||||
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.ExecuteBatch should fail")
|
||||
}
|
||||
|
||||
|
@ -689,7 +689,7 @@ func TestSqlQueryExecuteBatchSqlSucceedInTransaction(t *testing.T) {
|
|||
*sqlResult,
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err != nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err != nil {
|
||||
t.Fatalf("SqlQuery.ExecuteBatch should succeed")
|
||||
}
|
||||
}
|
||||
|
@ -721,7 +721,7 @@ func TestSqlQueryExecuteBatchCallCommitWithoutABegin(t *testing.T) {
|
|||
mproto.QueryResult{},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.ExecuteBatch should fail")
|
||||
}
|
||||
}
|
||||
|
@ -779,7 +779,7 @@ func TestExecuteBatchNestedTransaction(t *testing.T) {
|
|||
mproto.QueryResult{},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.Execute should fail because of nested transaction")
|
||||
}
|
||||
sqlQuery.qe.txPool.SetTimeout(10)
|
||||
|
@ -830,7 +830,7 @@ func TestSqlQuerySplitQuery(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.SplitQuery(ctx, &query, &reply); err != nil {
|
||||
if err := sqlQuery.SplitQuery(ctx, nil, &query, &reply); err != nil {
|
||||
t.Fatalf("SqlQuery.SplitQuery should success: %v, but get error: %v",
|
||||
query, err)
|
||||
}
|
||||
|
@ -869,7 +869,7 @@ func TestSqlQuerySplitQueryInvalidQuery(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.SplitQuery(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.SplitQuery(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.SplitQuery should fail")
|
||||
}
|
||||
}
|
||||
|
@ -922,7 +922,7 @@ func TestSqlQuerySplitQueryInvalidMinMax(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
if err := sqlQuery.SplitQuery(ctx, &query, &reply); err == nil {
|
||||
if err := sqlQuery.SplitQuery(ctx, nil, &query, &reply); err == nil {
|
||||
t.Fatalf("SqlQuery.SplitQuery should fail")
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче