Add back unit tests for error handling in tabletconn, fix gRPC StreamExecute to return errors correctly

This commit is contained in:
Ammar Aijazi 2015-08-07 14:13:54 -07:00
Родитель 8fae3c7c30
Коммит 21945ddfb6
2 изменённых файлов: 207 добавлений и 2 удалений

Просмотреть файл

@ -95,7 +95,7 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
request.EffectiveCallerId,
request.ImmediateCallerId,
)
return q.server.StreamExecute(ctx, request.Target, &proto.Query{
if err := q.server.StreamExecute(ctx, request.Target, &proto.Query{
Sql: string(request.Query.Sql),
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
SessionId: request.SessionId,
@ -103,7 +103,10 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
return stream.Send(&pb.StreamExecuteResponse{
Result: mproto.QueryResultToProto3(reply),
})
})
}); err != nil {
return grpc.Errorf(codes.Internal, "%v", err)
}
return nil
}
// Begin is part of the queryservice.QueryServer interface

Просмотреть файл

@ -16,6 +16,7 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callerid"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"golang.org/x/net/context"
@ -28,9 +29,11 @@ import (
// FakeQueryService has the server side of this fake
type FakeQueryService struct {
t *testing.T
hasError bool
panics bool
streamExecutePanicsEarly bool
panicWait chan struct{}
errorWait chan struct{}
// if set, we will also check Target, ImmediateCallerId and EffectiveCallerId
checkExtraFields bool
@ -43,6 +46,28 @@ func (f *FakeQueryService) HandlePanic(err *error) {
}
}
var testTabletError = tabletserver.NewTabletError(tabletserver.ErrFail, "generic error")
const expectedErrMatch string = "error: generic error"
// Verifies the returned error has the properties that we expect.
func verifyError(t *testing.T, err error, method string) {
if err == nil {
t.Errorf("%s was expecting an error, didn't get one", method)
return
}
if se, ok := err.(*tabletconn.ServerError); ok {
if se.Code != tabletconn.ERR_NORMAL {
t.Errorf("Unexpected error code from %s: got %v, wanted %v", method, se.Code, tabletconn.ERR_NORMAL)
}
} else {
t.Errorf("Unexpected error type from %s: got %v, wanted tabletconn.ServerError", method, reflect.TypeOf(err))
}
if !strings.Contains(err.Error(), expectedErrMatch) {
t.Errorf("Unexpected error from %s: got %v, wanted err containing %v", method, err, expectedErrMatch)
}
}
// testTarget is the target we use for this test
var testTarget = &pb.Target{
Keyspace: "test_keyspace",
@ -100,6 +125,9 @@ func (f *FakeQueryService) GetSessionId(sessionParams *proto.SessionParams, sess
// Begin is part of the queryservice.QueryService interface
func (f *FakeQueryService) Begin(ctx context.Context, target *pb.Target, session *proto.Session, txInfo *proto.TransactionInfo) error {
if f.hasError {
return testTabletError
}
if f.panics {
panic(fmt.Errorf("test-triggered panic"))
}
@ -131,6 +159,13 @@ func testBegin(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testBeginError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testBeginError")
ctx := context.Background()
_, err := conn.Begin(ctx)
verifyError(t, err, "Begin")
}
func testBeginPanics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testBeginPanics")
ctx := context.Background()
@ -152,6 +187,13 @@ func testBegin2(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testBegin2Error(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testBegin2Error")
ctx := context.Background()
_, err := conn.Begin2(ctx)
verifyError(t, err, "Begin2")
}
func testBegin2Panics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testBegin2Panics")
ctx := context.Background()
@ -162,6 +204,9 @@ func testBegin2Panics(t *testing.T, conn tabletconn.TabletConn) {
// Commit is part of the queryservice.QueryService interface
func (f *FakeQueryService) Commit(ctx context.Context, target *pb.Target, session *proto.Session) error {
if f.hasError {
return testTabletError
}
if f.panics {
panic(fmt.Errorf("test-triggered panic"))
}
@ -189,6 +234,13 @@ func testCommit(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testCommitError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testCommitError")
ctx := context.Background()
err := conn.Commit(ctx, commitTransactionID)
verifyError(t, err, "Commit")
}
func testCommitPanics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testCommitPanics")
ctx := context.Background()
@ -207,6 +259,13 @@ func testCommit2(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testCommit2Error(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testCommit2Error")
ctx := context.Background()
err := conn.Commit2(ctx, commitTransactionID)
verifyError(t, err, "Commit2")
}
func testCommit2Panics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testCommit2Panics")
ctx := context.Background()
@ -217,6 +276,9 @@ func testCommit2Panics(t *testing.T, conn tabletconn.TabletConn) {
// Rollback is part of the queryservice.QueryService interface
func (f *FakeQueryService) Rollback(ctx context.Context, target *pb.Target, session *proto.Session) error {
if f.hasError {
return testTabletError
}
if f.panics {
panic(fmt.Errorf("test-triggered panic"))
}
@ -244,6 +306,13 @@ func testRollback(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testRollbackError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testRollbackError")
ctx := context.Background()
err := conn.Rollback(ctx, commitTransactionID)
verifyError(t, err, "Rollback")
}
func testRollbackPanics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testRollbackPanics")
ctx := context.Background()
@ -262,6 +331,13 @@ func testRollback2(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testRollback2Error(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testRollback2Error")
ctx := context.Background()
err := conn.Rollback2(ctx, commitTransactionID)
verifyError(t, err, "Rollback2")
}
func testRollback2Panics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testRollback2Panics")
ctx := context.Background()
@ -272,6 +348,9 @@ func testRollback2Panics(t *testing.T, conn tabletconn.TabletConn) {
// Execute is part of the queryservice.QueryService interface
func (f *FakeQueryService) Execute(ctx context.Context, target *pb.Target, query *proto.Query, reply *mproto.QueryResult) error {
if f.hasError {
return testTabletError
}
if f.panics {
panic(fmt.Errorf("test-triggered panic"))
}
@ -353,6 +432,20 @@ func testExecute2(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testExecuteError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testExecuteError")
ctx := context.Background()
_, err := conn.Execute(ctx, executeQuery, executeBindVars, executeTransactionID)
verifyError(t, err, "Execute")
}
func testExecute2Error(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testExecute2Error")
ctx := context.Background()
_, err := conn.Execute2(ctx, executeQuery, executeBindVars, executeTransactionID)
verifyError(t, err, "Execute")
}
func testExecutePanics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testExecutePanics")
ctx := context.Background()
@ -396,6 +489,13 @@ func (f *FakeQueryService) StreamExecute(ctx context.Context, target *pb.Target,
f.panicWait = make(chan struct{}) // for next test
panic(fmt.Errorf("test-triggered panic late"))
}
if f.hasError {
// wait until the client has the response, since all streaming implementation may not
// send previous messages if an error has been triggered.
<-f.errorWait
f.errorWait = make(chan struct{}) // for next test
return testTabletError
}
if err := sendReply(&streamExecuteQueryResult2); err != nil {
f.t.Errorf("sendReply2 failed: %v", err)
}
@ -472,6 +572,34 @@ func testStreamExecute(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testStreamExecuteError(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
t.Log("testStreamExecuteError")
ctx := context.Background()
stream, errFunc, err := conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
if err != nil {
t.Fatalf("StreamExecute failed: %v", err)
}
qr, ok := <-stream
if !ok {
t.Fatalf("StreamExecute failed: cannot read result1")
}
if len(qr.Rows) == 0 {
qr.Rows = nil
}
if !reflect.DeepEqual(*qr, streamExecuteQueryResult1) {
t.Errorf("Unexpected result1 from StreamExecute: got %v wanted %v", qr, streamExecuteQueryResult1)
}
// signal to the server that the first result has been received
close(fake.errorWait)
// After 1 result, we expect to get an error (no more results).
qr, ok = <-stream
if ok {
t.Fatalf("StreamExecute channel wasn't closed")
}
err = errFunc()
verifyError(t, err, "StreamExecute")
}
func testStreamExecutePanics(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
t.Log("testStreamExecutePanics")
// early panic is before sending the Fields, that is returned
@ -557,6 +685,34 @@ func testStreamExecute2(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testStreamExecute2Error(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
t.Log("testStreamExecute2Error")
ctx := context.Background()
stream, errFunc, err := conn.StreamExecute2(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
if err != nil {
t.Fatalf("StreamExecute2 failed: %v", err)
}
qr, ok := <-stream
if !ok {
t.Fatalf("StreamExecute2 failed: cannot read result1")
}
if len(qr.Rows) == 0 {
qr.Rows = nil
}
if !reflect.DeepEqual(*qr, streamExecuteQueryResult1) {
t.Errorf("Unexpected result1 from StreamExecute2: got %v wanted %v", qr, streamExecuteQueryResult1)
}
// signal to the server that the first result has been received
close(fake.errorWait)
// After 1 result, we expect to get an error (no more results).
qr, ok = <-stream
if ok {
t.Fatalf("StreamExecute2 channel wasn't closed")
}
err = errFunc()
verifyError(t, err, "StreamExecute2")
}
func testStreamExecute2Panics(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
t.Log("testStreamExecute2Panics")
// early panic is before sending the Fields, that is returned
@ -608,6 +764,9 @@ func testStreamExecute2Panics(t *testing.T, conn tabletconn.TabletConn, fake *Fa
// ExecuteBatch is part of the queryservice.QueryService interface
func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *pb.Target, queryList *proto.QueryList, reply *proto.QueryResultList) error {
if f.hasError {
return testTabletError
}
if f.panics {
panic(fmt.Errorf("test-triggered panic"))
}
@ -699,6 +858,13 @@ func testExecuteBatch(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testExecuteBatchError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testBatchExecuteError")
ctx := context.Background()
_, err := conn.ExecuteBatch(ctx, executeBatchQueries, true, executeBatchTransactionID)
verifyError(t, err, "ExecuteBatch")
}
func testExecuteBatchPanics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testExecuteBatchPanics")
ctx := context.Background()
@ -720,6 +886,13 @@ func testExecuteBatch2(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testExecuteBatch2Error(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testBatchExecute2Error")
ctx := context.Background()
_, err := conn.ExecuteBatch2(ctx, executeBatchQueries, true, executeBatchTransactionID)
verifyError(t, err, "ExecuteBatch")
}
func testExecuteBatch2Panics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testExecuteBatch2Panics")
ctx := context.Background()
@ -730,6 +903,9 @@ func testExecuteBatch2Panics(t *testing.T, conn tabletconn.TabletConn) {
// SplitQuery is part of the queryservice.QueryService interface
func (f *FakeQueryService) SplitQuery(ctx context.Context, target *pb.Target, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
if f.hasError {
return testTabletError
}
if f.panics {
panic(fmt.Errorf("test-triggered panic"))
}
@ -785,6 +961,13 @@ func testSplitQuery(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testSplitQueryError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testSplitQueryError")
ctx := context.Background()
_, err := conn.SplitQuery(ctx, splitQueryBoundQuery, splitQuerySplitColumn, splitQuerySplitCount)
verifyError(t, err, "SplitQuery")
}
func testSplitQueryPanics(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testSplitQueryPanics")
ctx := context.Background()
@ -890,6 +1073,7 @@ func CreateFakeServer(t *testing.T) *FakeQueryService {
panics: false,
streamExecutePanicsEarly: false,
panicWait: make(chan struct{}),
errorWait: make(chan struct{}),
}
}
@ -915,6 +1099,24 @@ func TestSuite(t *testing.T, protocol string, endPoint *pbt.EndPoint, fake *Fake
testSplitQuery(t, conn)
testStreamHealth(t, conn)
// fake should return an error, make sure errors are handled properly
fake.hasError = true
testBeginError(t, conn)
testCommitError(t, conn)
testRollbackError(t, conn)
testExecuteError(t, conn)
testStreamExecuteError(t, conn, fake)
testExecuteBatchError(t, conn)
testSplitQueryError(t, conn)
testBegin2Error(t, conn)
testCommit2Error(t, conn)
testRollback2Error(t, conn)
testExecute2Error(t, conn)
testStreamExecute2Error(t, conn, fake)
testExecuteBatch2Error(t, conn)
fake.hasError = false
// create a new connection that expects the extra fields
conn.Close()
conn, err = tabletconn.GetDialer()(ctx, endPoint, testTarget.Keyspace, testTarget.Shard, pbt.TabletType_REPLICA, 30*time.Second)