зеркало из https://github.com/github/vitess-gh.git
Merge pull request #974 from alainjobart/log
Adding better error handling for grpc tablet client.
This commit is contained in:
Коммит
fb684f89d4
|
@ -8,6 +8,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/vt/callerid"
|
||||
|
@ -37,7 +38,7 @@ func (q *query) GetSessionId(ctx context.Context, request *pb.GetSessionIdReques
|
|||
Keyspace: request.Keyspace,
|
||||
Shard: request.Shard,
|
||||
}, sessionInfo); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
|
||||
return &pb.GetSessionIdResponse{
|
||||
|
@ -59,7 +60,7 @@ func (q *query) Execute(ctx context.Context, request *pb.ExecuteRequest) (respon
|
|||
SessionId: request.SessionId,
|
||||
TransactionId: request.TransactionId,
|
||||
}, reply); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
return &pb.ExecuteResponse{
|
||||
Result: mproto.QueryResultToProto3(reply),
|
||||
|
@ -80,7 +81,7 @@ func (q *query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques
|
|||
AsTransaction: request.AsTransaction,
|
||||
TransactionId: request.TransactionId,
|
||||
}, reply); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
return &pb.ExecuteBatchResponse{
|
||||
Results: proto.QueryResultListToProto3(reply.List),
|
||||
|
@ -116,7 +117,7 @@ func (q *query) Begin(ctx context.Context, request *pb.BeginRequest) (response *
|
|||
if err := q.server.Begin(ctx, request.Target, &proto.Session{
|
||||
SessionId: request.SessionId,
|
||||
}, txInfo); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
|
||||
return &pb.BeginResponse{
|
||||
|
@ -135,7 +136,7 @@ func (q *query) Commit(ctx context.Context, request *pb.CommitRequest) (response
|
|||
SessionId: request.SessionId,
|
||||
TransactionId: request.TransactionId,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
return &pb.CommitResponse{}, nil
|
||||
}
|
||||
|
@ -151,7 +152,7 @@ func (q *query) Rollback(ctx context.Context, request *pb.RollbackRequest) (resp
|
|||
SessionId: request.SessionId,
|
||||
TransactionId: request.TransactionId,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
|
||||
return &pb.RollbackResponse{}, nil
|
||||
|
@ -171,7 +172,7 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (
|
|||
SplitCount: int(request.SplitCount),
|
||||
SessionID: request.SessionId,
|
||||
}, reply); err != nil {
|
||||
return nil, err
|
||||
return nil, grpc.Errorf(codes.Internal, "%v", err)
|
||||
}
|
||||
return &pb.SplitQueryResponse{
|
||||
Queries: proto.QuerySplitsToProto3(reply.Queries),
|
||||
|
|
|
@ -7,6 +7,7 @@ package grpctabletconn
|
|||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -17,6 +18,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
pb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
pbs "github.com/youtube/vitess/go/vt/proto/queryservice"
|
||||
|
@ -362,5 +364,23 @@ func (conn *gRPCQueryClient) EndPoint() *pbt.EndPoint {
|
|||
// tabletErrorFromGRPC returns a tabletconn.OperationalError from the
|
||||
// gRPC error.
|
||||
func tabletErrorFromGRPC(err error) error {
|
||||
if grpc.Code(err) == codes.Internal {
|
||||
// server side error, convert it
|
||||
var code int
|
||||
errStr := err.Error()
|
||||
switch {
|
||||
case strings.Contains(errStr, "fatal: "):
|
||||
code = tabletconn.ERR_FATAL
|
||||
case strings.Contains(errStr, "retry: "):
|
||||
code = tabletconn.ERR_RETRY
|
||||
case strings.Contains(errStr, "tx_pool_full: "):
|
||||
code = tabletconn.ERR_TX_POOL_FULL
|
||||
case strings.Contains(errStr, "not_in_tx: "):
|
||||
code = tabletconn.ERR_NOT_IN_TX
|
||||
default:
|
||||
code = tabletconn.ERR_NORMAL
|
||||
}
|
||||
return &tabletconn.ServerError{Code: code, Err: fmt.Sprintf("vttablet: %v", err)}
|
||||
}
|
||||
return tabletconn.OperationalError(fmt.Sprintf("vttablet: %v", err))
|
||||
}
|
||||
|
|
|
@ -21,9 +21,13 @@ errors=
|
|||
# with multiple files requires the files to all be in one package.
|
||||
for gofile in $gofiles
|
||||
do
|
||||
if ! go tool vet $vetflags $gofile 2>&1; then
|
||||
errors=YES
|
||||
fi
|
||||
if [ $gofile == "go/vt/tabletserver/grpcqueryservice/server.go" ]; then
|
||||
echo "skipping go/vt/tabletserver/grpcqueryservice/server.go as Errorf is different"
|
||||
else
|
||||
if ! go tool vet $vetflags $gofile 2>&1; then
|
||||
errors=YES
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
[ -z "$errors" ] && exit 0
|
||||
|
|
|
@ -18,6 +18,7 @@ from multiprocessing.pool import ThreadPool
|
|||
import environment
|
||||
import tablet
|
||||
import utils
|
||||
from protocols_flavor import protocols_flavor
|
||||
|
||||
from net import gorpc
|
||||
from vtdb import keyrange
|
||||
|
@ -1021,12 +1022,15 @@ class TestFailures(unittest.TestCase):
|
|||
self.replica_tablet.wait_for_vttablet_state('SERVING')
|
||||
# TODO: expect to fail until we can detect vttablet shuts down gracefully
|
||||
# while VTGate is idle.
|
||||
# NOTE: with grpc, it will reconnect, and not trigger an error.
|
||||
if protocols_flavor().tabletconn_protocol() == 'grpc':
|
||||
return
|
||||
try:
|
||||
vtgate_conn._execute(
|
||||
result = vtgate_conn._execute(
|
||||
"select 1 from vt_insert_test", {},
|
||||
KEYSPACE_NAME, 'replica',
|
||||
keyranges=[self.keyrange])
|
||||
self.fail("DatabaseError should have been raised")
|
||||
self.fail("DatabaseError should have been raised, but got %s" % str(result))
|
||||
except Exception, e:
|
||||
self.assertIsInstance(e, dbexceptions.DatabaseError)
|
||||
self.assertNotIsInstance(e, dbexceptions.IntegrityError)
|
||||
|
|
Загрузка…
Ссылка в новой задаче