Refactoring error handling in gRPC. Now unit tests pass.

This commit is contained in:
Alain Jobart 2015-06-17 08:24:46 -07:00
Родитель 4c790a265c
Коммит c7a126eb2b
5 изменённых файлов: 141 добавлений и 92 удалений

Просмотреть файл

@ -31,23 +31,25 @@ func New(server queryservice.QueryService) *Query {
// GetSessionId is part of the queryservice.QueryServer interface
func (q *Query) GetSessionId(ctx context.Context, request *pb.GetSessionIdRequest) (response *pb.GetSessionIdResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callinfo.GRPCCallInfo(ctx)
sessionInfo := new(proto.SessionInfo)
if err := q.server.GetSessionId(&proto.SessionParams{
gsiErr := q.server.GetSessionId(&proto.SessionParams{
Keyspace: request.Keyspace,
Shard: request.Shard,
}, sessionInfo); err != nil {
return nil, err
}
return &pb.GetSessionIdResponse{
}, sessionInfo)
response = &pb.GetSessionIdResponse{
SessionId: sessionInfo.SessionId,
}, nil
}
tabletserver.AddTabletErrorToResult(gsiErr, &response.Error)
return response, nil
}
// Execute is part of the queryservice.QueryServer interface
func (q *Query) Execute(ctx context.Context, request *pb.ExecuteRequest) (response *pb.ExecuteResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callinfo.GRPCCallInfo(ctx)
reply := new(mproto.QueryResult)
execErr := q.server.Execute(ctx, &proto.Query{
Sql: string(request.Query.Sql),
@ -56,7 +58,9 @@ func (q *Query) Execute(ctx context.Context, request *pb.ExecuteRequest) (respon
TransactionId: request.TransactionId,
}, reply)
if execErr != nil {
return nil, execErr
response := new(pb.ExecuteResponse)
tabletserver.AddTabletErrorToResult(execErr, &response.Error)
return response, nil
}
return &pb.ExecuteResponse{
Result: proto.QueryResultToProto3(reply),
@ -75,7 +79,9 @@ func (q *Query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques
TransactionId: request.TransactionId,
}, reply)
if execErr != nil {
return nil, execErr
response := new(pb.ExecuteBatchResponse)
tabletserver.AddTabletErrorToResult(execErr, &response.Error)
return response, nil
}
return &pb.ExecuteBatchResponse{
Results: proto.QueryResultListToProto3(reply.List),
@ -86,7 +92,8 @@ func (q *Query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques
func (q *Query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query_StreamExecuteServer) (err error) {
defer q.server.HandlePanic(&err)
ctx := callinfo.GRPCCallInfo(stream.Context())
return q.server.StreamExecute(ctx, &proto.Query{
seErr := q.server.StreamExecute(ctx, &proto.Query{
Sql: string(request.Query.Sql),
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
SessionId: request.SessionId,
@ -95,18 +102,30 @@ func (q *Query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
Result: proto.QueryResultToProto3(reply),
})
})
if seErr != nil {
response := new(pb.StreamExecuteResponse)
tabletserver.AddTabletErrorToResult(seErr, &response.Error)
if err := stream.Send(response); err != nil {
return err
}
}
return nil
}
// Begin is part of the queryservice.QueryServer interface
func (q *Query) Begin(ctx context.Context, request *pb.BeginRequest) (response *pb.BeginResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callinfo.GRPCCallInfo(ctx)
txInfo := new(proto.TransactionInfo)
if err := q.server.Begin(ctx, &proto.Session{
if beginErr := q.server.Begin(ctx, &proto.Session{
SessionId: request.SessionId,
}, txInfo); err != nil {
return nil, err
}, txInfo); beginErr != nil {
response := new(pb.BeginResponse)
tabletserver.AddTabletErrorToResult(beginErr, &response.Error)
return response, nil
}
return &pb.BeginResponse{
TransactionId: txInfo.TransactionId,
}, nil
@ -116,39 +135,46 @@ func (q *Query) Begin(ctx context.Context, request *pb.BeginRequest) (response *
func (q *Query) Commit(ctx context.Context, request *pb.CommitRequest) (response *pb.CommitResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callinfo.GRPCCallInfo(ctx)
if err := q.server.Commit(ctx, &proto.Session{
commitErr := q.server.Commit(ctx, &proto.Session{
SessionId: request.SessionId,
TransactionId: request.TransactionId,
}); err != nil {
return nil, err
}
return &pb.CommitResponse{}, nil
})
response = new(pb.CommitResponse)
tabletserver.AddTabletErrorToResult(commitErr, &response.Error)
return response, nil
}
// Rollback is part of the queryservice.QueryServer interface
func (q *Query) Rollback(ctx context.Context, request *pb.RollbackRequest) (response *pb.RollbackResponse, err error) {
defer q.server.HandlePanic(&err)
if err := q.server.Rollback(ctx, &proto.Session{
ctx = callinfo.GRPCCallInfo(ctx)
rollbackErr := q.server.Rollback(ctx, &proto.Session{
SessionId: request.SessionId,
TransactionId: request.TransactionId,
}); err != nil {
return nil, err
}
return &pb.RollbackResponse{}, nil
})
response = new(pb.RollbackResponse)
tabletserver.AddTabletErrorToResult(rollbackErr, &response.Error)
return response, nil
}
// SplitQuery is part of the queryservice.QueryServer interface
func (q *Query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (response *pb.SplitQueryResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callinfo.GRPCCallInfo(ctx)
reply := &proto.SplitQueryResult{}
if err := q.server.SplitQuery(ctx, &proto.SplitQueryRequest{
if sqErr := q.server.SplitQuery(ctx, &proto.SplitQueryRequest{
Query: *proto.Proto3ToBoundQuery(request.Query),
SplitColumn: request.SplitColumn,
SplitCount: int(request.SplitCount),
SessionID: request.SessionId,
}, reply); err != nil {
return nil, err
}, reply); sqErr != nil {
response = new(pb.SplitQueryResponse)
tabletserver.AddTabletErrorToResult(sqErr, &response.Error)
return response, nil
}
return &pb.SplitQueryResponse{
Queries: proto.QuerySplitsToProto3(reply.Queries),

Просмотреть файл

@ -7,13 +7,11 @@ package gorpctabletconn
import (
"fmt"
"io"
"strings"
"sync"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/netutil"
"github.com/youtube/vitess/go/rpcplus"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
@ -23,6 +21,7 @@ import (
pb "github.com/youtube/vitess/go/vt/proto/query"
pbs "github.com/youtube/vitess/go/vt/proto/queryservice"
pbv "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
func init() {
@ -55,6 +54,9 @@ func DialTablet(ctx context.Context, endPoint topo.EndPoint, keyspace, shard str
if err != nil {
return nil, err
}
if gsir.Error != nil {
return nil, tabletErrorFromRPCError(gsir.Error)
}
return &gRPCQueryClient{
endPoint: endPoint,
@ -79,10 +81,10 @@ func (conn *gRPCQueryClient) Execute(ctx context.Context, query string, bindVars
}
er, err := conn.c.Execute(ctx, req)
if err != nil {
return nil, tabletError(err)
return nil, tabletErrorFromGRPC(err)
}
if er.Error != nil {
return nil, vterrors.FromVtRPCError(er.Error)
return nil, tabletErrorFromRPCError(er.Error)
}
return tproto.Proto3ToQueryResult(er.Result), nil
}
@ -105,10 +107,10 @@ func (conn *gRPCQueryClient) ExecuteBatch(ctx context.Context, queries []tproto.
}
ebr, err := conn.c.ExecuteBatch(ctx, req)
if err != nil {
return nil, tabletError(err)
return nil, tabletErrorFromGRPC(err)
}
if ebr.Error != nil {
return nil, vterrors.FromVtRPCError(ebr.Error)
return nil, tabletErrorFromRPCError(ebr.Error)
}
return tproto.Proto3ToQueryResultList(ebr.Results), nil
}
@ -127,7 +129,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, query string, bi
}
stream, err := conn.c.StreamExecute(ctx, req)
if err != nil {
return nil, nil, err
return nil, nil, tabletErrorFromGRPC(err)
}
sr := make(chan *mproto.QueryResult, 10)
var finalError error
@ -136,13 +138,13 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, query string, bi
ser, err := stream.Recv()
if err != nil {
if err != io.EOF {
finalError = err
finalError = tabletErrorFromGRPC(err)
}
close(sr)
return
}
if ser.Error != nil {
finalError = vterrors.FromVtRPCError(ser.Error)
finalError = tabletErrorFromRPCError(ser.Error)
close(sr)
return
}
@ -167,10 +169,10 @@ func (conn *gRPCQueryClient) Begin(ctx context.Context) (transactionID int64, er
}
br, err := conn.c.Begin(ctx, req)
if err != nil {
return 0, tabletError(err)
return 0, tabletErrorFromGRPC(err)
}
if br.Error != nil {
return 0, vterrors.FromVtRPCError(br.Error)
return 0, tabletErrorFromRPCError(br.Error)
}
return br.TransactionId, nil
}
@ -189,10 +191,10 @@ func (conn *gRPCQueryClient) Commit(ctx context.Context, transactionID int64) er
}
cr, err := conn.c.Commit(ctx, req)
if err != nil {
return tabletError(err)
return tabletErrorFromGRPC(err)
}
if cr.Error != nil {
return vterrors.FromVtRPCError(cr.Error)
return tabletErrorFromRPCError(cr.Error)
}
return nil
}
@ -211,10 +213,10 @@ func (conn *gRPCQueryClient) Rollback(ctx context.Context, transactionID int64)
}
rr, err := conn.c.Rollback(ctx, req)
if err != nil {
return tabletError(err)
return tabletErrorFromGRPC(err)
}
if rr.Error != nil {
return vterrors.FromVtRPCError(rr.Error)
return tabletErrorFromRPCError(rr.Error)
}
return nil
}
@ -235,10 +237,10 @@ func (conn *gRPCQueryClient) SplitQuery(ctx context.Context, query tproto.BoundQ
}
sqr, err := conn.c.SplitQuery(ctx, req)
if err != nil {
return nil, tabletError(err)
return nil, tabletErrorFromGRPC(err)
}
if sqr.Error != nil {
return nil, vterrors.FromVtRPCError(sqr.Error)
return nil, tabletErrorFromRPCError(sqr.Error)
}
return tproto.Proto3ToQuerySplits(sqr.Queries), nil
}
@ -262,40 +264,24 @@ func (conn *gRPCQueryClient) EndPoint() topo.EndPoint {
return conn.endPoint
}
func tabletError(err error) error {
if err == nil {
return nil
}
// TODO(aaijazi): tabletconn is in an intermediate state right now, where application errors
// can be returned as rpcplus.ServerError or vterrors.VitessError. Soon, it will be standardized
// to only VitessError.
isServerError := false
switch err.(type) {
case rpcplus.ServerError:
isServerError = true
case *vterrors.VitessError:
isServerError = true
default:
}
if isServerError {
var code int
errStr := err.Error()
switch {
case strings.Contains(errStr, "fatal: "):
code = tabletconn.ERR_FATAL
case strings.Contains(errStr, "retry: "):
code = tabletconn.ERR_RETRY
case strings.Contains(errStr, "tx_pool_full: "):
code = tabletconn.ERR_TX_POOL_FULL
case strings.Contains(errStr, "not_in_tx: "):
code = tabletconn.ERR_NOT_IN_TX
default:
code = tabletconn.ERR_NORMAL
}
return &tabletconn.ServerError{Code: code, Err: fmt.Sprintf("vttablet: %v", err)}
}
if err == context.Canceled {
return tabletconn.Cancelled
}
// tabletErrorFromGRPC returns a tabletconn.OperationalError from the
// gRPC error.
func tabletErrorFromGRPC(err error) error {
return tabletconn.OperationalError(fmt.Sprintf("vttablet: %v", err))
}
// tabletErrorFromRPCError reconstructs a tablet error from the
// RPCError, using the RPCError code.
func tabletErrorFromRPCError(rpcErr *pbv.RPCError) error {
ve := vterrors.FromVtRPCError(rpcErr)
// see if the range is in the tablet error range
if ve.Code >= int64(pbv.ErrorCode_TabletError) && ve.Code <= int64(pbv.ErrorCode_UnknownTabletError) {
return &tabletconn.ServerError{
Code: int(ve.Code - int64(pbv.ErrorCode_TabletError)),
Err: fmt.Sprintf("vttablet: %v", ve.Error()),
}
}
return tabletconn.OperationalError(fmt.Sprintf("vttablet: %v", ve.Message))
}

Просмотреть файл

@ -16,6 +16,7 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/proto/vtrpc"
"github.com/youtube/vitess/go/vt/vterrors"
)
@ -237,3 +238,28 @@ func AddTabletErrorToQueryResult(err error, reply *mproto.QueryResult) {
}
reply.Err = rpcErr
}
// AddTabletErrorToResult sets the provided pointer to the provided error,
// if any.
func AddTabletErrorToResult(err error, responseError **vtrpc.RPCError) {
if err == nil {
return
}
var rpcErr *vtrpc.RPCError
if terr, ok := err.(*TabletError); ok {
rpcErr = &vtrpc.RPCError{
// Transform TabletError code to VitessError code
Code: vtrpc.ErrorCode(int64(terr.ErrorType) + vterrors.TabletError),
// Make sure the the VitessError message is identical to the TabletError
// err, so that downstream consumers will see identical messages no matter
// which endpoint they're using.
Message: terr.Error(),
}
} else {
rpcErr = &vtrpc.RPCError{
Code: vtrpc.ErrorCode_UnknownTabletError,
Message: err.Error(),
}
}
*responseError = rpcErr
}

Просмотреть файл

@ -14,6 +14,7 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"golang.org/x/net/context"
@ -172,10 +173,9 @@ func (f *FakeQueryService) Execute(ctx context.Context, query *proto.Query, repl
f.t.Errorf("invalid Execute.Query.TransactionId: got %v expected %v", query.TransactionId, executeTransactionID)
}
if f.hasError {
*reply = executeQueryResultError
} else {
*reply = executeQueryResult
return executeQueryResultError
}
*reply = executeQueryResult
return nil
}
@ -212,11 +212,9 @@ var executeQueryResult = mproto.QueryResult{
},
}
var executeQueryResultError = mproto.QueryResult{
Err: &mproto.RPCError{
Code: 1000,
Message: "succeeded despite err",
},
var executeQueryResultError = &tabletserver.TabletError{
ErrorType: tabletserver.ErrRetry,
Message: "succeeded despite err",
}
func testExecute(t *testing.T, conn tabletconn.TabletConn) {
@ -238,7 +236,7 @@ func testExecuteError(t *testing.T, conn tabletconn.TabletConn) {
if err == nil {
t.Fatalf("Execute was expecting an error, didn't get one")
}
expectedErr := "vttablet: succeeded despite err"
expectedErr := "vttablet: retry: succeeded despite err"
if err.Error() != expectedErr {
t.Errorf("Unexpected error from Execute: got %v wanted %v", err, expectedErr)
}
@ -353,16 +351,29 @@ func testStreamExecute(t *testing.T, conn tabletconn.TabletConn) {
func testStreamExecutePanics(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
// early panic is before sending the Fields, that is returned
// by the StreamExecute call itself
// by the StreamExecute call itself, or as the first error
// by ErrFunc
ctx := context.Background()
fake.streamExecutePanicsEarly = true
if _, _, err := conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID); err == nil || !strings.Contains(err.Error(), "caught test panic") {
t.Fatalf("unexpected panic error: %v", err)
stream, errFunc, err := conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
if err != nil {
if !strings.Contains(err.Error(), "caught test panic") {
t.Fatalf("unexpected panic error: %v", err)
}
} else {
_, ok := <-stream
if ok {
t.Fatalf("StreamExecute early panic should not return anything")
}
err = errFunc()
if err == nil || !strings.Contains(err.Error(), "caught test panic") {
t.Fatalf("unexpected panic error: %v", err)
}
}
// late panic is after sending Fields
fake.streamExecutePanicsEarly = false
stream, errFunc, err := conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
stream, errFunc, err = conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionID)
if err != nil {
t.Fatalf("StreamExecute failed: %v", err)
}

Просмотреть файл

@ -65,7 +65,7 @@ func FromRPCError(rpcErr *mproto.RPCError) error {
// FromVtRPCError recovers a VitessError from a *vtrpc.RPCError (which is how VitessErrors
// are transmitted across proto3 RPC boundaries).
func FromVtRPCError(rpcErr *pb.RPCError) error {
func FromVtRPCError(rpcErr *pb.RPCError) *VitessError {
if rpcErr == nil {
return nil
}