diff --git a/go/cmd/vtcombo/tablet_map.go b/go/cmd/vtcombo/tablet_map.go index 0daa8b7a3e..36ea7f34a6 100644 --- a/go/cmd/vtcombo/tablet_map.go +++ b/go/cmd/vtcombo/tablet_map.go @@ -193,7 +193,10 @@ func (itc *internalTabletConn) Execute(ctx context.Context, query string, bindVa if err != nil { return nil, err } - bindVars = tproto.Proto3ToBindVariables(bv) + bindVars, err = tproto.Proto3ToBindVariables(bv) + if err != nil { + return nil, err + } reply := &mproto.QueryResult{} if err := itc.tablet.qsc.QueryService().Execute(ctx, &pbq.Target{ Keyspace: itc.tablet.keyspace, @@ -218,8 +221,12 @@ func (itc *internalTabletConn) ExecuteBatch(ctx context.Context, queries []tprot if err != nil { return nil, err } + bindVars, err := tproto.Proto3ToBindVariables(bv) + if err != nil { + return nil, err + } q[i].Sql = query.Sql - q[i].BindVariables = tproto.Proto3ToBindVariables(bv) + q[i].BindVariables = bindVars } reply := &tproto.QueryResultList{} if err := itc.tablet.qsc.QueryService().ExecuteBatch(ctx, &pbq.Target{ @@ -243,7 +250,10 @@ func (itc *internalTabletConn) StreamExecute(ctx context.Context, query string, if err != nil { return nil, nil, err } - bindVars = tproto.Proto3ToBindVariables(bv) + bindVars, err = tproto.Proto3ToBindVariables(bv) + if err != nil { + return nil, nil, err + } result := make(chan *mproto.QueryResult, 10) var finalErr error diff --git a/go/cmd/vtgateclienttest/services/echo.go b/go/cmd/vtgateclienttest/services/echo.go index 2798bf6d06..2a4025a0c9 100644 --- a/go/cmd/vtgateclienttest/services/echo.go +++ b/go/cmd/vtgateclienttest/services/echo.go @@ -65,7 +65,7 @@ func echoQueryResult(vals map[string]interface{}) *mproto.QueryResult { var row []sqltypes.Value for k, v := range vals { - qr.Fields = append(qr.Fields, mproto.Field{Name: k, Type: mproto.VT_VARCHAR}) + qr.Fields = append(qr.Fields, mproto.Field{Name: k, Type: mproto.VT_VAR_STRING}) val := reflect.ValueOf(v) if val.Kind() == reflect.Map { diff --git a/go/mysql/proto/proto3.go b/go/mysql/proto/proto3.go index c2ab096fb6..dc46e2858a 100644 --- a/go/mysql/proto/proto3.go +++ b/go/mysql/proto/proto3.go @@ -39,21 +39,23 @@ func ProtoToCharset(c *pbb.Charset) *Charset { } // FieldsToProto3 converts an internal []Field to the proto3 version -func FieldsToProto3(f []Field) []*pbq.Field { +func FieldsToProto3(f []Field) ([]*pbq.Field, error) { if len(f) == 0 { - return nil + return nil, nil } result := make([]*pbq.Field, len(f)) for i, f := range f { - // TODO(sougou): handle error. - vitessType, _ := sqltypes.MySQLToType(f.Type, f.Flags) + vitessType, err := sqltypes.MySQLToType(f.Type, f.Flags) + if err != nil { + return nil, err + } result[i] = &pbq.Field{ Name: f.Name, Type: vitessType, } } - return result + return result, nil } // Proto3ToFields converts a proto3 []Fields to an internal data structure. @@ -124,16 +126,20 @@ func Proto3ToRows(rows []*pbq.Row) [][]sqltypes.Value { } // QueryResultToProto3 converts an internal QueryResult to the proto3 version -func QueryResultToProto3(qr *QueryResult) *pbq.QueryResult { +func QueryResultToProto3(qr *QueryResult) (*pbq.QueryResult, error) { if qr == nil { - return nil + return nil, nil + } + fields, err := FieldsToProto3(qr.Fields) + if err != nil { + return nil, err } return &pbq.QueryResult{ - Fields: FieldsToProto3(qr.Fields), + Fields: fields, RowsAffected: qr.RowsAffected, InsertId: qr.InsertId, Rows: RowsToProto3(qr.Rows), - } + }, nil } // Proto3ToQueryResult converts a proto3 QueryResult to an internal data structure. diff --git a/go/mysql/proto/structs.go b/go/mysql/proto/structs.go index 339d5ebd9f..d497a7de1a 100644 --- a/go/mysql/proto/structs.go +++ b/go/mysql/proto/structs.go @@ -27,7 +27,6 @@ const ( VT_DATETIME = 12 VT_YEAR = 13 VT_NEWDATE = 14 - VT_VARCHAR = 15 VT_BIT = 16 VT_NEWDECIMAL = 246 VT_ENUM = 247 diff --git a/go/sqltypes/type_test.go b/go/sqltypes/type_test.go index d06250ee48..c31a1f5061 100644 --- a/go/sqltypes/type_test.go +++ b/go/sqltypes/type_test.go @@ -114,14 +114,14 @@ func TestTypeToMySQL(t *testing.T) { if v != 16 { t.Errorf("Bit: %d, want 16", v) } - if f != mysqlUnsigned>>8 { + if f != mysqlUnsigned>>16 { t.Errorf("Bit flag: %x, want %x", f, mysqlUnsigned>>8) } v, f = TypeToMySQL(Date) if v != 10 { t.Errorf("Bit: %d, want 10", v) } - if f != mysqlBinary>>8 { + if f != mysqlBinary>>16 { t.Errorf("Bit flag: %x, want %x", f, mysqlBinary>>8) } } diff --git a/go/vt/binlog/event_streamer.go b/go/vt/binlog/event_streamer.go index 1685ebaa46..cc3b430522 100644 --- a/go/vt/binlog/event_streamer.go +++ b/go/vt/binlog/event_streamer.go @@ -263,8 +263,8 @@ func parsePkTuple(tokenizer *sqlparser.Tokenizer, insertid int64, fields []mprot switch fields[index].Type { case mproto.VT_DECIMAL: // we haven't updated the type yet - fields[index].Type = mproto.VT_VARCHAR - case mproto.VT_VARCHAR: + fields[index].Type = mproto.VT_VAR_STRING + case mproto.VT_VAR_STRING: // nothing to do there default: // we already set this to something incompatible! diff --git a/go/vt/binlog/event_streamer_test.go b/go/vt/binlog/event_streamer_test.go index ebe5d85522..29079e2aa4 100644 --- a/go/vt/binlog/event_streamer_test.go +++ b/go/vt/binlog/event_streamer_test.go @@ -107,7 +107,7 @@ func TestDMLEvent(t *testing.T) { sendEvent: func(event *proto.StreamEvent) error { switch event.Category { case "DML": - want := `&{DML _table_ [{eid 8 0} {id 8 0} {name 15 0}] [[10 -1 name] [11 18446744073709551615 name]] 1 }` + want := `&{DML _table_ [{eid 8 0} {id 8 0} {name 253 0}] [[10 -1 name] [11 18446744073709551615 name]] 1 }` got := fmt.Sprintf("%v", event) if got != want { t.Errorf("got \n%s, want \n%s", got, want) diff --git a/go/vt/binlog/grpcbinlogstreamer/streamer.go b/go/vt/binlog/grpcbinlogstreamer/streamer.go index 980dca0334..13bcf64f77 100644 --- a/go/vt/binlog/grpcbinlogstreamer/streamer.go +++ b/go/vt/binlog/grpcbinlogstreamer/streamer.go @@ -31,8 +31,12 @@ func New(updateStream proto.UpdateStream) *UpdateStream { func (server *UpdateStream) StreamUpdate(req *pb.StreamUpdateRequest, stream pbs.UpdateStream_StreamUpdateServer) (err error) { defer server.updateStream.HandlePanic(&err) return server.updateStream.ServeUpdateStream(req.Position, func(reply *proto.StreamEvent) error { + event, err := proto.StreamEventToProto(reply) + if err != nil { + return err + } return stream.Send(&pb.StreamUpdateResponse{ - StreamEvent: proto.StreamEventToProto(reply), + StreamEvent: event, }) }) } diff --git a/go/vt/binlog/proto/proto3.go b/go/vt/binlog/proto/proto3.go index eabdee1d6d..8585bfce39 100644 --- a/go/vt/binlog/proto/proto3.go +++ b/go/vt/binlog/proto/proto3.go @@ -19,10 +19,14 @@ import ( // structures internally, and this will be obsolete. // StreamEventToProto converts a StreamEvent to a proto3 -func StreamEventToProto(s *StreamEvent) *pb.StreamEvent { +func StreamEventToProto(s *StreamEvent) (*pb.StreamEvent, error) { + fields, err := mproto.FieldsToProto3(s.PrimaryKeyFields) + if err != nil { + return nil, err + } result := &pb.StreamEvent{ TableName: s.TableName, - PrimaryKeyFields: mproto.FieldsToProto3(s.PrimaryKeyFields), + PrimaryKeyFields: fields, PrimaryKeyValues: mproto.RowsToProto3(s.PrimaryKeyValues), Sql: s.Sql, Timestamp: s.Timestamp, @@ -38,7 +42,7 @@ func StreamEventToProto(s *StreamEvent) *pb.StreamEvent { default: result.Category = pb.StreamEvent_SE_ERR } - return result + return result, nil } // ProtoToStreamEvent converts a proto to a StreamEvent diff --git a/go/vt/binlog/proto/stream_event_test.go b/go/vt/binlog/proto/stream_event_test.go index 505199c144..48d366c714 100644 --- a/go/vt/binlog/proto/stream_event_test.go +++ b/go/vt/binlog/proto/stream_event_test.go @@ -41,7 +41,7 @@ func TestStreamEvent(t *testing.T) { PrimaryKeyFields: []mproto.Field{ mproto.Field{ Name: "str2", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, mproto.Field{ Name: "str3", @@ -78,7 +78,7 @@ func TestStreamEvent(t *testing.T) { PrimaryKeyFields: []mproto.Field{ mproto.Field{ Name: "str2", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, mproto.Field{ Name: "str3", diff --git a/go/vt/tabletmanager/grpctmserver/server.go b/go/vt/tabletmanager/grpctmserver/server.go index aafb1941c7..13db1e23c7 100644 --- a/go/vt/tabletmanager/grpctmserver/server.go +++ b/go/vt/tabletmanager/grpctmserver/server.go @@ -190,9 +190,10 @@ func (s *server) ExecuteFetchAsDba(ctx context.Context, request *pb.ExecuteFetch response := &pb.ExecuteFetchAsDbaResponse{} return response, s.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsDba, request, response, func() error { qr, err := s.agent.ExecuteFetchAsDba(ctx, request.Query, request.DbName, int(request.MaxRows), request.WantFields, request.DisableBinlogs, request.ReloadSchema) - if err == nil { - response.Result = mproto.QueryResultToProto3(qr) + if err != nil { + return err } + response.Result, err = mproto.QueryResultToProto3(qr) return err }) } @@ -202,9 +203,10 @@ func (s *server) ExecuteFetchAsApp(ctx context.Context, request *pb.ExecuteFetch response := &pb.ExecuteFetchAsAppResponse{} return response, s.agent.RPCWrap(ctx, actionnode.TabletActionExecuteFetchAsApp, request, response, func() error { qr, err := s.agent.ExecuteFetchAsApp(ctx, request.Query, int(request.MaxRows), request.WantFields) - if err == nil { - response.Result = mproto.QueryResultToProto3(qr) + if err != nil { + return err } + response.Result, err = mproto.QueryResultToProto3(qr) return err }) } diff --git a/go/vt/tabletserver/grpcqueryservice/server.go b/go/vt/tabletserver/grpcqueryservice/server.go index 6925a110e9..fa8389816e 100644 --- a/go/vt/tabletserver/grpcqueryservice/server.go +++ b/go/vt/tabletserver/grpcqueryservice/server.go @@ -53,17 +53,23 @@ func (q *query) Execute(ctx context.Context, request *pb.ExecuteRequest) (respon request.ImmediateCallerId, ) reply := new(mproto.QueryResult) + bv, err := proto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, tabletserver.ToGRPCError(err) + } if err := q.server.Execute(ctx, request.Target, &proto.Query{ Sql: request.Query.Sql, - BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables), + BindVariables: bv, SessionId: request.SessionId, TransactionId: request.TransactionId, }, reply); err != nil { return nil, tabletserver.ToGRPCError(err) } - return &pb.ExecuteResponse{ - Result: mproto.QueryResultToProto3(reply), - }, nil + result, err := mproto.QueryResultToProto3(reply) + if err != nil { + return nil, tabletserver.ToGRPCError(err) + } + return &pb.ExecuteResponse{Result: result}, nil } // ExecuteBatch is part of the queryservice.QueryServer interface @@ -74,17 +80,23 @@ func (q *query) ExecuteBatch(ctx context.Context, request *pb.ExecuteBatchReques request.ImmediateCallerId, ) reply := new(proto.QueryResultList) + bql, err := proto.Proto3ToBoundQueryList(request.Queries) + if err != nil { + return nil, tabletserver.ToGRPCError(err) + } if err := q.server.ExecuteBatch(ctx, request.Target, &proto.QueryList{ - Queries: proto.Proto3ToBoundQueryList(request.Queries), + Queries: bql, SessionId: request.SessionId, AsTransaction: request.AsTransaction, TransactionId: request.TransactionId, }, reply); err != nil { return nil, tabletserver.ToGRPCError(err) } - return &pb.ExecuteBatchResponse{ - Results: proto.QueryResultListToProto3(reply.List), - }, nil + results, err := proto.QueryResultListToProto3(reply.List) + if err != nil { + return nil, tabletserver.ToGRPCError(err) + } + return &pb.ExecuteBatchResponse{Results: results}, nil } // StreamExecute is part of the queryservice.QueryServer interface @@ -94,14 +106,20 @@ func (q *query) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Query request.EffectiveCallerId, request.ImmediateCallerId, ) + bv, err := proto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return tabletserver.ToGRPCError(err) + } if err := q.server.StreamExecute(ctx, request.Target, &proto.Query{ Sql: request.Query.Sql, - BindVariables: proto.Proto3ToBindVariables(request.Query.BindVariables), + BindVariables: bv, SessionId: request.SessionId, }, func(reply *mproto.QueryResult) error { - return stream.Send(&pb.StreamExecuteResponse{ - Result: mproto.QueryResultToProto3(reply), - }) + result, err := mproto.QueryResultToProto3(reply) + if err != nil { + return err + } + return stream.Send(&pb.StreamExecuteResponse{Result: result}) }); err != nil { return tabletserver.ToGRPCError(err) } @@ -168,8 +186,12 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) ( request.ImmediateCallerId, ) reply := &proto.SplitQueryResult{} + bq, err := proto.Proto3ToBoundQuery(request.Query) + if err != nil { + return nil, tabletserver.ToGRPCError(err) + } if err := q.server.SplitQuery(ctx, request.Target, &proto.SplitQueryRequest{ - Query: *proto.Proto3ToBoundQuery(request.Query), + Query: *bq, SplitColumn: request.SplitColumn, SplitCount: int(request.SplitCount), SessionID: request.SessionId, @@ -178,7 +200,7 @@ func (q *query) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest) ( } qs, err := proto.QuerySplitsToProto3(reply.Queries) if err != nil { - return nil, err + return nil, tabletserver.ToGRPCError(err) } return &pb.SplitQueryResponse{Queries: qs}, nil } diff --git a/go/vt/tabletserver/grpctabletconn/conn.go b/go/vt/tabletserver/grpctabletconn/conn.go index 5bbc2b0970..5fbd310e14 100644 --- a/go/vt/tabletserver/grpctabletconn/conn.go +++ b/go/vt/tabletserver/grpctabletconn/conn.go @@ -288,7 +288,7 @@ func (conn *gRPCQueryClient) SplitQuery(ctx context.Context, query tproto.BoundQ q, err := tproto.BoundQueryToProto3(query.Sql, query.BindVariables) if err != nil { - return nil, err + return nil, tabletconn.TabletErrorFromGRPC(err) } req := &pb.SplitQueryRequest{ Target: conn.target, @@ -303,7 +303,11 @@ func (conn *gRPCQueryClient) SplitQuery(ctx context.Context, query tproto.BoundQ if err != nil { return nil, tabletconn.TabletErrorFromGRPC(err) } - return tproto.Proto3ToQuerySplits(sqr.Queries), nil + split, err := tproto.Proto3ToQuerySplits(sqr.Queries) + if err != nil { + return nil, tabletconn.TabletErrorFromGRPC(err) + } + return split, nil } // StreamHealth is the stub for TabletServer.StreamHealth RPC diff --git a/go/vt/tabletserver/proto/proto3.go b/go/vt/tabletserver/proto/proto3.go index dc4aa4bac7..94e2b3ef4b 100644 --- a/go/vt/tabletserver/proto/proto3.go +++ b/go/vt/tabletserver/proto/proto3.go @@ -219,32 +219,40 @@ func buildValue(v interface{}) (pb.Value, error) { } // Proto3ToBoundQuery converts a proto.BoundQuery to the internal data structure -func Proto3ToBoundQuery(query *pb.BoundQuery) *BoundQuery { +func Proto3ToBoundQuery(query *pb.BoundQuery) (*BoundQuery, error) { + bv, err := Proto3ToBindVariables(query.BindVariables) + if err != nil { + return nil, err + } return &BoundQuery{ Sql: string(query.Sql), - BindVariables: Proto3ToBindVariables(query.BindVariables), - } + BindVariables: bv, + }, nil } // Proto3ToBoundQueryList converts am array of proto.BoundQuery to the internal data structure -func Proto3ToBoundQueryList(queries []*pb.BoundQuery) []BoundQuery { +func Proto3ToBoundQueryList(queries []*pb.BoundQuery) ([]BoundQuery, error) { if len(queries) == 0 { - return nil + return nil, nil } result := make([]BoundQuery, len(queries)) for i, q := range queries { - result[i] = *Proto3ToBoundQuery(q) + res, err := Proto3ToBoundQuery(q) + if err != nil { + return nil, err + } + result[i] = *res } - return result + return result, nil } // Proto3ToBindVariables converts a proto.BinVariable map to internal data structure -func Proto3ToBindVariables(bv map[string]*pb.BindVariable) map[string]interface{} { - // TODO(sougou): handle error +func Proto3ToBindVariables(bv map[string]*pb.BindVariable) (map[string]interface{}, error) { if len(bv) == 0 { - return nil + return nil, nil } result := make(map[string]interface{}) + var err error for k, v := range bv { if v.Type == sqltypes.Tuple { list := make([]interface{}, len(v.Values)) @@ -253,14 +261,20 @@ func Proto3ToBindVariables(bv map[string]*pb.BindVariable) map[string]interface{ Type: lv.Type, Value: lv.Value, } - list[i], _ = buildSQLValue(asbind) + list[i], err = buildSQLValue(asbind) + if err != nil { + return nil, err + } } result[k] = list } else { - result[k], _ = buildSQLValue(v) + result[k], err = buildSQLValue(v) + if err != nil { + return nil, err + } } } - return result + return result, nil } func buildSQLValue(v *pb.BindVariable) (interface{}, error) { @@ -290,30 +304,38 @@ func Proto3ToQueryResultList(results []*pb.QueryResult) *QueryResultList { } // QueryResultListToProto3 changes the internal array of QueryResult to the proto3 version -func QueryResultListToProto3(results []mproto.QueryResult) []*pb.QueryResult { +func QueryResultListToProto3(results []mproto.QueryResult) ([]*pb.QueryResult, error) { if len(results) == 0 { - return nil + return nil, nil } result := make([]*pb.QueryResult, len(results)) + var err error for i := range results { - result[i] = mproto.QueryResultToProto3(&results[i]) + result[i], err = mproto.QueryResultToProto3(&results[i]) + if err != nil { + return nil, err + } } - return result + return result, nil } // Proto3ToQuerySplits converts a proto3 QuerySplit array to a native QuerySplit array -func Proto3ToQuerySplits(queries []*pb.QuerySplit) []QuerySplit { +func Proto3ToQuerySplits(queries []*pb.QuerySplit) ([]QuerySplit, error) { if len(queries) == 0 { - return nil + return nil, nil } result := make([]QuerySplit, len(queries)) for i, qs := range queries { + res, err := Proto3ToBoundQuery(qs.Query) + if err != nil { + return nil, err + } result[i] = QuerySplit{ - Query: *Proto3ToBoundQuery(qs.Query), + Query: *res, RowCount: qs.RowCount, } } - return result + return result, nil } // QuerySplitsToProto3 converts a native QuerySplit array to the proto3 version diff --git a/go/vt/tabletserver/query_splitter.go b/go/vt/tabletserver/query_splitter.go index 643ac6f905..78713a3fe4 100644 --- a/go/vt/tabletserver/query_splitter.go +++ b/go/vt/tabletserver/query_splitter.go @@ -217,7 +217,7 @@ func (qs *QuerySplitter) splitBoundaries(columnType int64, pkMinMax *mproto.Quer return qs.splitBoundariesIntColumn(pkMinMax) case mproto.VT_FLOAT, mproto.VT_DOUBLE: return qs.splitBoundariesFloatColumn(pkMinMax) - case mproto.VT_VARCHAR, mproto.VT_BIT, mproto.VT_VAR_STRING, mproto.VT_STRING: + case mproto.VT_BIT, mproto.VT_VAR_STRING, mproto.VT_STRING: return qs.splitBoundariesStringColumn() } return []sqltypes.Value{}, nil diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index 80fd1e83df..10ad95e5a2 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -35,14 +35,20 @@ func (vtg *VTGate) Execute(ctx context.Context, request *pb.ExecuteRequest) (res request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResult) - executeErr := vtg.server.Execute(ctx, string(request.Query.Sql), tproto.Proto3ToBindVariables(request.Query.BindVariables), request.TabletType, proto.ProtoToSession(request.Session), request.NotInTransaction, reply) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } + executeErr := vtg.server.Execute(ctx, string(request.Query.Sql), bv, request.TabletType, proto.ProtoToSession(request.Session), request.NotInTransaction, reply) response = &pb.ExecuteResponse{ Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Result = mproto.QueryResultToProto3(reply.Result) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Result, executeErr = mproto.QueryResultToProto3(reply.Result) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -54,9 +60,13 @@ func (vtg *VTGate) ExecuteShards(ctx context.Context, request *pb.ExecuteShardsR request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResult) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } executeErr := vtg.server.ExecuteShards(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.Shards, request.TabletType, @@ -67,9 +77,11 @@ func (vtg *VTGate) ExecuteShards(ctx context.Context, request *pb.ExecuteShardsR Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Result = mproto.QueryResultToProto3(reply.Result) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Result, executeErr = mproto.QueryResultToProto3(reply.Result) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -81,9 +93,13 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *pb.ExecuteKe request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResult) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } executeErr := vtg.server.ExecuteKeyspaceIds(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.KeyspaceIds, request.TabletType, @@ -94,9 +110,11 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *pb.ExecuteKe Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Result = mproto.QueryResultToProto3(reply.Result) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Result, executeErr = mproto.QueryResultToProto3(reply.Result) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -108,9 +126,13 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *pb.ExecuteKeyR request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResult) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } executeErr := vtg.server.ExecuteKeyRanges(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.KeyRanges, request.TabletType, @@ -121,9 +143,11 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *pb.ExecuteKeyR Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Result = mproto.QueryResultToProto3(reply.Result) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Result, executeErr = mproto.QueryResultToProto3(reply.Result) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -135,9 +159,13 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *pb.ExecuteEnti request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResult) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } executeErr := vtg.server.ExecuteEntityIds(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.EntityColumnName, request.EntityKeyspaceIds, @@ -149,9 +177,11 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *pb.ExecuteEnti Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Result = mproto.QueryResultToProto3(reply.Result) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Result, executeErr = mproto.QueryResultToProto3(reply.Result) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -163,8 +193,12 @@ func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, request *pb.ExecuteBa request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResultList) + bsq, err := proto.ProtoToBoundShardQueries(request.Queries) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } executeErr := vtg.server.ExecuteBatchShards(ctx, - proto.ProtoToBoundShardQueries(request.Queries), + bsq, request.TabletType, request.AsTransaction, proto.ProtoToSession(request.Session), @@ -173,9 +207,11 @@ func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, request *pb.ExecuteBa Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Results = tproto.QueryResultListToProto3(reply.List) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Results, executeErr = tproto.QueryResultListToProto3(reply.List) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -188,8 +224,12 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *pb.Exec request.CallerId, callerid.NewImmediateCallerID("grpc client")) reply := new(proto.QueryResultList) + bq, err := proto.ProtoToBoundKeyspaceIdQueries(request.Queries) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } executeErr := vtg.server.ExecuteBatchKeyspaceIds(ctx, - proto.ProtoToBoundKeyspaceIdQueries(request.Queries), + bq, request.TabletType, request.AsTransaction, proto.ProtoToSession(request.Session), @@ -198,9 +238,11 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *pb.Exec Error: vtgate.RPCErrorToVtRPCError(reply.Err), } if executeErr == nil { - response.Results = tproto.QueryResultListToProto3(reply.List) - response.Session = proto.SessionToProto(reply.Session) - return response, nil + response.Results, executeErr = tproto.QueryResultListToProto3(reply.List) + if executeErr == nil { + response.Session = proto.SessionToProto(reply.Session) + return response, nil + } } return nil, vterrors.ToGRPCError(executeErr) } @@ -211,14 +253,20 @@ func (vtg *VTGate) StreamExecute(request *pb.StreamExecuteRequest, stream pbs.Vi ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()), request.CallerId, callerid.NewImmediateCallerID("grpc client")) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return vterrors.ToGRPCError(err) + } vtgErr := vtg.server.StreamExecute(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.TabletType, func(value *proto.QueryResult) error { - return stream.Send(&pb.StreamExecuteResponse{ - Result: mproto.QueryResultToProto3(value.Result), - }) + result, err := mproto.QueryResultToProto3(value.Result) + if err != nil { + return err + } + return stream.Send(&pb.StreamExecuteResponse{Result: result}) }) return vterrors.ToGRPCError(vtgErr) } @@ -229,16 +277,22 @@ func (vtg *VTGate) StreamExecuteShards(request *pb.StreamExecuteShardsRequest, s ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()), request.CallerId, callerid.NewImmediateCallerID("grpc client")) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return vterrors.ToGRPCError(err) + } vtgErr := vtg.server.StreamExecuteShards(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.Shards, request.TabletType, func(value *proto.QueryResult) error { - return stream.Send(&pb.StreamExecuteShardsResponse{ - Result: mproto.QueryResultToProto3(value.Result), - }) + result, err := mproto.QueryResultToProto3(value.Result) + if err != nil { + return err + } + return stream.Send(&pb.StreamExecuteShardsResponse{Result: result}) }) return vterrors.ToGRPCError(vtgErr) } @@ -250,16 +304,22 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(request *pb.StreamExecuteKeyspaceIds ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()), request.CallerId, callerid.NewImmediateCallerID("grpc client")) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return vterrors.ToGRPCError(err) + } vtgErr := vtg.server.StreamExecuteKeyspaceIds(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.KeyspaceIds, request.TabletType, func(value *proto.QueryResult) error { - return stream.Send(&pb.StreamExecuteKeyspaceIdsResponse{ - Result: mproto.QueryResultToProto3(value.Result), - }) + result, err := mproto.QueryResultToProto3(value.Result) + if err != nil { + return err + } + return stream.Send(&pb.StreamExecuteKeyspaceIdsResponse{Result: result}) }) return vterrors.ToGRPCError(vtgErr) } @@ -271,16 +331,22 @@ func (vtg *VTGate) StreamExecuteKeyRanges(request *pb.StreamExecuteKeyRangesRequ ctx := callerid.NewContext(callinfo.GRPCCallInfo(stream.Context()), request.CallerId, callerid.NewImmediateCallerID("grpc client")) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return vterrors.ToGRPCError(err) + } vtgErr := vtg.server.StreamExecuteKeyRanges(ctx, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.Keyspace, request.KeyRanges, request.TabletType, func(value *proto.QueryResult) error { - return stream.Send(&pb.StreamExecuteKeyRangesResponse{ - Result: mproto.QueryResultToProto3(value.Result), - }) + result, err := mproto.QueryResultToProto3(value.Result) + if err != nil { + return err + } + return stream.Send(&pb.StreamExecuteKeyRangesResponse{Result: result}) }) return vterrors.ToGRPCError(vtgErr) } @@ -336,10 +402,14 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx), request.CallerId, callerid.NewImmediateCallerID("grpc client")) + bv, err := tproto.Proto3ToBindVariables(request.Query.BindVariables) + if err != nil { + return nil, vterrors.ToGRPCError(err) + } splits, vtgErr := vtg.server.SplitQuery(ctx, request.Keyspace, string(request.Query.Sql), - tproto.Proto3ToBindVariables(request.Query.BindVariables), + bv, request.SplitColumn, int(request.SplitCount)) if vtgErr != nil { diff --git a/go/vt/vtgate/proto/proto3.go b/go/vt/vtgate/proto/proto3.go index 739ba460e0..a8d117fb68 100644 --- a/go/vt/vtgate/proto/proto3.go +++ b/go/vt/vtgate/proto/proto3.go @@ -156,18 +156,22 @@ func BoundShardQueriesToProto(bsq []BoundShardQuery) ([]*pb.BoundShardQuery, err } // ProtoToBoundShardQueries transforms a list of BoundShardQuery from proto3 -func ProtoToBoundShardQueries(bsq []*pb.BoundShardQuery) []BoundShardQuery { +func ProtoToBoundShardQueries(bsq []*pb.BoundShardQuery) ([]BoundShardQuery, error) { if len(bsq) == 0 { - return nil + return nil, nil } result := make([]BoundShardQuery, len(bsq)) for i, q := range bsq { result[i].Sql = string(q.Query.Sql) - result[i].BindVariables = tproto.Proto3ToBindVariables(q.Query.BindVariables) + bv, err := tproto.Proto3ToBindVariables(q.Query.BindVariables) + if err != nil { + return nil, err + } + result[i].BindVariables = bv result[i].Keyspace = q.Keyspace result[i].Shards = q.Shards } - return result + return result, nil } // BoundKeyspaceIdQueriesToProto transforms a list of BoundKeyspaceIdQuery to proto3 @@ -191,16 +195,20 @@ func BoundKeyspaceIdQueriesToProto(bsq []BoundKeyspaceIdQuery) ([]*pb.BoundKeysp } // ProtoToBoundKeyspaceIdQueries transforms a list of BoundKeyspaceIdQuery from proto3 -func ProtoToBoundKeyspaceIdQueries(bsq []*pb.BoundKeyspaceIdQuery) []BoundKeyspaceIdQuery { +func ProtoToBoundKeyspaceIdQueries(bsq []*pb.BoundKeyspaceIdQuery) ([]BoundKeyspaceIdQuery, error) { if len(bsq) == 0 { - return nil + return nil, nil } result := make([]BoundKeyspaceIdQuery, len(bsq)) for i, q := range bsq { + bv, err := tproto.Proto3ToBindVariables(q.Query.BindVariables) + if err != nil { + return nil, err + } result[i].Sql = string(q.Query.Sql) - result[i].BindVariables = tproto.Proto3ToBindVariables(q.Query.BindVariables) + result[i].BindVariables = bv result[i].Keyspace = q.Keyspace result[i].KeyspaceIds = key.ProtoToKeyspaceIds(q.KeyspaceIds) } - return result + return result, nil } diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 9895663d90..f8ff2f02d9 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -64,7 +64,7 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *pb.Target }, mproto.Field{ Name: "msg", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, mproto.Field{ Name: "keyspace_id", diff --git a/go/vt/worker/split_diff_test.go b/go/vt/worker/split_diff_test.go index 019031c133..f6379a5ab1 100644 --- a/go/vt/worker/split_diff_test.go +++ b/go/vt/worker/split_diff_test.go @@ -56,7 +56,7 @@ func (sq *destinationTabletServer) StreamExecute(ctx context.Context, target *pb }, mproto.Field{ Name: "msg", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, mproto.Field{ Name: "keyspace_id", @@ -115,7 +115,7 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *pb.Targ }, mproto.Field{ Name: "msg", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, mproto.Field{ Name: "keyspace_id", diff --git a/go/vt/worker/sqldiffer_test.go b/go/vt/worker/sqldiffer_test.go index 638d82789f..b30e50358d 100644 --- a/go/vt/worker/sqldiffer_test.go +++ b/go/vt/worker/sqldiffer_test.go @@ -45,7 +45,7 @@ func (sq *sqlDifferTabletServer) StreamExecute(ctx context.Context, target *pb.T }, mproto.Field{ Name: "msg", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, }, }); err != nil { diff --git a/go/vt/worker/vertical_split_clone_test.go b/go/vt/worker/vertical_split_clone_test.go index df75c6d0bc..d1f06d091f 100644 --- a/go/vt/worker/vertical_split_clone_test.go +++ b/go/vt/worker/vertical_split_clone_test.go @@ -64,7 +64,7 @@ func (sq *verticalTabletServer) StreamExecute(ctx context.Context, target *pb.Ta }, mproto.Field{ Name: "msg", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, }, }); err != nil { diff --git a/go/vt/worker/vertical_split_diff_test.go b/go/vt/worker/vertical_split_diff_test.go index 4fecaf14b9..5a80bcddbf 100644 --- a/go/vt/worker/vertical_split_diff_test.go +++ b/go/vt/worker/vertical_split_diff_test.go @@ -56,7 +56,7 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *p }, mproto.Field{ Name: "msg", - Type: mproto.VT_VARCHAR, + Type: mproto.VT_VAR_STRING, }, }, }); err != nil { diff --git a/py/vtdb/field_types.py b/py/vtdb/field_types.py index d70b29d998..1cf4d24ca1 100755 --- a/py/vtdb/field_types.py +++ b/py/vtdb/field_types.py @@ -20,7 +20,6 @@ VT_TIME = 11 VT_DATETIME = 12 VT_YEAR = 13 VT_NEWDATE = 14 -VT_VARCHAR = 15 VT_BIT = 16 VT_NEWDECIMAL = 246 VT_ENUM = 247