Merge pull request #965 from alainjobart/replication

Replication
This commit is contained in:
Alain Jobart 2015-08-05 13:36:27 -07:00
Родитель 883b9313bc e58395c43b
Коммит 813e4f1de9
25 изменённых файлов: 274 добавлений и 170 удалений

Просмотреть файл

@ -12,6 +12,7 @@ import (
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
// This file maintains a tablet health cache. It establishes streaming
@ -86,8 +87,9 @@ func (th *tabletHealth) stream(ctx context.Context, ts topo.Server, tabletAlias
return err
}
// pass in empty keyspace and shard to not ask for sessionId
conn, err := tabletconn.GetDialer()(ctx, ep, "", "", 30*time.Second)
// Pass in a tablet type that is not UNKNOWN, so we don't ask
// for sessionId.
conn, err := tabletconn.GetDialer()(ctx, ep, "", "", pbt.TabletType_MASTER, 30*time.Second)
if err != nil {
return err
}

Просмотреть файл

@ -52,7 +52,7 @@ func (sq *SqlQuery) GetSessionId2(sessionIdReq *proto.GetSessionIdRequest, sessi
// Begin is exposing tabletserver.SqlQuery.Begin
func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
defer sq.server.HandlePanic(&err)
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), session, txInfo)
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), nil, session, txInfo)
tabletserver.AddTabletErrorToTransactionInfo(tErr, txInfo)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -73,7 +73,7 @@ func (sq *SqlQuery) Begin2(ctx context.Context, beginRequest *proto.BeginRequest
callerid.GoRPCEffectiveCallerID(beginRequest.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(beginRequest.ImmediateCallerID),
)
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), session, txInfo)
tErr := sq.server.Begin(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(beginRequest.Target), session, txInfo)
// Convert from TxInfo => beginResponse for the output
beginResponse.TransactionId = txInfo.TransactionId
tabletserver.AddTabletErrorToBeginResponse(tErr, beginResponse)
@ -86,7 +86,7 @@ func (sq *SqlQuery) Begin2(ctx context.Context, beginRequest *proto.BeginRequest
// Commit is exposing tabletserver.SqlQuery.Commit
func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session, noOutput *rpc.Unused) (err error) {
defer sq.server.HandlePanic(&err)
return sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), session)
return sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), nil, session)
}
// Commit2 should not be used by anything other than tests.
@ -102,7 +102,7 @@ func (sq *SqlQuery) Commit2(ctx context.Context, commitRequest *proto.CommitRequ
callerid.GoRPCEffectiveCallerID(commitRequest.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(commitRequest.ImmediateCallerID),
)
tErr := sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), session)
tErr := sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(commitRequest.Target), session)
tabletserver.AddTabletErrorToCommitResponse(tErr, commitResponse)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -113,7 +113,7 @@ func (sq *SqlQuery) Commit2(ctx context.Context, commitRequest *proto.CommitRequ
// Rollback is exposing tabletserver.SqlQuery.Rollback
func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session, noOutput *rpc.Unused) (err error) {
defer sq.server.HandlePanic(&err)
return sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), session)
return sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), nil, session)
}
// Rollback2 should not be used by anything other than tests.
@ -129,7 +129,7 @@ func (sq *SqlQuery) Rollback2(ctx context.Context, rollbackRequest *proto.Rollba
callerid.GoRPCEffectiveCallerID(rollbackRequest.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(rollbackRequest.ImmediateCallerID),
)
tErr := sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), session)
tErr := sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(rollbackRequest.Target), session)
tabletserver.AddTabletErrorToRollbackResponse(tErr, rollbackResponse)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -140,7 +140,7 @@ func (sq *SqlQuery) Rollback2(ctx context.Context, rollbackRequest *proto.Rollba
// Execute is exposing tabletserver.SqlQuery.Execute
func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) (err error) {
defer sq.server.HandlePanic(&err)
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), query, reply)
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), nil, query, reply)
tabletserver.AddTabletErrorToQueryResult(tErr, reply)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -157,7 +157,7 @@ func (sq *SqlQuery) Execute2(ctx context.Context, executeRequest *proto.ExecuteR
callerid.GoRPCEffectiveCallerID(executeRequest.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(executeRequest.ImmediateCallerID),
)
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), &executeRequest.QueryRequest, reply)
tErr := sq.server.Execute(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(executeRequest.Target), &executeRequest.QueryRequest, reply)
tabletserver.AddTabletErrorToQueryResult(tErr, reply)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -168,7 +168,7 @@ func (sq *SqlQuery) Execute2(ctx context.Context, executeRequest *proto.ExecuteR
// StreamExecute is exposing tabletserver.SqlQuery.StreamExecute
func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) (err error) {
defer sq.server.HandlePanic(&err)
return sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), query, func(reply *mproto.QueryResult) error {
return sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), nil, query, func(reply *mproto.QueryResult) error {
return sendReply(reply)
})
}
@ -185,7 +185,7 @@ func (sq *SqlQuery) StreamExecute2(ctx context.Context, req *proto.StreamExecute
callerid.GoRPCEffectiveCallerID(req.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(req.ImmediateCallerID),
)
tErr := sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), req.Query, func(reply *mproto.QueryResult) error {
tErr := sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(req.Target), req.Query, func(reply *mproto.QueryResult) error {
return sendReply(reply)
})
if tErr == nil {
@ -208,7 +208,7 @@ func (sq *SqlQuery) StreamExecute2(ctx context.Context, req *proto.StreamExecute
// ExecuteBatch is exposing tabletserver.SqlQuery.ExecuteBatch
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
defer sq.server.HandlePanic(&err)
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), queryList, reply)
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), nil, queryList, reply)
tabletserver.AddTabletErrorToQueryResultList(tErr, reply)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -228,7 +228,7 @@ func (sq *SqlQuery) ExecuteBatch2(ctx context.Context, req *proto.ExecuteBatchRe
callerid.GoRPCEffectiveCallerID(req.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(req.ImmediateCallerID),
)
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), &req.QueryBatch, reply)
tErr := sq.server.ExecuteBatch(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(req.Target), &req.QueryBatch, reply)
tabletserver.AddTabletErrorToQueryResultList(tErr, reply)
if *tabletserver.RPCErrorOnlyInReply {
return nil
@ -243,7 +243,7 @@ func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest
callerid.GoRPCEffectiveCallerID(req.EffectiveCallerID),
callerid.GoRPCImmediateCallerID(req.ImmediateCallerID),
)
tErr := sq.server.SplitQuery(callinfo.RPCWrapCallInfo(ctx), req, reply)
tErr := sq.server.SplitQuery(callinfo.RPCWrapCallInfo(ctx), proto.TargetToProto3(req.Target), req, reply)
tabletserver.AddTabletErrorToSplitQueryResult(tErr, reply)
if *tabletserver.RPCErrorOnlyInReply {
return nil

Просмотреть файл

@ -25,25 +25,31 @@ import (
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
const protocolName = "gorpc"
var (
tabletBsonUsername = flag.String("tablet-bson-username", "", "user to use for bson rpc connections")
tabletBsonPassword = flag.String("tablet-bson-password", "", "password to use for bson rpc connections (ignored if username is empty)")
)
func init() {
tabletconn.RegisterDialer("gorpc", DialTablet)
tabletconn.RegisterDialer(protocolName, DialTablet)
}
// TabletBson implements a bson rpcplus implementation for TabletConn
type TabletBson struct {
mu sync.RWMutex
// endPoint is set at construction time, and never changed
endPoint *pbt.EndPoint
// mu protects the next fields
mu sync.RWMutex
rpcClient *rpcplus.Client
sessionID int64
target *tproto.Target
}
// DialTablet creates and initializes TabletBson.
func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, timeout time.Duration) (tabletconn.TabletConn, error) {
func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, tabletType pbt.TabletType, timeout time.Duration) (tabletconn.TabletConn, error) {
addr := netutil.JoinHostPort(endPoint.Host, endPoint.PortMap["vt"])
conn := &TabletBson{endPoint: endPoint}
var err error
@ -56,7 +62,8 @@ func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard str
return nil, tabletError(err)
}
if keyspace != "" || shard != "" {
if tabletType == pbt.TabletType_UNKNOWN {
// we use session
var sessionInfo tproto.SessionInfo
if err = conn.rpcClient.Call(ctx, "SqlQuery.GetSessionId", tproto.SessionParams{Keyspace: keyspace, Shard: shard}, &sessionInfo); err != nil {
conn.rpcClient.Close()
@ -68,6 +75,13 @@ func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard str
return nil, tabletError(err)
}
conn.sessionID = sessionInfo.SessionId
} else {
// we use target
conn.target = &tproto.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tproto.TabletType(tabletType),
}
}
return conn, nil
}
@ -524,6 +538,24 @@ func (conn *TabletBson) Close() {
rpcClient.Close()
}
// SetTarget can be called to change the target used for subsequent calls.
func (conn *TabletBson) SetTarget(keyspace, shard string, tabletType pbt.TabletType) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.target == nil {
return fmt.Errorf("cannot set target on sessionId based conn")
}
if tabletType == pbt.TabletType_UNKNOWN {
return fmt.Errorf("cannot set tablet type to UNKNOWN")
}
conn.target = &tproto.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tproto.TabletType(tabletType),
}
return nil
}
// EndPoint returns the rpc end point.
func (conn *TabletBson) EndPoint() *pbt.EndPoint {
return conn.endPoint

Просмотреть файл

@ -8,14 +8,12 @@ import (
"net"
"net/http"
"testing"
"time"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/tabletserver/gorpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconntest"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
@ -47,23 +45,13 @@ func testGoRPCTabletConn(t *testing.T, rpcOnlyInReply bool) {
// Handle errors appropriately
*tabletserver.RPCErrorOnlyInReply = rpcOnlyInReply
// Create a Go RPC client connecting to the server
ctx := context.Background()
client, err := DialTablet(ctx, &pb.EndPoint{
// run the test suite
tabletconntest.TestSuite(t, protocolName, &pb.EndPoint{
Host: "localhost",
PortMap: map[string]int32{
"vt": int32(port),
},
}, tabletconntest.TestKeyspace, tabletconntest.TestShard, 30*time.Second)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
// run the test suite
tabletconntest.TestSuite(t, client, service)
// and clean up
client.Close()
}, service)
}
func TestGoRPCTabletConn(t *testing.T) {

Просмотреть файл

@ -52,7 +52,7 @@ func (q *query) Execute(ctx context.Context, request *pb.ExecuteRequest) (respon
request.GetImmediateCallerId(),
)
reply := new(mproto.QueryResult)
execErr := q.server.Execute(ctx, &proto.Query{
execErr := q.server.Execute(ctx, request.Target, &proto.Query{
Sql: string(request.Query.Sql),
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
SessionId: request.SessionId,
@ -76,7 +76,7 @@ func (q *query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques
request.GetImmediateCallerId(),
)
reply := new(proto.QueryResultList)
execErr := q.server.ExecuteBatch(ctx, &proto.QueryList{
execErr := q.server.ExecuteBatch(ctx, request.Target, &proto.QueryList{
Queries: proto.Proto3ToBoundQueryList(request.Queries),
SessionId: request.SessionId,
AsTransaction: request.AsTransaction,
@ -99,7 +99,7 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
request.GetEffectiveCallerId(),
request.GetImmediateCallerId(),
)
seErr := q.server.StreamExecute(ctx, &proto.Query{
seErr := q.server.StreamExecute(ctx, request.Target, &proto.Query{
Sql: string(request.Query.Sql),
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
SessionId: request.SessionId,
@ -127,7 +127,7 @@ func (q *query) Begin(ctx context.Context, request *pb.BeginRequest) (response *
request.GetImmediateCallerId(),
)
txInfo := new(proto.TransactionInfo)
if beginErr := q.server.Begin(ctx, &proto.Session{
if beginErr := q.server.Begin(ctx, request.Target, &proto.Session{
SessionId: request.SessionId,
}, txInfo); beginErr != nil {
return &pb.BeginResponse{
@ -147,7 +147,7 @@ func (q *query) Commit(ctx context.Context, request *pb.CommitRequest) (response
request.GetEffectiveCallerId(),
request.GetImmediateCallerId(),
)
commitErr := q.server.Commit(ctx, &proto.Session{
commitErr := q.server.Commit(ctx, request.Target, &proto.Session{
SessionId: request.SessionId,
TransactionId: request.TransactionId,
})
@ -163,7 +163,7 @@ func (q *query) Rollback(ctx context.Context, request *pb.RollbackRequest) (resp
request.GetEffectiveCallerId(),
request.GetImmediateCallerId(),
)
rollbackErr := q.server.Rollback(ctx, &proto.Session{
rollbackErr := q.server.Rollback(ctx, request.Target, &proto.Session{
SessionId: request.SessionId,
TransactionId: request.TransactionId,
})
@ -181,7 +181,7 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (
request.GetImmediateCallerId(),
)
reply := &proto.SplitQueryResult{}
if sqErr := q.server.SplitQuery(ctx, &proto.SplitQueryRequest{
if sqErr := q.server.SplitQuery(ctx, request.Target, &proto.SplitQueryRequest{
Query: *proto.Proto3ToBoundQuery(request.Query),
SplitColumn: request.SplitColumn,
SplitCount: int(request.SplitCount),

Просмотреть файл

@ -24,21 +24,27 @@ import (
pbv "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
const protocolName = "grpc"
func init() {
tabletconn.RegisterDialer("grpc", DialTablet)
tabletconn.RegisterDialer(protocolName, DialTablet)
}
// gRPCQueryClient implements a gRPC implementation for TabletConn
type gRPCQueryClient struct {
mu sync.RWMutex
// endPoint is set at construction time, and never changed
endPoint *pbt.EndPoint
// mu protects the next fields
mu sync.RWMutex
cc *grpc.ClientConn
c pbs.QueryClient
sessionID int64
target *pb.Target
}
// DialTablet creates and initializes gRPCQueryClient.
func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, timeout time.Duration) (tabletconn.TabletConn, error) {
func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, tabletType pbt.TabletType, timeout time.Duration) (tabletconn.TabletConn, error) {
// create the RPC client
addr := netutil.JoinHostPort(endPoint.Host, endPoint.PortMap["grpc"])
cc, err := grpc.Dial(addr, grpc.WithBlock(), grpc.WithTimeout(timeout))
@ -52,7 +58,8 @@ func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard str
cc: cc,
c: c,
}
if keyspace != "" || shard != "" {
if tabletType == pbt.TabletType_UNKNOWN {
// we use session
gsir, err := c.GetSessionId(ctx, &pb.GetSessionIdRequest{
Keyspace: keyspace,
Shard: shard,
@ -66,6 +73,13 @@ func DialTablet(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard str
return nil, tabletErrorFromRPCError(gsir.Error)
}
result.sessionID = gsir.SessionId
} else {
// we use target
result.target = &pb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
}
}
return result, nil
@ -329,6 +343,24 @@ func (conn *gRPCQueryClient) Close() {
cc.Close()
}
// SetTarget can be called to change the target used for subsequent calls.
func (conn *gRPCQueryClient) SetTarget(keyspace, shard string, tabletType pbt.TabletType) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.target == nil {
return fmt.Errorf("cannot set target on sessionId based conn")
}
if tabletType == pbt.TabletType_UNKNOWN {
return fmt.Errorf("cannot set tablet type to UNKNOWN")
}
conn.target = &pb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
}
return nil
}
// EndPoint returns the rpc end point.
func (conn *gRPCQueryClient) EndPoint() *pbt.EndPoint {
return conn.endPoint

Просмотреть файл

@ -7,13 +7,11 @@ package grpctabletconn
import (
"net"
"testing"
"time"
"google.golang.org/grpc"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconntest"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
@ -36,21 +34,11 @@ func TestGoRPCTabletConn(t *testing.T) {
grpcqueryservice.RegisterForTest(server, service)
go server.Serve(listener)
// Create a gRPC client connecting to the server
ctx := context.Background()
client, err := DialTablet(ctx, &pb.EndPoint{
// run the test suite
tabletconntest.TestSuite(t, protocolName, &pb.EndPoint{
Host: host,
PortMap: map[string]int32{
"grpc": int32(port),
},
}, tabletconntest.TestKeyspace, tabletconntest.TestShard, 30*time.Second)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
// run the test suite
tabletconntest.TestSuite(t, client, service)
// and clean up
client.Close()
}, service)
}

Просмотреть файл

@ -8,8 +8,21 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
// TargetToProto3 transform the bson RPC target to proto3
func TargetToProto3(target *Target) *pb.Target {
if target == nil {
return nil
}
return &pb.Target{
Keyspace: target.Keyspace,
Shard: target.Shard,
TabletType: pbt.TabletType(target.TabletType),
}
}
// BoundQueryToProto3 converts internal types to proto3 BoundQuery
func BoundQueryToProto3(sql string, bindVars map[string]interface{}) *pb.BoundQuery {
result := &pb.BoundQuery{

Просмотреть файл

@ -50,6 +50,7 @@ type ExecuteRequest struct {
QueryRequest Query
EffectiveCallerID *CallerID
ImmediateCallerID *VTGateCallerID
Target *Target
}
//go:generate bsongen -file $GOFILE -type Query -o query_bson.go
@ -103,6 +104,7 @@ type ExecuteBatchRequest struct {
QueryBatch QueryList
EffectiveCallerID *CallerID
ImmediateCallerID *VTGateCallerID
Target *Target
}
//go:generate bsongen -file $GOFILE -type QueryList -o query_list_bson.go
@ -145,6 +147,7 @@ type SplitQueryRequest struct {
SessionID int64
EffectiveCallerID *CallerID
ImmediateCallerID *VTGateCallerID
Target *Target
}
// QuerySplit represents a split of SplitQueryRequest.Query. RowCount is only

Просмотреть файл

@ -1288,7 +1288,7 @@ func newTransaction(sqlQuery *SqlQuery) int64 {
TransactionId: 0,
}
txInfo := proto.TransactionInfo{TransactionId: 0}
err := sqlQuery.Begin(context.Background(), &session, &txInfo)
err := sqlQuery.Begin(context.Background(), sqlQuery.target, &session, &txInfo)
if err != nil {
panic(fmt.Errorf("failed to start a transaction: %v", err))
}
@ -1313,7 +1313,7 @@ func testCommitHelper(t *testing.T, sqlQuery *SqlQuery, queryExecutor *QueryExec
SessionId: sqlQuery.sessionID,
TransactionId: queryExecutor.transactionID,
}
if err := sqlQuery.Commit(queryExecutor.ctx, &session); err != nil {
if err := sqlQuery.Commit(queryExecutor.ctx, sqlQuery.target, &session); err != nil {
t.Fatalf("failed to commit transaction: %d, err: %v", queryExecutor.transactionID, err)
}
}

Просмотреть файл

@ -386,6 +386,7 @@ func (rqsc *realQueryServiceControl) BroadcastHealth(terTimestamp int64, stats *
func (rqsc *realQueryServiceControl) IsHealthy() error {
return rqsc.sqlQueryRPCService.Execute(
context.Background(),
nil,
&proto.Query{
Sql: "select 1 from dual",
SessionId: rqsc.sqlQueryRPCService.sessionID,

Просмотреть файл

@ -22,17 +22,17 @@ type QueryService interface {
GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error
// Transaction management
Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) error
Commit(ctx context.Context, session *proto.Session) error
Rollback(ctx context.Context, session *proto.Session) error
Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) error
Commit(ctx context.Context, target *pb.Target, session *proto.Session) error
Rollback(ctx context.Context, target *pb.Target, session *proto.Session) error
// Query execution
Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) error
StreamExecute(ctx context.Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) error
ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error
Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) error
StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(*mproto.QueryResult) error) error
ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) error
// Map reduce helper
SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error
SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error
// StreamHealthRegister registers a listener for StreamHealth
StreamHealthRegister(chan<- *pb.StreamHealthResponse) (int, error)
@ -57,37 +57,37 @@ func (e *ErrorQueryService) GetSessionId(sessionParams *proto.SessionParams, ses
}
// Begin is part of QueryService interface
func (e *ErrorQueryService) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) error {
func (e *ErrorQueryService) Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Commit is part of QueryService interface
func (e *ErrorQueryService) Commit(ctx context.Context, session *proto.Session) error {
func (e *ErrorQueryService) Commit(ctx context.Context, target *pb.Target, session *proto.Session) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Rollback is part of QueryService interface
func (e *ErrorQueryService) Rollback(ctx context.Context, session *proto.Session) error {
func (e *ErrorQueryService) Rollback(ctx context.Context, target *pb.Target, session *proto.Session) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// Execute is part of QueryService interface
func (e *ErrorQueryService) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) error {
func (e *ErrorQueryService) Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// StreamExecute is part of QueryService interface
func (e *ErrorQueryService) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) error {
func (e *ErrorQueryService) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(*mproto.QueryResult) error) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// ExecuteBatch is part of QueryService interface
func (e *ErrorQueryService) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error {
func (e *ErrorQueryService) ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
// SplitQuery is part of QueryService interface
func (e *ErrorQueryService) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
func (e *ErrorQueryService) SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}
@ -104,3 +104,6 @@ func (e *ErrorQueryService) StreamHealthUnregister(int) error {
// HandlePanic is part of QueryService interface
func (e *ErrorQueryService) HandlePanic(*error) {
}
// make sure ErrorQueryService implements QueryService
var _ QueryService = &ErrorQueryService{}

Просмотреть файл

@ -282,12 +282,12 @@ func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo
}
// Begin starts a new transaction. This is allowed only if the state is StateServing.
func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
func (sq *SqlQuery) Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) (err error) {
logStats := newSqlQueryStats("Begin", ctx)
logStats.OriginalSql = "begin"
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(nil, session.SessionId, false, false); err != nil {
if err = sq.startRequest(target, session.SessionId, false, false); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.txPool.PoolTimeout())
@ -303,13 +303,13 @@ func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *p
}
// Commit commits the specified transaction.
func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err error) {
func (sq *SqlQuery) Commit(ctx context.Context, target *pb.Target, session *proto.Session) (err error) {
logStats := newSqlQueryStats("Commit", ctx)
logStats.OriginalSql = "commit"
logStats.TransactionID = session.TransactionId
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(nil, session.SessionId, false, true); err != nil {
if err = sq.startRequest(target, session.SessionId, false, true); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -324,13 +324,13 @@ func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session) (err err
}
// Rollback rollsback the specified transaction.
func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session) (err error) {
func (sq *SqlQuery) Rollback(ctx context.Context, target *pb.Target, session *proto.Session) (err error) {
logStats := newSqlQueryStats("Rollback", ctx)
logStats.OriginalSql = "rollback"
logStats.TransactionID = session.TransactionId
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(nil, session.SessionId, false, true); err != nil {
if err = sq.startRequest(target, session.SessionId, false, true); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -382,12 +382,12 @@ func (sq *SqlQuery) handleExecErrorNoPanic(query *proto.Query, err interface{},
}
// Execute executes the query and returns the result as response.
func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) (err error) {
func (sq *SqlQuery) Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) (err error) {
logStats := newSqlQueryStats("Execute", ctx)
defer sq.handleExecError(query, &err, logStats)
allowShutdown := (query.TransactionId != 0)
if err = sq.startRequest(nil, query.SessionId, false, allowShutdown); err != nil {
if err = sq.startRequest(target, query.SessionId, false, allowShutdown); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())
@ -420,7 +420,7 @@ func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mpro
// StreamExecute executes the query and streams the result.
// The first QueryResult will have Fields set (and Rows nil).
// The subsequent QueryResult will have Rows set (and Fields nil).
func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) (err error) {
func (sq *SqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(*mproto.QueryResult) error) (err error) {
// check cases we don't handle yet
if query.TransactionId != 0 {
return NewTabletError(ErrFail, "Transactions not supported with streaming")
@ -429,7 +429,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR
logStats := newSqlQueryStats("StreamExecute", ctx)
defer sq.handleExecError(query, &err, logStats)
if err = sq.startRequest(nil, query.SessionId, false, false); err != nil {
if err = sq.startRequest(target, query.SessionId, false, false); err != nil {
return err
}
defer sq.endRequest()
@ -458,7 +458,7 @@ func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendR
// ExecuteBatch can be called for an existing transaction, or it can be called with
// the AsTransaction flag which will execute all statements inside an independent
// transaction. If AsTransaction is true, TransactionId must be 0.
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) (err error) {
if len(queryList.Queries) == 0 {
return NewTabletError(ErrFail, "Empty query list")
}
@ -467,7 +467,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
}
allowShutdown := (queryList.TransactionId != 0)
if err = sq.startRequest(nil, queryList.SessionId, false, allowShutdown); err != nil {
if err = sq.startRequest(target, queryList.SessionId, false, allowShutdown); err != nil {
return err
}
defer sq.endRequest()
@ -479,7 +479,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
}
if queryList.AsTransaction {
var txInfo proto.TransactionInfo
if err = sq.Begin(ctx, &session, &txInfo); err != nil {
if err = sq.Begin(ctx, target, &session, &txInfo); err != nil {
return err
}
session.TransactionId = txInfo.TransactionId
@ -487,7 +487,7 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
// that there was an error, roll it back.
defer func() {
if session.TransactionId != 0 {
sq.Rollback(ctx, &session)
sq.Rollback(ctx, target, &session)
}
}()
}
@ -500,13 +500,13 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
SessionId: session.SessionId,
}
var localReply mproto.QueryResult
if err = sq.Execute(ctx, &query, &localReply); err != nil {
if err = sq.Execute(ctx, target, &query, &localReply); err != nil {
return err
}
reply.List = append(reply.List, localReply)
}
if queryList.AsTransaction {
if err = sq.Commit(ctx, &session); err != nil {
if err = sq.Commit(ctx, target, &session); err != nil {
session.TransactionId = 0
return err
}
@ -516,10 +516,10 @@ func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList
}
// SplitQuery splits a BoundQuery into smaller queries that return a subset of rows from the original query.
func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
func (sq *SqlQuery) SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
logStats := newSqlQueryStats("SplitQuery", ctx)
defer handleError(&err, logStats, sq.qe.queryServiceStats)
if err = sq.startRequest(nil, req.SessionID, false, false); err != nil {
if err = sq.startRequest(target, req.SessionID, false, false); err != nil {
return err
}
ctx, cancel := withTimeout(ctx, sq.qe.queryTimeout.Get())

Просмотреть файл

@ -212,15 +212,15 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
TransactionId: 0,
}
txInfo := proto.TransactionInfo{TransactionId: 0}
if err = sqlQuery.Begin(ctx, &session, &txInfo); err == nil {
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err == nil {
t.Fatalf("call SqlQuery.Begin should fail because of an invalid session id: 0")
}
if err = sqlQuery.Commit(ctx, &session); err == nil {
if err = sqlQuery.Commit(ctx, nil, &session); err == nil {
t.Fatalf("call SqlQuery.Commit should fail because of an invalid session id: 0")
}
if err = sqlQuery.Rollback(ctx, &session); err == nil {
if err = sqlQuery.Rollback(ctx, nil, &session); err == nil {
t.Fatalf("call SqlQuery.Rollback should fail because of an invalid session id: 0")
}
@ -231,12 +231,12 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
TransactionId: session.TransactionId,
}
reply := mproto.QueryResult{}
if err := sqlQuery.Execute(ctx, &query, &reply); err == nil {
if err := sqlQuery.Execute(ctx, nil, &query, &reply); err == nil {
t.Fatalf("call SqlQuery.Execute should fail because of an invalid session id: 0")
}
streamSendReply := func(*mproto.QueryResult) error { return nil }
if err = sqlQuery.StreamExecute(ctx, &query, streamSendReply); err == nil {
if err = sqlQuery.StreamExecute(ctx, nil, &query, streamSendReply); err == nil {
t.Fatalf("call SqlQuery.StreamExecute should fail because of an invalid session id: 0")
}
@ -256,7 +256,7 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
mproto.QueryResult{},
},
}
if err = sqlQuery.ExecuteBatch(ctx, &batchQuery, &batchReply); err == nil {
if err = sqlQuery.ExecuteBatch(ctx, nil, &batchQuery, &batchReply); err == nil {
t.Fatalf("call SqlQuery.ExecuteBatch should fail because of an invalid session id: 0")
}
@ -280,7 +280,7 @@ func TestSqlQueryCommandFailUnMatchedSessionId(t *testing.T) {
},
},
}
if err = sqlQuery.SplitQuery(ctx, &splitQuery, &splitQueryReply); err == nil {
if err = sqlQuery.SplitQuery(ctx, nil, &splitQuery, &splitQueryReply); err == nil {
t.Fatalf("call SqlQuery.SplitQuery should fail because of an invalid session id: 0")
}
}
@ -311,7 +311,7 @@ func TestSqlQueryCommitTransaciton(t *testing.T) {
TransactionId: 0,
}
txInfo := proto.TransactionInfo{TransactionId: 0}
if err = sqlQuery.Begin(ctx, &session, &txInfo); err != nil {
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err != nil {
t.Fatalf("call SqlQuery.Begin failed")
}
session.TransactionId = txInfo.TransactionId
@ -322,10 +322,10 @@ func TestSqlQueryCommitTransaciton(t *testing.T) {
TransactionId: session.TransactionId,
}
reply := mproto.QueryResult{}
if err := sqlQuery.Execute(ctx, &query, &reply); err != nil {
if err := sqlQuery.Execute(ctx, nil, &query, &reply); err != nil {
t.Fatalf("failed to execute query: %s", query.Sql)
}
if err := sqlQuery.Commit(ctx, &session); err != nil {
if err := sqlQuery.Commit(ctx, nil, &session); err != nil {
t.Fatalf("call SqlQuery.Commit failed")
}
}
@ -356,7 +356,7 @@ func TestSqlQueryRollback(t *testing.T) {
TransactionId: 0,
}
txInfo := proto.TransactionInfo{TransactionId: 0}
if err = sqlQuery.Begin(ctx, &session, &txInfo); err != nil {
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err != nil {
t.Fatalf("call SqlQuery.Begin failed")
}
session.TransactionId = txInfo.TransactionId
@ -367,10 +367,10 @@ func TestSqlQueryRollback(t *testing.T) {
TransactionId: session.TransactionId,
}
reply := mproto.QueryResult{}
if err := sqlQuery.Execute(ctx, &query, &reply); err != nil {
if err := sqlQuery.Execute(ctx, nil, &query, &reply); err != nil {
t.Fatalf("failed to execute query: %s", query.Sql)
}
if err := sqlQuery.Rollback(ctx, &session); err != nil {
if err := sqlQuery.Rollback(ctx, nil, &session); err != nil {
t.Fatalf("call SqlQuery.Rollback failed")
}
}
@ -402,7 +402,7 @@ func TestSqlQueryStreamExecute(t *testing.T) {
TransactionId: 0,
}
txInfo := proto.TransactionInfo{TransactionId: 0}
if err = sqlQuery.Begin(ctx, &session, &txInfo); err != nil {
if err = sqlQuery.Begin(ctx, nil, &session, &txInfo); err != nil {
t.Fatalf("call SqlQuery.Begin failed")
}
session.TransactionId = txInfo.TransactionId
@ -413,14 +413,14 @@ func TestSqlQueryStreamExecute(t *testing.T) {
TransactionId: session.TransactionId,
}
sendReply := func(*mproto.QueryResult) error { return nil }
if err := sqlQuery.StreamExecute(ctx, &query, sendReply); err == nil {
if err := sqlQuery.StreamExecute(ctx, nil, &query, sendReply); err == nil {
t.Fatalf("SqlQuery.StreamExecute should fail: %s", query.Sql)
}
if err := sqlQuery.Rollback(ctx, &session); err != nil {
if err := sqlQuery.Rollback(ctx, nil, &session); err != nil {
t.Fatalf("call SqlQuery.Rollback failed")
}
query.TransactionId = 0
if err := sqlQuery.StreamExecute(ctx, &query, sendReply); err != nil {
if err := sqlQuery.StreamExecute(ctx, nil, &query, sendReply); err != nil {
t.Fatalf("SqlQuery.StreamExecute should success: %s, but get error: %v",
query.Sql, err)
}
@ -462,7 +462,7 @@ func TestSqlQueryExecuteBatch(t *testing.T) {
mproto.QueryResult{},
},
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err != nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err != nil {
t.Fatalf("SqlQuery.ExecuteBatch should success: %v, but get error: %v",
query, err)
}
@ -488,7 +488,7 @@ func TestSqlQueryExecuteBatchFailEmptyQueryList(t *testing.T) {
reply := proto.QueryResultList{
List: []mproto.QueryResult{},
}
err = sqlQuery.ExecuteBatch(ctx, &query, &reply)
err = sqlQuery.ExecuteBatch(ctx, nil, &query, &reply)
verifyTabletError(t, err, ErrFail)
}
@ -519,7 +519,7 @@ func TestSqlQueryExecuteBatchFailAsTransaction(t *testing.T) {
reply := proto.QueryResultList{
List: []mproto.QueryResult{},
}
err = sqlQuery.ExecuteBatch(ctx, &query, &reply)
err = sqlQuery.ExecuteBatch(ctx, nil, &query, &reply)
verifyTabletError(t, err, ErrFail)
}
@ -552,7 +552,7 @@ func TestSqlQueryExecuteBatchBeginFail(t *testing.T) {
mproto.QueryResult{},
},
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.ExecuteBatch should fail")
}
}
@ -591,7 +591,7 @@ func TestSqlQueryExecuteBatchCommitFail(t *testing.T) {
mproto.QueryResult{},
},
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.ExecuteBatch should fail")
}
}
@ -642,7 +642,7 @@ func TestSqlQueryExecuteBatchSqlExecFailInTransaction(t *testing.T) {
t.Fatalf("rollback should not be executed.")
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.ExecuteBatch should fail")
}
@ -689,7 +689,7 @@ func TestSqlQueryExecuteBatchSqlSucceedInTransaction(t *testing.T) {
*sqlResult,
},
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err != nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err != nil {
t.Fatalf("SqlQuery.ExecuteBatch should succeed")
}
}
@ -721,7 +721,7 @@ func TestSqlQueryExecuteBatchCallCommitWithoutABegin(t *testing.T) {
mproto.QueryResult{},
},
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.ExecuteBatch should fail")
}
}
@ -779,7 +779,7 @@ func TestExecuteBatchNestedTransaction(t *testing.T) {
mproto.QueryResult{},
},
}
if err := sqlQuery.ExecuteBatch(ctx, &query, &reply); err == nil {
if err := sqlQuery.ExecuteBatch(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.Execute should fail because of nested transaction")
}
sqlQuery.qe.txPool.SetTimeout(10)
@ -830,7 +830,7 @@ func TestSqlQuerySplitQuery(t *testing.T) {
},
},
}
if err := sqlQuery.SplitQuery(ctx, &query, &reply); err != nil {
if err := sqlQuery.SplitQuery(ctx, nil, &query, &reply); err != nil {
t.Fatalf("SqlQuery.SplitQuery should success: %v, but get error: %v",
query, err)
}
@ -869,7 +869,7 @@ func TestSqlQuerySplitQueryInvalidQuery(t *testing.T) {
},
},
}
if err := sqlQuery.SplitQuery(ctx, &query, &reply); err == nil {
if err := sqlQuery.SplitQuery(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.SplitQuery should fail")
}
}
@ -922,7 +922,7 @@ func TestSqlQuerySplitQueryInvalidMinMax(t *testing.T) {
},
},
}
if err := sqlQuery.SplitQuery(ctx, &query, &reply); err == nil {
if err := sqlQuery.SplitQuery(ctx, nil, &query, &reply); err == nil {
t.Fatalf("SqlQuery.SplitQuery should fail")
}
}

Просмотреть файл

@ -58,9 +58,13 @@ func (e OperationalError) Error() string { return string(e) }
// TabletDialer represents a function that will return a TabletConn
// object that can communicate with a tablet.
// If both keyspace and shard are empty, we will not ask for a sessionId
// (and assume we're using the target field for the queries).
type TabletDialer func(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, timeout time.Duration) (TabletConn, error)
//
// We support two modes of operation:
// 1 - using GetSessionId (right after dialing) to get a sessionId.
// 2 - using Target with each call (and never calling GetSessionId).
// If tabletType is set to UNKNOWN, we'll use mode 1.
// Mode 1 is being deprecated.
type TabletDialer func(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, tabletType pbt.TabletType, timeout time.Duration) (TabletConn, error)
// TabletConn defines the interface for a vttablet client. It should
// not be concurrently used across goroutines.
@ -94,6 +98,11 @@ type TabletConn interface {
// Close must be called for releasing resources.
Close()
// SetTarget can be called to change the target used for
// subsequent calls. Can only be called if tabletType was not
// set to UNKNOWN in TabletDialer.
SetTarget(keyspace, shard string, tabletType pbt.TabletType) error
// GetEndPoint returns the end point info.
EndPoint() *pbt.EndPoint

Просмотреть файл

@ -11,6 +11,7 @@ import (
"reflect"
"strings"
"testing"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
@ -44,6 +45,9 @@ const TestKeyspace = "test_keyspace"
// TestShard is the Shard we use for this test
const TestShard = "test_shard"
// TestTabletType is the TabletType we use for this test
const TestTabletType = pbt.TabletType_UNKNOWN
const testAsTransaction bool = true
const testSessionID int64 = 5678
@ -83,7 +87,7 @@ func (f *FakeQueryService) GetSessionId(sessionParams *proto.SessionParams, sess
}
// Begin is part of the queryservice.QueryService interface
func (f *FakeQueryService) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) error {
func (f *FakeQueryService) Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) error {
if f.hasError {
return testTabletError
}
@ -157,7 +161,7 @@ func testBegin2Panics(t *testing.T, conn tabletconn.TabletConn) {
}
// Commit is part of the queryservice.QueryService interface
func (f *FakeQueryService) Commit(ctx context.Context, session *proto.Session) error {
func (f *FakeQueryService) Commit(ctx context.Context, target *pb.Target, session *proto.Session) error {
if f.hasError {
return testTabletError
}
@ -224,7 +228,7 @@ func testCommit2Panics(t *testing.T, conn tabletconn.TabletConn) {
}
// Rollback is part of the queryservice.QueryService interface
func (f *FakeQueryService) Rollback(ctx context.Context, session *proto.Session) error {
func (f *FakeQueryService) Rollback(ctx context.Context, target *pb.Target, session *proto.Session) error {
if f.hasError {
return testTabletError
}
@ -291,7 +295,7 @@ func testRollback2Panics(t *testing.T, conn tabletconn.TabletConn) {
}
// Execute is part of the queryservice.QueryService interface
func (f *FakeQueryService) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) error {
func (f *FakeQueryService) Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) error {
if f.hasError {
return testTabletError
}
@ -405,7 +409,7 @@ var panicWait chan struct{}
var errorWait chan struct{}
// StreamExecute is part of the queryservice.QueryService interface
func (f *FakeQueryService) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(*mproto.QueryResult) error) error {
func (f *FakeQueryService) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(*mproto.QueryResult) error) error {
if f.panics && f.streamExecutePanicsEarly {
panic(fmt.Errorf("test-triggered panic early"))
}
@ -705,7 +709,7 @@ func testStreamExecute2Panics(t *testing.T, conn tabletconn.TabletConn, fake *Fa
}
// ExecuteBatch is part of the queryservice.QueryService interface
func (f *FakeQueryService) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error {
func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) error {
if f.hasError {
return testTabletError
}
@ -839,7 +843,7 @@ func testExecuteBatch2Panics(t *testing.T, conn tabletconn.TabletConn) {
}
// SplitQuery is part of the queryservice.QueryService interface
func (f *FakeQueryService) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
func (f *FakeQueryService) SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
if f.hasError {
return testTabletError
}
@ -1013,7 +1017,19 @@ func CreateFakeServer(t *testing.T) *FakeQueryService {
}
// TestSuite runs all the tests
func TestSuite(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
func TestSuite(t *testing.T, protocol string, endPoint *pbt.EndPoint, fake *FakeQueryService) {
// make sure we use the right client
*tabletconn.TabletProtocol = protocol
// create a connection
ctx := context.Background()
conn, err := tabletconn.GetDialer()(ctx, endPoint, TestKeyspace, TestShard, TestTabletType, 30*time.Second)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
// run the normal tests
testBegin(t, conn)
testBegin2(t, conn)
testCommit(t, conn)

Просмотреть файл

@ -18,6 +18,8 @@ import (
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// This file contains the query command group for vtctl.
@ -48,7 +50,7 @@ func init() {
addCommand(queriesGroupName, command{
"VtTabletExecute",
commandVtTabletExecute,
"[-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-transaction_id <transaction_id>] -keyspace <keyspace> -shard <shard> <tablet alias> <sql>",
"[-bind_variables <JSON map>] [-connect_timeout <connect timeout>] [-transaction_id <transaction_id>] [-tablet_type <tablet_type>] -keyspace <keyspace> -shard <shard> <tablet alias> <sql>",
"Executes the given query on the given tablet."})
addCommand(queriesGroupName, command{
"VtTabletStreamHealth",
@ -196,6 +198,7 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
bindVariables := newBindvars(subFlags)
keyspace := subFlags.String("keyspace", "", "keyspace the tablet belongs to")
shard := subFlags.String("shard", "", "shard the tablet belongs to")
tabletType := subFlags.String("tablet_type", "unknown", "tablet type we expect from the tablet (use unknown to use sessionId)")
connectTimeout := subFlags.Duration("connect_timeout", 30*time.Second, "Connection timeout for vttablet client")
if err := subFlags.Parse(args); err != nil {
return err
@ -203,6 +206,10 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
if subFlags.NArg() != 2 {
return fmt.Errorf("the <tablet_alis> and <sql> arguments are required for the VtTabletExecute command")
}
tt, err := parseTabletType3(*tabletType)
if err != nil {
return err
}
tabletAlias, err := topo.ParseTabletAliasString(subFlags.Arg(0))
if err != nil {
return err
@ -216,8 +223,7 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
// pass in empty keyspace and shard to not ask for sessionId
conn, err := tabletconn.GetDialer()(ctx, ep, *keyspace, *shard, *connectTimeout)
conn, err := tabletconn.GetDialer()(ctx, ep, *keyspace, *shard, tt, *connectTimeout)
if err != nil {
return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err)
}
@ -254,8 +260,8 @@ func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, sub
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
// pass in empty keyspace and shard to not ask for sessionId
conn, err := tabletconn.GetDialer()(ctx, ep, "", "", *connectTimeout)
// pass in a non-UNKNOWN tablet type to not use sessionId
conn, err := tabletconn.GetDialer()(ctx, ep, "", "", pb.TabletType_MASTER, *connectTimeout)
if err != nil {
return fmt.Errorf("cannot connect to tablet %v: %v", tabletAlias, err)
}

Просмотреть файл

@ -290,7 +290,7 @@ func (sct *sandboxTopo) GetEndPoints(ctx context.Context, cell, keyspace, shard
return ep, -1, nil
}
func sandboxDialer(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, timeout time.Duration) (tabletconn.TabletConn, error) {
func sandboxDialer(ctx context.Context, endPoint *pbt.EndPoint, keyspace, shard string, tabletType pbt.TabletType, timeout time.Duration) (tabletconn.TabletConn, error) {
sand := getSandbox(keyspace)
sand.sandmu.Lock()
defer sand.sandmu.Unlock()
@ -524,6 +524,10 @@ func (sbc *sandboxConn) Close() {
sbc.CloseCount.Add(1)
}
func (sbc *sandboxConn) SetTarget(keyspace, shard string, tabletType pbt.TabletType) error {
return fmt.Errorf("not implemented, vtgate doesn't use target yet")
}
func (sbc *sandboxConn) EndPoint() *pbt.EndPoint {
return sbc.endPoint
}

Просмотреть файл

@ -297,7 +297,7 @@ func (sdc *ShardConn) getNewConn(ctx context.Context) (conn tabletconn.TabletCon
allErrors := new(concurrency.AllErrorRecorder)
for _, endPoint := range endPoints {
perConnStartTime := time.Now()
conn, err = tabletconn.GetDialer()(ctx, endPoint, sdc.keyspace, sdc.shard, perConnTimeout)
conn, err = tabletconn.GetDialer()(ctx, endPoint, sdc.keyspace, sdc.shard, pb.TabletType_UNKNOWN, perConnTimeout)
if err == nil {
sdc.connectTimings.Record([]string{sdc.keyspace, sdc.shard, string(sdc.tabletType)}, perConnStartTime)
sdc.mu.Lock()

Просмотреть файл

@ -45,7 +45,8 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts topo.Server, tabletAl
return nil, err
}
conn, err := tabletconn.GetDialer()(ctx, endPoint, tablet.Keyspace, tablet.Shard, *remoteActionsTimeout)
// use sessionId for now
conn, err := tabletconn.GetDialer()(ctx, endPoint, tablet.Keyspace, tablet.Shard, pb.TabletType_UNKNOWN, *remoteActionsTimeout)
if err != nil {
return nil, err
}

Просмотреть файл

@ -27,7 +27,8 @@ import (
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
// testQueryService is a local QueryService implementation to support the tests
@ -36,7 +37,7 @@ type testQueryService struct {
t *testing.T
}
func (sq *testQueryService) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
func (sq *testQueryService) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
// Custom parsing of the query we expect
min := 100
max := 200
@ -273,7 +274,7 @@ func testSplitClone(t *testing.T, strategy string) {
if err := topo.CreateShard(ctx, ts, "ks", "80-"); err != nil {
t.Fatalf("CreateShard(\"-80\") failed: %v", err)
}
if err := wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", pb.KeyspaceIdType_UINT64, 4, false); err != nil {
if err := wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", pbt.KeyspaceIdType_UINT64, 4, false); err != nil {
t.Fatalf("SetKeyspaceShardingInfo failed: %v", err)
}
if err := wr.RebuildKeyspaceGraph(ctx, "ks", nil, true); err != nil {

Просмотреть файл

@ -24,7 +24,8 @@ import (
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
// destinationSqlQuery is a local QueryService implementation to
@ -35,7 +36,7 @@ type destinationSqlQuery struct {
excludedTable string
}
func (sq *destinationSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
func (sq *destinationSqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
if strings.Contains(query.Sql, sq.excludedTable) {
sq.t.Errorf("Split Diff operation on destination should skip the excluded table: %v query: %v", sq.excludedTable, query.Sql)
}
@ -91,7 +92,7 @@ type sourceSqlQuery struct {
excludedTable string
}
func (sq *sourceSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
func (sq *sourceSqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
if strings.Contains(query.Sql, sq.excludedTable) {
sq.t.Errorf("Split Diff operation on source should skip the excluded table: %v query: %v", sq.excludedTable, query.Sql)
}
@ -176,7 +177,7 @@ func TestSplitDiff(t *testing.T) {
t.Fatalf("CreateShard(\"-80\") failed: %v", err)
}
wr.SetSourceShards(ctx, "ks", "-40", []topo.TabletAlias{sourceRdonly1.Tablet.Alias}, nil)
if err := wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", pb.KeyspaceIdType_UINT64, 4, false); err != nil {
if err := wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", pbt.KeyspaceIdType_UINT64, 4, false); err != nil {
t.Fatalf("SetKeyspaceShardingInfo failed: %v", err)
}
if err := wr.RebuildKeyspaceGraph(ctx, "ks", nil, true); err != nil {

Просмотреть файл

@ -22,6 +22,8 @@ import (
"github.com/youtube/vitess/go/vt/wrangler/testlib"
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/query"
)
// sqlDifferSqlQuery is a local QueryService implementation to support the tests
@ -30,7 +32,7 @@ type sqlDifferSqlQuery struct {
t *testing.T
}
func (sq *sqlDifferSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
func (sq *sqlDifferSqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
sq.t.Logf("SqlDifferSqlQuery: got query: %v", *query)
// Send the headers

Просмотреть файл

@ -27,7 +27,8 @@ import (
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
// verticalSqlQuery is a local QueryService implementation to support the tests
@ -36,7 +37,7 @@ type verticalSqlQuery struct {
t *testing.T
}
func (sq *verticalSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
func (sq *verticalSqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
// Custom parsing of the query we expect
min := 100
max := 200
@ -239,18 +240,18 @@ func testVerticalSplitClone(t *testing.T, strategy string) {
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
// Create the destination keyspace with the appropriate ServedFromMap
ki := &pb.Keyspace{
ServedFroms: []*pb.Keyspace_ServedFrom{
&pb.Keyspace_ServedFrom{
TabletType: pb.TabletType_MASTER,
ki := &pbt.Keyspace{
ServedFroms: []*pbt.Keyspace_ServedFrom{
&pbt.Keyspace_ServedFrom{
TabletType: pbt.TabletType_MASTER,
Keyspace: "source_ks",
},
&pb.Keyspace_ServedFrom{
TabletType: pb.TabletType_REPLICA,
&pbt.Keyspace_ServedFrom{
TabletType: pbt.TabletType_REPLICA,
Keyspace: "source_ks",
},
&pb.Keyspace_ServedFrom{
TabletType: pb.TabletType_RDONLY,
&pbt.Keyspace_ServedFrom{
TabletType: pbt.TabletType_RDONLY,
Keyspace: "source_ks",
},
},

Просмотреть файл

@ -24,7 +24,8 @@ import (
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
// verticalDiffSqlQuery is a local QueryService implementation to
@ -35,7 +36,7 @@ type verticalDiffSqlQuery struct {
excludedTable string
}
func (sq *verticalDiffSqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
func (sq *verticalDiffSqlQuery) StreamExecute(ctx context.Context, target *pb.Target, query *proto.Query, sendReply func(reply *mproto.QueryResult) error) error {
if strings.Contains(query.Sql, sq.excludedTable) {
sq.t.Errorf("Vertical Split Diff operation should skip the excluded table: %v query: %v", sq.excludedTable, query.Sql)
}
@ -95,18 +96,18 @@ func TestVerticalSplitDiff(t *testing.T) {
topo.TYPE_RDONLY, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
// Create the destination keyspace with the appropriate ServedFromMap
ki := &pb.Keyspace{
ServedFroms: []*pb.Keyspace_ServedFrom{
&pb.Keyspace_ServedFrom{
TabletType: pb.TabletType_MASTER,
ki := &pbt.Keyspace{
ServedFroms: []*pbt.Keyspace_ServedFrom{
&pbt.Keyspace_ServedFrom{
TabletType: pbt.TabletType_MASTER,
Keyspace: "source_ks",
},
&pb.Keyspace_ServedFrom{
TabletType: pb.TabletType_REPLICA,
&pbt.Keyspace_ServedFrom{
TabletType: pbt.TabletType_REPLICA,
Keyspace: "source_ks",
},
&pb.Keyspace_ServedFrom{
TabletType: pb.TabletType_RDONLY,
&pbt.Keyspace_ServedFrom{
TabletType: pbt.TabletType_RDONLY,
Keyspace: "source_ks",
},
},