This commit is contained in:
Sugu Sougoumarane 2015-10-24 18:10:03 -07:00
Родитель 94eb00ff00
Коммит 5aa0b5c830
23 изменённых файлов: 277 добавлений и 127 удалений

Просмотреть файл

@ -193,7 +193,10 @@ func (itc *internalTabletConn) Execute(ctx context.Context, query string, bindVa
if err != nil {
return nil, err
}
bindVars = tproto.Proto3ToBindVariables(bv)
bindVars, err = tproto.Proto3ToBindVariables(bv)
if err != nil {
return nil, err
}
reply := &mproto.QueryResult{}
if err := itc.tablet.qsc.QueryService().Execute(ctx, &pbq.Target{
Keyspace: itc.tablet.keyspace,
@ -218,8 +221,12 @@ func (itc *internalTabletConn) ExecuteBatch(ctx context.Context, queries []tprot
if err != nil {
return nil, err
}
bindVars, err := tproto.Proto3ToBindVariables(bv)
if err != nil {
return nil, err
}
q[i].Sql = query.Sql
q[i].BindVariables = tproto.Proto3ToBindVariables(bv)
q[i].BindVariables = bindVars
}
reply := &tproto.QueryResultList{}
if err := itc.tablet.qsc.QueryService().ExecuteBatch(ctx, &pbq.Target{
@ -243,7 +250,10 @@ func (itc *internalTabletConn) StreamExecute(ctx context.Context, query string,
if err != nil {
return nil, nil, err
}
bindVars = tproto.Proto3ToBindVariables(bv)
bindVars, err = tproto.Proto3ToBindVariables(bv)
if err != nil {
return nil, nil, err
}
result := make(chan *mproto.QueryResult, 10)
var finalErr error

Просмотреть файл

@ -65,7 +65,7 @@ func echoQueryResult(vals map[string]interface{}) *mproto.QueryResult {
var row []sqltypes.Value
for k, v := range vals {
qr.Fields = append(qr.Fields, mproto.Field{Name: k, Type: mproto.VT_VARCHAR})
qr.Fields = append(qr.Fields, mproto.Field{Name: k, Type: mproto.VT_VAR_STRING})
val := reflect.ValueOf(v)
if val.Kind() == reflect.Map {

Просмотреть файл

@ -39,21 +39,23 @@ func ProtoToCharset(c *pbb.Charset) *Charset {
}
// FieldsToProto3 converts an internal []Field to the proto3 version
func FieldsToProto3(f []Field) []*pbq.Field {
func FieldsToProto3(f []Field) ([]*pbq.Field, error) {
if len(f) == 0 {
return nil
return nil, nil
}
result := make([]*pbq.Field, len(f))
for i, f := range f {
// TODO(sougou): handle error.
vitessType, _ := sqltypes.MySQLToType(f.Type, f.Flags)
vitessType, err := sqltypes.MySQLToType(f.Type, f.Flags)
if err != nil {
return nil, err
}
result[i] = &pbq.Field{
Name: f.Name,
Type: vitessType,
}
}
return result
return result, nil
}
// Proto3ToFields converts a proto3 []Fields to an internal data structure.
@ -124,16 +126,20 @@ func Proto3ToRows(rows []*pbq.Row) [][]sqltypes.Value {
}
// QueryResultToProto3 converts an internal QueryResult to the proto3 version
func QueryResultToProto3(qr *QueryResult) *pbq.QueryResult {
func QueryResultToProto3(qr *QueryResult) (*pbq.QueryResult, error) {
if qr == nil {
return nil
return nil, nil
}
fields, err := FieldsToProto3(qr.Fields)
if err != nil {
return nil, err
}
return &pbq.QueryResult{
Fields: FieldsToProto3(qr.Fields),
Fields: fields,
RowsAffected: qr.RowsAffected,
InsertId: qr.InsertId,
Rows: RowsToProto3(qr.Rows),
}
}, nil
}
// Proto3ToQueryResult converts a proto3 QueryResult to an internal data structure.

Просмотреть файл

@ -27,7 +27,6 @@ const (
VT_DATETIME = 12
VT_YEAR = 13
VT_NEWDATE = 14
VT_VARCHAR = 15
VT_BIT = 16
VT_NEWDECIMAL = 246
VT_ENUM = 247

Просмотреть файл

@ -114,14 +114,14 @@ func TestTypeToMySQL(t *testing.T) {
if v != 16 {
t.Errorf("Bit: %d, want 16", v)
}
if f != mysqlUnsigned>>8 {
if f != mysqlUnsigned>>16 {
t.Errorf("Bit flag: %x, want %x", f, mysqlUnsigned>>8)
}
v, f = TypeToMySQL(Date)
if v != 10 {
t.Errorf("Bit: %d, want 10", v)
}
if f != mysqlBinary>>8 {
if f != mysqlBinary>>16 {
t.Errorf("Bit flag: %x, want %x", f, mysqlBinary>>8)
}
}

Просмотреть файл

@ -263,8 +263,8 @@ func parsePkTuple(tokenizer *sqlparser.Tokenizer, insertid int64, fields []mprot
switch fields[index].Type {
case mproto.VT_DECIMAL:
// we haven't updated the type yet
fields[index].Type = mproto.VT_VARCHAR
case mproto.VT_VARCHAR:
fields[index].Type = mproto.VT_VAR_STRING
case mproto.VT_VAR_STRING:
// nothing to do there
default:
// we already set this to something incompatible!

Просмотреть файл

@ -107,7 +107,7 @@ func TestDMLEvent(t *testing.T) {
sendEvent: func(event *proto.StreamEvent) error {
switch event.Category {
case "DML":
want := `&{DML _table_ [{eid 8 0} {id 8 0} {name 15 0}] [[10 -1 name] [11 18446744073709551615 name]] 1 }`
want := `&{DML _table_ [{eid 8 0} {id 8 0} {name 253 0}] [[10 -1 name] [11 18446744073709551615 name]] 1 }`
got := fmt.Sprintf("%v", event)
if got != want {
t.Errorf("got \n%s, want \n%s", got, want)

Просмотреть файл

@ -31,8 +31,12 @@ func New(updateStream proto.UpdateStream) *UpdateStream {
func (server *UpdateStream) StreamUpdate(req *pb.StreamUpdateRequest, stream pbs.UpdateStream_StreamUpdateServer) (err error) {
defer server.updateStream.HandlePanic(&err)
return server.updateStream.ServeUpdateStream(req.Position, func(reply *proto.StreamEvent) error {
event, err := proto.StreamEventToProto(reply)
if err != nil {
return err
}
return stream.Send(&pb.StreamUpdateResponse{
StreamEvent: proto.StreamEventToProto(reply),
StreamEvent: event,
})
})
}

Просмотреть файл

@ -19,10 +19,14 @@ import (
// structures internally, and this will be obsolete.
// StreamEventToProto converts a StreamEvent to a proto3
func StreamEventToProto(s *StreamEvent) *pb.StreamEvent {
func StreamEventToProto(s *StreamEvent) (*pb.StreamEvent, error) {
fields, err := mproto.FieldsToProto3(s.PrimaryKeyFields)
if err != nil {
return nil, err
}
result := &pb.StreamEvent{
TableName: s.TableName,
PrimaryKeyFields: mproto.FieldsToProto3(s.PrimaryKeyFields),
PrimaryKeyFields: fields,
PrimaryKeyValues: mproto.RowsToProto3(s.PrimaryKeyValues),
Sql: s.Sql,
Timestamp: s.Timestamp,
@ -38,7 +42,7 @@ func StreamEventToProto(s *StreamEvent) *pb.StreamEvent {
default:
result.Category = pb.StreamEvent_SE_ERR
}
return result
return result, nil
}
// ProtoToStreamEvent converts a proto to a StreamEvent

Просмотреть файл

@ -41,7 +41,7 @@ func TestStreamEvent(t *testing.T) {
PrimaryKeyFields: []mproto.Field{
mproto.Field{
Name: "str2",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
mproto.Field{
Name: "str3",
@ -78,7 +78,7 @@ func TestStreamEvent(t *testing.T) {
PrimaryKeyFields: []mproto.Field{
mproto.Field{
Name: "str2",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
mproto.Field{
Name: "str3",

Просмотреть файл

@ -190,9 +190,10 @@ func (s *server) ExecuteFetchAsDba(ctx context.Context, request *pb.ExecuteFetch
response := &pb.ExecuteFetchAsDbaResponse{}
return response, s.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsDba, request, response, func() error {
qr, err := s.agent.ExecuteFetchAsDba(ctx, request.Query, request.DbName, int(request.MaxRows), request.WantFields, request.DisableBinlogs, request.ReloadSchema)
if err == nil {
response.Result = mproto.QueryResultToProto3(qr)
if err != nil {
return err
}
response.Result, err = mproto.QueryResultToProto3(qr)
return err
})
}
@ -202,9 +203,10 @@ func (s *server) ExecuteFetchAsApp(ctx context.Context, request *pb.ExecuteFetch
response := &pb.ExecuteFetchAsAppResponse{}
return response, s.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsApp, request, response, func() error {
qr, err := s.agent.ExecuteFetchAsApp(ctx, request.Query, int(request.MaxRows), request.WantFields)
if err == nil {
response.Result = mproto.QueryResultToProto3(qr)
if err != nil {
return err
}
response.Result, err = mproto.QueryResultToProto3(qr)
return err
})
}

Просмотреть файл

@ -53,17 +53,23 @@ func (q *query) Execute(ctx context.Context, request *pb.ExecuteRequest) (respon
request.ImmediateCallerId,
)
reply := new(mproto.QueryResult)
bv, err := proto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, tabletserver.ToGRPCError(err)
}
if err := q.server.Execute(ctx, request.Target, &proto.Query{
Sql: request.Query.Sql,
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
BindVariables: bv,
SessionId: request.SessionId,
TransactionId: request.TransactionId,
}, reply); err != nil {
return nil, tabletserver.ToGRPCError(err)
}
return &pb.ExecuteResponse{
Result: mproto.QueryResultToProto3(reply),
}, nil
result, err := mproto.QueryResultToProto3(reply)
if err != nil {
return nil, tabletserver.ToGRPCError(err)
}
return &pb.ExecuteResponse{Result: result}, nil
}
// ExecuteBatch is part of the queryservice.QueryServer interface
@ -74,17 +80,23 @@ func (q *query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques
request.ImmediateCallerId,
)
reply := new(proto.QueryResultList)
bql, err := proto.Proto3ToBoundQueryList(request.Queries)
if err != nil {
return nil, tabletserver.ToGRPCError(err)
}
if err := q.server.ExecuteBatch(ctx, request.Target, &proto.QueryList{
Queries: proto.Proto3ToBoundQueryList(request.Queries),
Queries: bql,
SessionId: request.SessionId,
AsTransaction: request.AsTransaction,
TransactionId: request.TransactionId,
}, reply); err != nil {
return nil, tabletserver.ToGRPCError(err)
}
return &pb.ExecuteBatchResponse{
Results: proto.QueryResultListToProto3(reply.List),
}, nil
results, err := proto.QueryResultListToProto3(reply.List)
if err != nil {
return nil, tabletserver.ToGRPCError(err)
}
return &pb.ExecuteBatchResponse{Results: results}, nil
}
// StreamExecute is part of the queryservice.QueryServer interface
@ -94,14 +106,20 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query
request.EffectiveCallerId,
request.ImmediateCallerId,
)
bv, err := proto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return tabletserver.ToGRPCError(err)
}
if err := q.server.StreamExecute(ctx, request.Target, &proto.Query{
Sql: request.Query.Sql,
BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables),
BindVariables: bv,
SessionId: request.SessionId,
}, func(reply *mproto.QueryResult) error {
return stream.Send(&pb.StreamExecuteResponse{
Result: mproto.QueryResultToProto3(reply),
})
result, err := mproto.QueryResultToProto3(reply)
if err != nil {
return err
}
return stream.Send(&pb.StreamExecuteResponse{Result: result})
}); err != nil {
return tabletserver.ToGRPCError(err)
}
@ -168,8 +186,12 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (
request.ImmediateCallerId,
)
reply := &proto.SplitQueryResult{}
bq, err := proto.Proto3ToBoundQuery(request.Query)
if err != nil {
return nil, tabletserver.ToGRPCError(err)
}
if err := q.server.SplitQuery(ctx, request.Target, &proto.SplitQueryRequest{
Query: *proto.Proto3ToBoundQuery(request.Query),
Query: *bq,
SplitColumn: request.SplitColumn,
SplitCount: int(request.SplitCount),
SessionID: request.SessionId,
@ -178,7 +200,7 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) (
}
qs, err := proto.QuerySplitsToProto3(reply.Queries)
if err != nil {
return nil, err
return nil, tabletserver.ToGRPCError(err)
}
return &pb.SplitQueryResponse{Queries: qs}, nil
}

Просмотреть файл

@ -288,7 +288,7 @@ func (conn *gRPCQueryClient) SplitQuery(ctx context.Context, query tproto.BoundQ
q, err := tproto.BoundQueryToProto3(query.Sql, query.BindVariables)
if err != nil {
return nil, err
return nil, tabletconn.TabletErrorFromGRPC(err)
}
req := &pb.SplitQueryRequest{
Target: conn.target,
@ -303,7 +303,11 @@ func (conn *gRPCQueryClient) SplitQuery(ctx context.Context, query tproto.BoundQ
if err != nil {
return nil, tabletconn.TabletErrorFromGRPC(err)
}
return tproto.Proto3ToQuerySplits(sqr.Queries), nil
split, err := tproto.Proto3ToQuerySplits(sqr.Queries)
if err != nil {
return nil, tabletconn.TabletErrorFromGRPC(err)
}
return split, nil
}
// StreamHealth is the stub for TabletServer.StreamHealth RPC

Просмотреть файл

@ -219,32 +219,40 @@ func buildValue(v interface{}) (pb.Value, error) {
}
// Proto3ToBoundQuery converts a proto.BoundQuery to the internal data structure
func Proto3ToBoundQuery(query *pb.BoundQuery) *BoundQuery {
func Proto3ToBoundQuery(query *pb.BoundQuery) (*BoundQuery, error) {
bv, err := Proto3ToBindVariables(query.BindVariables)
if err != nil {
return nil, err
}
return &BoundQuery{
Sql: string(query.Sql),
BindVariables: Proto3ToBindVariables(query.BindVariables),
}
BindVariables: bv,
}, nil
}
// Proto3ToBoundQueryList converts am array of proto.BoundQuery to the internal data structure
func Proto3ToBoundQueryList(queries []*pb.BoundQuery) []BoundQuery {
func Proto3ToBoundQueryList(queries []*pb.BoundQuery) ([]BoundQuery, error) {
if len(queries) == 0 {
return nil
return nil, nil
}
result := make([]BoundQuery, len(queries))
for i, q := range queries {
result[i] = *Proto3ToBoundQuery(q)
res, err := Proto3ToBoundQuery(q)
if err != nil {
return nil, err
}
result[i] = *res
}
return result
return result, nil
}
// Proto3ToBindVariables converts a proto.BinVariable map to internal data structure
func Proto3ToBindVariables(bv map[string]*pb.BindVariable) map[string]interface{} {
// TODO(sougou): handle error
func Proto3ToBindVariables(bv map[string]*pb.BindVariable) (map[string]interface{}, error) {
if len(bv) == 0 {
return nil
return nil, nil
}
result := make(map[string]interface{})
var err error
for k, v := range bv {
if v.Type == sqltypes.Tuple {
list := make([]interface{}, len(v.Values))
@ -253,14 +261,20 @@ func Proto3ToBindVariables(bv map[string]*pb.BindVariable) map[string]interface{
Type: lv.Type,
Value: lv.Value,
}
list[i], _ = buildSQLValue(asbind)
list[i], err = buildSQLValue(asbind)
if err != nil {
return nil, err
}
}
result[k] = list
} else {
result[k], _ = buildSQLValue(v)
result[k], err = buildSQLValue(v)
if err != nil {
return nil, err
}
}
}
return result
return result, nil
}
func buildSQLValue(v *pb.BindVariable) (interface{}, error) {
@ -290,30 +304,38 @@ func Proto3ToQueryResultList(results []*pb.QueryResult) *QueryResultList {
}
// QueryResultListToProto3 changes the internal array of QueryResult to the proto3 version
func QueryResultListToProto3(results []mproto.QueryResult) []*pb.QueryResult {
func QueryResultListToProto3(results []mproto.QueryResult) ([]*pb.QueryResult, error) {
if len(results) == 0 {
return nil
return nil, nil
}
result := make([]*pb.QueryResult, len(results))
var err error
for i := range results {
result[i] = mproto.QueryResultToProto3(&results[i])
result[i], err = mproto.QueryResultToProto3(&results[i])
if err != nil {
return nil, err
}
}
return result
return result, nil
}
// Proto3ToQuerySplits converts a proto3 QuerySplit array to a native QuerySplit array
func Proto3ToQuerySplits(queries []*pb.QuerySplit) []QuerySplit {
func Proto3ToQuerySplits(queries []*pb.QuerySplit) ([]QuerySplit, error) {
if len(queries) == 0 {
return nil
return nil, nil
}
result := make([]QuerySplit, len(queries))
for i, qs := range queries {
res, err := Proto3ToBoundQuery(qs.Query)
if err != nil {
return nil, err
}
result[i] = QuerySplit{
Query: *Proto3ToBoundQuery(qs.Query),
Query: *res,
RowCount: qs.RowCount,
}
}
return result
return result, nil
}
// QuerySplitsToProto3 converts a native QuerySplit array to the proto3 version

Просмотреть файл

@ -217,7 +217,7 @@ func (qs *QuerySplitter) splitBoundaries(columnType int64, pkMinMax *mproto.Quer
return qs.splitBoundariesIntColumn(pkMinMax)
case mproto.VT_FLOAT, mproto.VT_DOUBLE:
return qs.splitBoundariesFloatColumn(pkMinMax)
case mproto.VT_VARCHAR, mproto.VT_BIT, mproto.VT_VAR_STRING, mproto.VT_STRING:
case mproto.VT_BIT, mproto.VT_VAR_STRING, mproto.VT_STRING:
return qs.splitBoundariesStringColumn()
}
return []sqltypes.Value{}, nil

Просмотреть файл

@ -35,14 +35,20 @@ func (vtg *VTGate) Execute(ctx context.Context, request *pb.ExecuteRequest) (res
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResult)
executeErr := vtg.server.Execute(ctx, string(request.Query.Sql), tproto.Proto3ToBindVariables(request.Query.BindVariables), request.TabletType, proto.ProtoToSession(request.Session), request.NotInTransaction, reply)
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.Execute(ctx, string(request.Query.Sql), bv, request.TabletType, proto.ProtoToSession(request.Session), request.NotInTransaction, reply)
response = &pb.ExecuteResponse{
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Result = mproto.QueryResultToProto3(reply.Result)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Result, executeErr = mproto.QueryResultToProto3(reply.Result)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -54,9 +60,13 @@ func (vtg *VTGate) ExecuteShards(ctx context.Context, request *pb.ExecuteShardsR
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResult)
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.ExecuteShards(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.Shards,
request.TabletType,
@ -67,9 +77,11 @@ func (vtg *VTGate) ExecuteShards(ctx context.Context, request *pb.ExecuteShardsR
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Result = mproto.QueryResultToProto3(reply.Result)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Result, executeErr = mproto.QueryResultToProto3(reply.Result)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -81,9 +93,13 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *pb.ExecuteKe
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResult)
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.ExecuteKeyspaceIds(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.KeyspaceIds,
request.TabletType,
@ -94,9 +110,11 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *pb.ExecuteKe
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Result = mproto.QueryResultToProto3(reply.Result)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Result, executeErr = mproto.QueryResultToProto3(reply.Result)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -108,9 +126,13 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *pb.ExecuteKeyR
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResult)
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.ExecuteKeyRanges(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.KeyRanges,
request.TabletType,
@ -121,9 +143,11 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *pb.ExecuteKeyR
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Result = mproto.QueryResultToProto3(reply.Result)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Result, executeErr = mproto.QueryResultToProto3(reply.Result)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -135,9 +159,13 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *pb.ExecuteEnti
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResult)
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.ExecuteEntityIds(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.EntityColumnName,
request.EntityKeyspaceIds,
@ -149,9 +177,11 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *pb.ExecuteEnti
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Result = mproto.QueryResultToProto3(reply.Result)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Result, executeErr = mproto.QueryResultToProto3(reply.Result)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -163,8 +193,12 @@ func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, request *pb.ExecuteBa
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResultList)
bsq, err := proto.ProtoToBoundShardQueries(request.Queries)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.ExecuteBatchShards(ctx,
proto.ProtoToBoundShardQueries(request.Queries),
bsq,
request.TabletType,
request.AsTransaction,
proto.ProtoToSession(request.Session),
@ -173,9 +207,11 @@ func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, request *pb.ExecuteBa
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Results = tproto.QueryResultListToProto3(reply.List)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Results, executeErr = tproto.QueryResultListToProto3(reply.List)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -188,8 +224,12 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *pb.Exec
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
reply := new(proto.QueryResultList)
bq, err := proto.ProtoToBoundKeyspaceIdQueries(request.Queries)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
executeErr := vtg.server.ExecuteBatchKeyspaceIds(ctx,
proto.ProtoToBoundKeyspaceIdQueries(request.Queries),
bq,
request.TabletType,
request.AsTransaction,
proto.ProtoToSession(request.Session),
@ -198,9 +238,11 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *pb.Exec
Error: vtgate.RPCErrorToVtRPCError(reply.Err),
}
if executeErr == nil {
response.Results = tproto.QueryResultListToProto3(reply.List)
response.Session = proto.SessionToProto(reply.Session)
return response, nil
response.Results, executeErr = tproto.QueryResultListToProto3(reply.List)
if executeErr == nil {
response.Session = proto.SessionToProto(reply.Session)
return response, nil
}
}
return nil, vterrors.ToGRPCError(executeErr)
}
@ -211,14 +253,20 @@ func (vtg *VTGate) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Vi
ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()),
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return vterrors.ToGRPCError(err)
}
vtgErr := vtg.server.StreamExecute(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.TabletType,
func(value *proto.QueryResult) error {
return stream.Send(&pb.StreamExecuteResponse{
Result: mproto.QueryResultToProto3(value.Result),
})
result, err := mproto.QueryResultToProto3(value.Result)
if err != nil {
return err
}
return stream.Send(&pb.StreamExecuteResponse{Result: result})
})
return vterrors.ToGRPCError(vtgErr)
}
@ -229,16 +277,22 @@ func (vtg *VTGate) StreamExecuteShards(request *pb.StreamExecuteShardsRequest, s
ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()),
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return vterrors.ToGRPCError(err)
}
vtgErr := vtg.server.StreamExecuteShards(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.Shards,
request.TabletType,
func(value *proto.QueryResult) error {
return stream.Send(&pb.StreamExecuteShardsResponse{
Result: mproto.QueryResultToProto3(value.Result),
})
result, err := mproto.QueryResultToProto3(value.Result)
if err != nil {
return err
}
return stream.Send(&pb.StreamExecuteShardsResponse{Result: result})
})
return vterrors.ToGRPCError(vtgErr)
}
@ -250,16 +304,22 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(request *pb.StreamExecuteKeyspaceIds
ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()),
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return vterrors.ToGRPCError(err)
}
vtgErr := vtg.server.StreamExecuteKeyspaceIds(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.KeyspaceIds,
request.TabletType,
func(value *proto.QueryResult) error {
return stream.Send(&pb.StreamExecuteKeyspaceIdsResponse{
Result: mproto.QueryResultToProto3(value.Result),
})
result, err := mproto.QueryResultToProto3(value.Result)
if err != nil {
return err
}
return stream.Send(&pb.StreamExecuteKeyspaceIdsResponse{Result: result})
})
return vterrors.ToGRPCError(vtgErr)
}
@ -271,16 +331,22 @@ func (vtg *VTGate) StreamExecuteKeyRanges(request *pb.StreamExecuteKeyRangesRequ
ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()),
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return vterrors.ToGRPCError(err)
}
vtgErr := vtg.server.StreamExecuteKeyRanges(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.Keyspace,
request.KeyRanges,
request.TabletType,
func(value *proto.QueryResult) error {
return stream.Send(&pb.StreamExecuteKeyRangesResponse{
Result: mproto.QueryResultToProto3(value.Result),
})
result, err := mproto.QueryResultToProto3(value.Result)
if err != nil {
return err
}
return stream.Send(&pb.StreamExecuteKeyRangesResponse{Result: result})
})
return vterrors.ToGRPCError(vtgErr)
}
@ -336,10 +402,14 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.CallerId,
callerid.NewImmediateCallerID("grpc client"))
bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables)
if err != nil {
return nil, vterrors.ToGRPCError(err)
}
splits, vtgErr := vtg.server.SplitQuery(ctx,
request.Keyspace,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
bv,
request.SplitColumn,
int(request.SplitCount))
if vtgErr != nil {

Просмотреть файл

@ -156,18 +156,22 @@ func BoundShardQueriesToProto(bsq []BoundShardQuery) ([]*pb.BoundShardQuery, err
}
// ProtoToBoundShardQueries transforms a list of BoundShardQuery from proto3
func ProtoToBoundShardQueries(bsq []*pb.BoundShardQuery) []BoundShardQuery {
func ProtoToBoundShardQueries(bsq []*pb.BoundShardQuery) ([]BoundShardQuery, error) {
if len(bsq) == 0 {
return nil
return nil, nil
}
result := make([]BoundShardQuery, len(bsq))
for i, q := range bsq {
result[i].Sql = string(q.Query.Sql)
result[i].BindVariables = tproto.Proto3ToBindVariables(q.Query.BindVariables)
bv, err := tproto.Proto3ToBindVariables(q.Query.BindVariables)
if err != nil {
return nil, err
}
result[i].BindVariables = bv
result[i].Keyspace = q.Keyspace
result[i].Shards = q.Shards
}
return result
return result, nil
}
// BoundKeyspaceIdQueriesToProto transforms a list of BoundKeyspaceIdQuery to proto3
@ -191,16 +195,20 @@ func BoundKeyspaceIdQueriesToProto(bsq []BoundKeyspaceIdQuery) ([]*pb.BoundKeysp
}
// ProtoToBoundKeyspaceIdQueries transforms a list of BoundKeyspaceIdQuery from proto3
func ProtoToBoundKeyspaceIdQueries(bsq []*pb.BoundKeyspaceIdQuery) []BoundKeyspaceIdQuery {
func ProtoToBoundKeyspaceIdQueries(bsq []*pb.BoundKeyspaceIdQuery) ([]BoundKeyspaceIdQuery, error) {
if len(bsq) == 0 {
return nil
return nil, nil
}
result := make([]BoundKeyspaceIdQuery, len(bsq))
for i, q := range bsq {
bv, err := tproto.Proto3ToBindVariables(q.Query.BindVariables)
if err != nil {
return nil, err
}
result[i].Sql = string(q.Query.Sql)
result[i].BindVariables = tproto.Proto3ToBindVariables(q.Query.BindVariables)
result[i].BindVariables = bv
result[i].Keyspace = q.Keyspace
result[i].KeyspaceIds = key.ProtoToKeyspaceIds(q.KeyspaceIds)
}
return result
return result, nil
}

Просмотреть файл

@ -64,7 +64,7 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *pb.Target
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
mproto.Field{
Name: "keyspace_id",

Просмотреть файл

@ -56,7 +56,7 @@ func (sq *destinationTabletServer) StreamExecute(ctx context.Context, target *pb
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
mproto.Field{
Name: "keyspace_id",
@ -115,7 +115,7 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *pb.Targ
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
mproto.Field{
Name: "keyspace_id",

Просмотреть файл

@ -45,7 +45,7 @@ func (sq *sqlDifferTabletServer) StreamExecute(ctx context.Context, target *pb.T
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
},
}); err != nil {

Просмотреть файл

@ -64,7 +64,7 @@ func (sq *verticalTabletServer) StreamExecute(ctx context.Context, target *pb.Ta
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
},
}); err != nil {

Просмотреть файл

@ -56,7 +56,7 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *p
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
Type: mproto.VT_VAR_STRING,
},
},
}); err != nil {

Просмотреть файл

@ -20,7 +20,6 @@ VT_TIME = 11
VT_DATETIME = 12
VT_YEAR = 13
VT_NEWDATE = 14
VT_VARCHAR = 15
VT_BIT = 16
VT_NEWDECIMAL = 246
VT_ENUM = 247