зеркало из https://github.com/github/vitess-gh.git
Merge branch 'master' into resharding
This commit is contained in:
Коммит
812ef583bd
|
@ -281,7 +281,7 @@ func (s *Scheduler) EnqueueClusterOperation(ctx context.Context, req *pb.Enqueue
|
|||
clusterOp := NewClusterOperationInstance(clusterOpID, initialTask, &taskIDGenerator)
|
||||
|
||||
s.muOpList.Lock()
|
||||
s.activeClusterOperations[clusterOpID] = clusterOp
|
||||
s.activeClusterOperations[clusterOpID] = clusterOp.Clone()
|
||||
s.muOpList.Unlock()
|
||||
s.toBeScheduledClusterOperations <- clusterOp
|
||||
|
||||
|
|
|
@ -95,7 +95,7 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
|
|||
request.EffectiveCallerId,
|
||||
request.ImmediateCallerId,
|
||||
)
|
||||
return q.server.StreamExecute(ctx, request.Target, &proto.Query{
|
||||
if err := q.server.StreamExecute(ctx, request.Target, &proto.Query{
|
||||
Sql: string(request.Query.Sql),
|
||||
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
|
||||
SessionId: request.SessionId,
|
||||
|
@ -103,7 +103,10 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
|
|||
return stream.Send(&pb.StreamExecuteResponse{
|
||||
Result: mproto.QueryResultToProto3(reply),
|
||||
})
|
||||
})
|
||||
}); err != nil {
|
||||
return grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Begin is part of the queryservice.QueryServer interface
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/callerid"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -28,9 +29,11 @@ import (
|
|||
// FakeQueryService has the server side of this fake
|
||||
type FakeQueryService struct {
|
||||
t *testing.T
|
||||
hasError bool
|
||||
panics bool
|
||||
streamExecutePanicsEarly bool
|
||||
panicWait chan struct{}
|
||||
errorWait chan struct{}
|
||||
|
||||
// if set, we will also check Target, ImmediateCallerId and EffectiveCallerId
|
||||
checkExtraFields bool
|
||||
|
@ -43,6 +46,28 @@ func (f *FakeQueryService) HandlePanic(err *error) {
|
|||
}
|
||||
}
|
||||
|
||||
var testTabletError = tabletserver.NewTabletError(tabletserver.ErrFail, "generic error")
|
||||
|
||||
const expectedErrMatch string = "error: generic error"
|
||||
|
||||
// Verifies the returned error has the properties that we expect.
|
||||
func verifyError(t *testing.T, err error, method string) {
|
||||
if err == nil {
|
||||
t.Errorf("%s was expecting an error, didn't get one", method)
|
||||
return
|
||||
}
|
||||
if se, ok := err.(*tabletconn.ServerError); ok {
|
||||
if se.Code != tabletconn.ERR_NORMAL {
|
||||
t.Errorf("Unexpected error code from %s: got %v, wanted %v", method, se.Code, tabletconn.ERR_NORMAL)
|
||||
}
|
||||
} else {
|
||||
t.Errorf("Unexpected error type from %s: got %v, wanted tabletconn.ServerError", method, reflect.TypeOf(err))
|
||||
}
|
||||
if !strings.Contains(err.Error(), expectedErrMatch) {
|
||||
t.Errorf("Unexpected error from %s: got %v, wanted err containing %v", method, err, expectedErrMatch)
|
||||
}
|
||||
}
|
||||
|
||||
// testTarget is the target we use for this test
|
||||
var testTarget = &pb.Target{
|
||||
Keyspace: "test_keyspace",
|
||||
|
@ -100,6 +125,9 @@ func (f *FakeQueryService) GetSessionId(sessionParams *proto.SessionParams, sess
|
|||
|
||||
// Begin is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) error {
|
||||
if f.hasError {
|
||||
return testTabletError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
|
@ -131,6 +159,13 @@ func testBegin(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testBeginError(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testBeginError")
|
||||
ctx := context.Background()
|
||||
_, err := conn.Begin(ctx)
|
||||
verifyError(t, err, "Begin")
|
||||
}
|
||||
|
||||
func testBeginPanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testBeginPanics")
|
||||
ctx := context.Background()
|
||||
|
@ -152,6 +187,13 @@ func testBegin2(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testBegin2Error(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testBegin2Error")
|
||||
ctx := context.Background()
|
||||
_, err := conn.Begin2(ctx)
|
||||
verifyError(t, err, "Begin2")
|
||||
}
|
||||
|
||||
func testBegin2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testBegin2Panics")
|
||||
ctx := context.Background()
|
||||
|
@ -162,6 +204,9 @@ func testBegin2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
|||
|
||||
// Commit is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) Commit(ctx context.Context, target *pb.Target, session *proto.Session) error {
|
||||
if f.hasError {
|
||||
return testTabletError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
|
@ -189,6 +234,13 @@ func testCommit(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testCommitError(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testCommitError")
|
||||
ctx := context.Background()
|
||||
err := conn.Commit(ctx, commitTransactionID)
|
||||
verifyError(t, err, "Commit")
|
||||
}
|
||||
|
||||
func testCommitPanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testCommitPanics")
|
||||
ctx := context.Background()
|
||||
|
@ -207,6 +259,13 @@ func testCommit2(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testCommit2Error(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testCommit2Error")
|
||||
ctx := context.Background()
|
||||
err := conn.Commit2(ctx, commitTransactionID)
|
||||
verifyError(t, err, "Commit2")
|
||||
}
|
||||
|
||||
func testCommit2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testCommit2Panics")
|
||||
ctx := context.Background()
|
||||
|
@ -217,6 +276,9 @@ func testCommit2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
|||
|
||||
// Rollback is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) Rollback(ctx context.Context, target *pb.Target, session *proto.Session) error {
|
||||
if f.hasError {
|
||||
return testTabletError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
|
@ -244,6 +306,13 @@ func testRollback(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testRollbackError(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testRollbackError")
|
||||
ctx := context.Background()
|
||||
err := conn.Rollback(ctx, commitTransactionID)
|
||||
verifyError(t, err, "Rollback")
|
||||
}
|
||||
|
||||
func testRollbackPanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testRollbackPanics")
|
||||
ctx := context.Background()
|
||||
|
@ -262,6 +331,13 @@ func testRollback2(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testRollback2Error(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testRollback2Error")
|
||||
ctx := context.Background()
|
||||
err := conn.Rollback2(ctx, commitTransactionID)
|
||||
verifyError(t, err, "Rollback2")
|
||||
}
|
||||
|
||||
func testRollback2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testRollback2Panics")
|
||||
ctx := context.Background()
|
||||
|
@ -272,6 +348,9 @@ func testRollback2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
|||
|
||||
// Execute is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) error {
|
||||
if f.hasError {
|
||||
return testTabletError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
|
@ -353,6 +432,20 @@ func testExecute2(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testExecuteError(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testExecuteError")
|
||||
ctx := context.Background()
|
||||
_, err := conn.Execute(ctx, executeQuery, executeBindVars, executeTransactionID)
|
||||
verifyError(t, err, "Execute")
|
||||
}
|
||||
|
||||
func testExecute2Error(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testExecute2Error")
|
||||
ctx := context.Background()
|
||||
_, err := conn.Execute2(ctx, executeQuery, executeBindVars, executeTransactionID)
|
||||
verifyError(t, err, "Execute")
|
||||
}
|
||||
|
||||
func testExecutePanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testExecutePanics")
|
||||
ctx := context.Background()
|
||||
|
@ -396,6 +489,13 @@ func (f *FakeQueryService) StreamExecute(ctx context.Context, target *pb.Target,
|
|||
f.panicWait = make(chan struct{}) // for next test
|
||||
panic(fmt.Errorf("test-triggered panic late"))
|
||||
}
|
||||
if f.hasError {
|
||||
// wait until the client has the response, since all streaming implementation may not
|
||||
// send previous messages if an error has been triggered.
|
||||
<-f.errorWait
|
||||
f.errorWait = make(chan struct{}) // for next test
|
||||
return testTabletError
|
||||
}
|
||||
if err := sendReply(&streamExecuteQueryResult2); err != nil {
|
||||
f.t.Errorf("sendReply2 failed: %v", err)
|
||||
}
|
||||
|
@ -472,6 +572,34 @@ func testStreamExecute(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testStreamExecuteError(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
|
||||
t.Log("testStreamExecuteError")
|
||||
ctx := context.Background()
|
||||
stream, errFunc, err := conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
|
||||
if err != nil {
|
||||
t.Fatalf("StreamExecute failed: %v", err)
|
||||
}
|
||||
qr, ok := <-stream
|
||||
if !ok {
|
||||
t.Fatalf("StreamExecute failed: cannot read result1")
|
||||
}
|
||||
if len(qr.Rows) == 0 {
|
||||
qr.Rows = nil
|
||||
}
|
||||
if !reflect.DeepEqual(*qr, streamExecuteQueryResult1) {
|
||||
t.Errorf("Unexpected result1 from StreamExecute: got %v wanted %v", qr, streamExecuteQueryResult1)
|
||||
}
|
||||
// signal to the server that the first result has been received
|
||||
close(fake.errorWait)
|
||||
// After 1 result, we expect to get an error (no more results).
|
||||
qr, ok = <-stream
|
||||
if ok {
|
||||
t.Fatalf("StreamExecute channel wasn't closed")
|
||||
}
|
||||
err = errFunc()
|
||||
verifyError(t, err, "StreamExecute")
|
||||
}
|
||||
|
||||
func testStreamExecutePanics(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
|
||||
t.Log("testStreamExecutePanics")
|
||||
// early panic is before sending the Fields, that is returned
|
||||
|
@ -557,6 +685,34 @@ func testStreamExecute2(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testStreamExecute2Error(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
|
||||
t.Log("testStreamExecute2Error")
|
||||
ctx := context.Background()
|
||||
stream, errFunc, err := conn.StreamExecute2(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
|
||||
if err != nil {
|
||||
t.Fatalf("StreamExecute2 failed: %v", err)
|
||||
}
|
||||
qr, ok := <-stream
|
||||
if !ok {
|
||||
t.Fatalf("StreamExecute2 failed: cannot read result1")
|
||||
}
|
||||
if len(qr.Rows) == 0 {
|
||||
qr.Rows = nil
|
||||
}
|
||||
if !reflect.DeepEqual(*qr, streamExecuteQueryResult1) {
|
||||
t.Errorf("Unexpected result1 from StreamExecute2: got %v wanted %v", qr, streamExecuteQueryResult1)
|
||||
}
|
||||
// signal to the server that the first result has been received
|
||||
close(fake.errorWait)
|
||||
// After 1 result, we expect to get an error (no more results).
|
||||
qr, ok = <-stream
|
||||
if ok {
|
||||
t.Fatalf("StreamExecute2 channel wasn't closed")
|
||||
}
|
||||
err = errFunc()
|
||||
verifyError(t, err, "StreamExecute2")
|
||||
}
|
||||
|
||||
func testStreamExecute2Panics(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
|
||||
t.Log("testStreamExecute2Panics")
|
||||
// early panic is before sending the Fields, that is returned
|
||||
|
@ -608,6 +764,9 @@ func testStreamExecute2Panics(t *testing.T, conn tabletconn.TabletConn, fake *Fa
|
|||
|
||||
// ExecuteBatch is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) error {
|
||||
if f.hasError {
|
||||
return testTabletError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
|
@ -699,6 +858,13 @@ func testExecuteBatch(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testExecuteBatchError(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testBatchExecuteError")
|
||||
ctx := context.Background()
|
||||
_, err := conn.ExecuteBatch(ctx, executeBatchQueries, true, executeBatchTransactionID)
|
||||
verifyError(t, err, "ExecuteBatch")
|
||||
}
|
||||
|
||||
func testExecuteBatchPanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testExecuteBatchPanics")
|
||||
ctx := context.Background()
|
||||
|
@ -720,6 +886,13 @@ func testExecuteBatch2(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testExecuteBatch2Error(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testBatchExecute2Error")
|
||||
ctx := context.Background()
|
||||
_, err := conn.ExecuteBatch2(ctx, executeBatchQueries, true, executeBatchTransactionID)
|
||||
verifyError(t, err, "ExecuteBatch")
|
||||
}
|
||||
|
||||
func testExecuteBatch2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testExecuteBatch2Panics")
|
||||
ctx := context.Background()
|
||||
|
@ -730,6 +903,9 @@ func testExecuteBatch2Panics(t *testing.T, conn tabletconn.TabletConn) {
|
|||
|
||||
// SplitQuery is part of the queryservice.QueryService interface
|
||||
func (f *FakeQueryService) SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
|
||||
if f.hasError {
|
||||
return testTabletError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test-triggered panic"))
|
||||
}
|
||||
|
@ -785,6 +961,13 @@ func testSplitQuery(t *testing.T, conn tabletconn.TabletConn) {
|
|||
}
|
||||
}
|
||||
|
||||
func testSplitQueryError(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testSplitQueryError")
|
||||
ctx := context.Background()
|
||||
_, err := conn.SplitQuery(ctx, splitQueryBoundQuery, splitQuerySplitColumn, splitQuerySplitCount)
|
||||
verifyError(t, err, "SplitQuery")
|
||||
}
|
||||
|
||||
func testSplitQueryPanics(t *testing.T, conn tabletconn.TabletConn) {
|
||||
t.Log("testSplitQueryPanics")
|
||||
ctx := context.Background()
|
||||
|
@ -890,6 +1073,7 @@ func CreateFakeServer(t *testing.T) *FakeQueryService {
|
|||
panics: false,
|
||||
streamExecutePanicsEarly: false,
|
||||
panicWait: make(chan struct{}),
|
||||
errorWait: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -915,6 +1099,24 @@ func TestSuite(t *testing.T, protocol string, endPoint *pbt.EndPoint, fake *Fake
|
|||
testSplitQuery(t, conn)
|
||||
testStreamHealth(t, conn)
|
||||
|
||||
// fake should return an error, make sure errors are handled properly
|
||||
fake.hasError = true
|
||||
testBeginError(t, conn)
|
||||
testCommitError(t, conn)
|
||||
testRollbackError(t, conn)
|
||||
testExecuteError(t, conn)
|
||||
testStreamExecuteError(t, conn, fake)
|
||||
testExecuteBatchError(t, conn)
|
||||
testSplitQueryError(t, conn)
|
||||
|
||||
testBegin2Error(t, conn)
|
||||
testCommit2Error(t, conn)
|
||||
testRollback2Error(t, conn)
|
||||
testExecute2Error(t, conn)
|
||||
testStreamExecute2Error(t, conn, fake)
|
||||
testExecuteBatch2Error(t, conn)
|
||||
fake.hasError = false
|
||||
|
||||
// create a new connection that expects the extra fields
|
||||
conn.Close()
|
||||
conn, err = tabletconn.GetDialer()(ctx, endPoint, testTarget.Keyspace, testTarget.Shard, pbt.TabletType_REPLICA, 30*time.Second)
|
||||
|
|
|
@ -119,7 +119,7 @@ func (sdc *ShardConn) Dial(ctx context.Context) error {
|
|||
func (sdc *ShardConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (qr *mproto.QueryResult, err error) {
|
||||
err = sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
var innerErr error
|
||||
qr, innerErr = conn.Execute(ctx, query, bindVars, transactionID)
|
||||
qr, innerErr = conn.Execute2(ctx, query, bindVars, transactionID)
|
||||
return innerErr
|
||||
}, transactionID, false)
|
||||
return qr, err
|
||||
|
@ -129,7 +129,7 @@ func (sdc *ShardConn) Execute(ctx context.Context, query string, bindVars map[st
|
|||
func (sdc *ShardConn) ExecuteBatch(ctx context.Context, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (qrs *tproto.QueryResultList, err error) {
|
||||
err = sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
var innerErr error
|
||||
qrs, innerErr = conn.ExecuteBatch(ctx, queries, asTransaction, transactionID)
|
||||
qrs, innerErr = conn.ExecuteBatch2(ctx, queries, asTransaction, transactionID)
|
||||
return innerErr
|
||||
}, transactionID, false)
|
||||
return qrs, err
|
||||
|
@ -142,7 +142,7 @@ func (sdc *ShardConn) StreamExecute(ctx context.Context, query string, bindVars
|
|||
var results <-chan *mproto.QueryResult
|
||||
err := sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
var err error
|
||||
results, erFunc, err = conn.StreamExecute(ctx, query, bindVars, transactionID)
|
||||
results, erFunc, err = conn.StreamExecute2(ctx, query, bindVars, transactionID)
|
||||
usedConn = conn
|
||||
return err
|
||||
}, transactionID, true)
|
||||
|
@ -157,7 +157,7 @@ func (sdc *ShardConn) StreamExecute(ctx context.Context, query string, bindVars
|
|||
func (sdc *ShardConn) Begin(ctx context.Context) (transactionID int64, err error) {
|
||||
err = sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
var innerErr error
|
||||
transactionID, innerErr = conn.Begin(ctx)
|
||||
transactionID, innerErr = conn.Begin2(ctx)
|
||||
return innerErr
|
||||
}, 0, false)
|
||||
return transactionID, err
|
||||
|
@ -166,14 +166,14 @@ func (sdc *ShardConn) Begin(ctx context.Context) (transactionID int64, err error
|
|||
// Commit commits the current transaction. The retry rules are the same as Execute.
|
||||
func (sdc *ShardConn) Commit(ctx context.Context, transactionID int64) (err error) {
|
||||
return sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
return conn.Commit(ctx, transactionID)
|
||||
return conn.Commit2(ctx, transactionID)
|
||||
}, transactionID, false)
|
||||
}
|
||||
|
||||
// Rollback rolls back the current transaction. The retry rules are the same as Execute.
|
||||
func (sdc *ShardConn) Rollback(ctx context.Context, transactionID int64) (err error) {
|
||||
return sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
|
||||
return conn.Rollback(ctx, transactionID)
|
||||
return conn.Rollback2(ctx, transactionID)
|
||||
}, transactionID, false)
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче