bsongen: mysql/proto & some vtgate/proto

This commit is contained in:
Sugu Sougoumarane 2014-03-21 15:55:49 -07:00
Родитель f31c173aa0
Коммит 6a637b8c4d
11 изменённых файлов: 385 добавлений и 248 удалений

Просмотреть файл

@ -49,3 +49,9 @@ integration_test:
cd test ; echo "initial_sharding test"; time ./initial_sharding.py $$VT_TEST_FLAGS cd test ; echo "initial_sharding test"; time ./initial_sharding.py $$VT_TEST_FLAGS
cd test ; echo "initial_sharding_bytes test"; time ./initial_sharding_bytes.py $$VT_TEST_FLAGS cd test ; echo "initial_sharding_bytes test"; time ./initial_sharding_bytes.py $$VT_TEST_FLAGS
cd test ; echo "keyspace_test test"; time ./keyspace_test.py $$VT_TEST_FLAGS cd test ; echo "keyspace_test test"; time ./keyspace_test.py $$VT_TEST_FLAGS
# Build this target only if you want to regenerate the bson files
bson:
bsongen -file ./go/mysql/proto/structs.go -type QueryResult -o ./go/mysql/proto/query_result_bson.go
bsongen -file ./go/mysql/proto/structs.go -type Field -o ./go/mysql/proto/field_bson.go
bsongen -file ./go/vt/vtgate/proto/vtgate_proto.go -type QueryResult -o ./go/vt/vtgate/proto/query_result_bson.go

Просмотреть файл

@ -50,7 +50,7 @@ func main() {
} }
defer fout.Close() defer fout.Close()
} }
fmt.Fprintf(fout, "%s\n", out) fmt.Fprintf(fout, "%s", out)
} }
var ( var (

Просмотреть файл

@ -1,179 +0,0 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
import (
"bytes"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
"github.com/youtube/vitess/go/sqltypes"
)
func MarshalFieldBson(field Field, key string, buf *bytes2.ChunkedWriter) {
bson.EncodePrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
bson.EncodeString(buf, "Name", field.Name)
bson.EncodeInt64(buf, "Type", field.Type)
lenWriter.Close()
}
func UnmarshalFieldBson(field *Field, buf *bytes.Buffer) {
bson.Next(buf, 4)
kind := bson.NextByte(buf)
for kind != bson.EOO {
key := bson.ReadCString(buf)
switch key {
case "Name":
field.Name = bson.DecodeString(buf, kind)
case "Type":
field.Type = bson.DecodeInt64(buf, kind)
default:
bson.Skip(buf, kind)
}
kind = bson.NextByte(buf)
}
}
func (qr *QueryResult) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
EncodeFieldsBson(qr.Fields, "Fields", buf)
bson.EncodeUint64(buf, "RowsAffected", qr.RowsAffected)
bson.EncodeUint64(buf, "InsertId", qr.InsertId)
EncodeRowsBson(qr.Rows, "Rows", buf)
lenWriter.Close()
}
func EncodeFieldsBson(fields []Field, key string, buf *bytes2.ChunkedWriter) {
bson.EncodePrefix(buf, bson.Array, key)
lenWriter := bson.NewLenWriter(buf)
for i, v := range fields {
MarshalFieldBson(v, bson.Itoa(i), buf)
}
lenWriter.Close()
}
func EncodeRowsBson(rows [][]sqltypes.Value, key string, buf *bytes2.ChunkedWriter) {
bson.EncodePrefix(buf, bson.Array, key)
lenWriter := bson.NewLenWriter(buf)
for i, v := range rows {
EncodeRowBson(v, bson.Itoa(i), buf)
}
lenWriter.Close()
}
func EncodeRowBson(row []sqltypes.Value, key string, buf *bytes2.ChunkedWriter) {
bson.EncodePrefix(buf, bson.Array, key)
lenWriter := bson.NewLenWriter(buf)
for i, v := range row {
if v.IsNull() {
bson.EncodePrefix(buf, bson.Null, bson.Itoa(i))
} else {
bson.EncodeBinary(buf, bson.Itoa(i), v.Raw())
}
}
lenWriter.Close()
}
func (qr *QueryResult) UnmarshalBson(buf *bytes.Buffer, kind byte) {
bson.VerifyObject(kind)
bson.Next(buf, 4)
kind = bson.NextByte(buf)
for kind != bson.EOO {
key := bson.ReadCString(buf)
switch key {
case "Fields":
qr.Fields = DecodeFieldsBson(buf, kind)
case "RowsAffected":
qr.RowsAffected = bson.DecodeUint64(buf, kind)
case "InsertId":
qr.InsertId = bson.DecodeUint64(buf, kind)
case "Rows":
qr.Rows = DecodeRowsBson(buf, kind)
default:
bson.Skip(buf, kind)
}
kind = bson.NextByte(buf)
}
}
func DecodeFieldsBson(buf *bytes.Buffer, kind byte) []Field {
switch kind {
case bson.Array:
// valid
case bson.Null:
return nil
default:
panic(bson.NewBsonError("Unexpected data type %v for Query.Fields", kind))
}
bson.Next(buf, 4)
fields := make([]Field, 0, 8)
kind = bson.NextByte(buf)
for kind != bson.EOO {
if kind != bson.Object {
panic(bson.NewBsonError("Unexpected data type %v for Query.Field", kind))
}
bson.SkipIndex(buf)
var field Field
UnmarshalFieldBson(&field, buf)
fields = append(fields, field)
kind = bson.NextByte(buf)
}
return fields
}
func DecodeRowsBson(buf *bytes.Buffer, kind byte) [][]sqltypes.Value {
switch kind {
case bson.Array:
// valid
case bson.Null:
return nil
default:
panic(bson.NewBsonError("Unexpected data type %v for Query.Rows", kind))
}
bson.Next(buf, 4)
rows := make([][]sqltypes.Value, 0, 8)
kind = bson.NextByte(buf)
for kind != bson.EOO {
bson.SkipIndex(buf)
rows = append(rows, DecodeRowBson(buf, kind))
kind = bson.NextByte(buf)
}
return rows
}
func DecodeRowBson(buf *bytes.Buffer, kind byte) []sqltypes.Value {
switch kind {
case bson.Array:
// valid
case bson.Null:
return nil
default:
panic(bson.NewBsonError("Unexpected data type %v for Query.Row", kind))
}
bson.Next(buf, 4)
row := make([]sqltypes.Value, 0, 8)
kind = bson.NextByte(buf)
for kind != bson.EOO {
bson.SkipIndex(buf)
if kind != bson.Null {
row = append(row, sqltypes.MakeString(bson.DecodeBinary(buf, kind)))
} else {
row = append(row, sqltypes.Value{})
}
kind = bson.NextByte(buf)
}
return row
}

Просмотреть файл

@ -0,0 +1,50 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
// DO NOT EDIT.
// FILE GENERATED BY BSONGEN.
import (
"bytes"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
)
// MarshalBson bson-encodes Field.
func (field *Field) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
bson.EncodeString(buf, "Name", field.Name)
bson.EncodeInt64(buf, "Type", field.Type)
lenWriter.Close()
}
// UnmarshalBson bson-decodes into Field.
func (field *Field) UnmarshalBson(buf *bytes.Buffer, kind byte) {
switch kind {
case bson.EOO, bson.Object:
// valid
case bson.Null:
return
default:
panic(bson.NewBsonError("unexpected kind %v for Field", kind))
}
bson.Next(buf, 4)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
switch bson.ReadCString(buf) {
case "Name":
field.Name = bson.DecodeString(buf, kind)
case "Type":
field.Type = bson.DecodeInt64(buf, kind)
default:
bson.Skip(buf, kind)
}
}
}

Просмотреть файл

@ -20,7 +20,7 @@ var testcases = []TestCase{
// Empty // Empty
{ {
qr: QueryResult{}, qr: QueryResult{},
encoded: "E\x00\x00\x00\x04Fields\x00\x05\x00\x00\x00\x00?RowsAffected\x00\x00\x00\x00\x00\x00\x00\x00\x00?InsertId\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04Rows\x00\x05\x00\x00\x00\x00\x00", encoded: ";\x00\x00\x00\nFields\x00?RowsAffected\x00\x00\x00\x00\x00\x00\x00\x00\x00?InsertId\x00\x00\x00\x00\x00\x00\x00\x00\x00\nRows\x00\x00",
}, },
// Only fields set // Only fields set
{ {
@ -29,7 +29,7 @@ var testcases = []TestCase{
{Name: "foo", Type: 1}, {Name: "foo", Type: 1},
}, },
}, },
encoded: "i\x00\x00\x00\x04Fields\x00)\x00\x00\x00\x030\x00!\x00\x00\x00\x05Name\x00\x03\x00\x00\x00\x00foo\x12Type\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00?RowsAffected\x00\x00\x00\x00\x00\x00\x00\x00\x00?InsertId\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04Rows\x00\x05\x00\x00\x00\x00\x00", encoded: "d\x00\x00\x00\x04Fields\x00)\x00\x00\x00\x030\x00!\x00\x00\x00\x05Name\x00\x03\x00\x00\x00\x00foo\x12Type\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00?RowsAffected\x00\x00\x00\x00\x00\x00\x00\x00\x00?InsertId\x00\x00\x00\x00\x00\x00\x00\x00\x00\nRows\x00\x00",
}, },
// Only rows, no fields // Only rows, no fields
{ {
@ -38,7 +38,7 @@ var testcases = []TestCase{
{sqltypes.MakeString([]byte("abcd")), sqltypes.MakeNumeric([]byte("1234")), sqltypes.MakeFractional([]byte("1.234"))}, {sqltypes.MakeString([]byte("abcd")), sqltypes.MakeNumeric([]byte("1234")), sqltypes.MakeFractional([]byte("1.234"))},
}, },
}, },
encoded: "r\x00\x00\x00\x04Fields\x00\x05\x00\x00\x00\x00?RowsAffected\x00\x00\x00\x00\x00\x00\x00\x00\x00?InsertId\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04Rows\x002\x00\x00\x00\x040\x00*\x00\x00\x00\x050\x00\x04\x00\x00\x00\x00abcd\x051\x00\x04\x00\x00\x00\x001234\x052\x00\x05\x00\x00\x00\x001.234\x00\x00\x00", encoded: "m\x00\x00\x00\nFields\x00?RowsAffected\x00\x00\x00\x00\x00\x00\x00\x00\x00?InsertId\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04Rows\x002\x00\x00\x00\x040\x00*\x00\x00\x00\x050\x00\x04\x00\x00\x00\x00abcd\x051\x00\x04\x00\x00\x00\x001234\x052\x00\x05\x00\x00\x00\x001.234\x00\x00\x00",
}, },
// one row and one field // one row and one field
{ {

Просмотреть файл

@ -0,0 +1,126 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
// DO NOT EDIT.
// FILE GENERATED BY BSONGEN.
import (
"bytes"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
"github.com/youtube/vitess/go/sqltypes"
)
// MarshalBson bson-encodes QueryResult.
func (queryResult *QueryResult) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
// []Field
if queryResult.Fields == nil {
bson.EncodePrefix(buf, bson.Null, "Fields")
} else {
bson.EncodePrefix(buf, bson.Array, "Fields")
lenWriter := bson.NewLenWriter(buf)
for _i, _v1 := range queryResult.Fields {
_v1.MarshalBson(buf, bson.Itoa(_i))
}
lenWriter.Close()
}
bson.EncodeUint64(buf, "RowsAffected", queryResult.RowsAffected)
bson.EncodeUint64(buf, "InsertId", queryResult.InsertId)
// [][]sqltypes.Value
if queryResult.Rows == nil {
bson.EncodePrefix(buf, bson.Null, "Rows")
} else {
bson.EncodePrefix(buf, bson.Array, "Rows")
lenWriter := bson.NewLenWriter(buf)
for _i, _v2 := range queryResult.Rows {
// []sqltypes.Value
if _v2 == nil {
bson.EncodePrefix(buf, bson.Null, bson.Itoa(_i))
} else {
bson.EncodePrefix(buf, bson.Array, bson.Itoa(_i))
lenWriter := bson.NewLenWriter(buf)
for _i, _v3 := range _v2 {
_v3.MarshalBson(buf, bson.Itoa(_i))
}
lenWriter.Close()
}
}
lenWriter.Close()
}
lenWriter.Close()
}
// UnmarshalBson bson-decodes into QueryResult.
func (queryResult *QueryResult) UnmarshalBson(buf *bytes.Buffer, kind byte) {
switch kind {
case bson.EOO, bson.Object:
// valid
case bson.Null:
return
default:
panic(bson.NewBsonError("unexpected kind %v for QueryResult", kind))
}
bson.Next(buf, 4)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
switch bson.ReadCString(buf) {
case "Fields":
// []Field
if kind != bson.Null {
if kind != bson.Array {
panic(bson.NewBsonError("unexpected kind %v for queryResult.Fields", kind))
}
bson.Next(buf, 4)
queryResult.Fields = make([]Field, 0, 8)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
bson.SkipIndex(buf)
var _v1 Field
_v1.UnmarshalBson(buf, kind)
queryResult.Fields = append(queryResult.Fields, _v1)
}
}
case "RowsAffected":
queryResult.RowsAffected = bson.DecodeUint64(buf, kind)
case "InsertId":
queryResult.InsertId = bson.DecodeUint64(buf, kind)
case "Rows":
// [][]sqltypes.Value
if kind != bson.Null {
if kind != bson.Array {
panic(bson.NewBsonError("unexpected kind %v for queryResult.Rows", kind))
}
bson.Next(buf, 4)
queryResult.Rows = make([][]sqltypes.Value, 0, 8)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
bson.SkipIndex(buf)
var _v2 []sqltypes.Value
// []sqltypes.Value
if kind != bson.Null {
if kind != bson.Array {
panic(bson.NewBsonError("unexpected kind %v for _v2", kind))
}
bson.Next(buf, 4)
_v2 = make([]sqltypes.Value, 0, 8)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
bson.SkipIndex(buf)
var _v3 sqltypes.Value
_v3.UnmarshalBson(buf, kind)
_v2 = append(_v2, _v3)
}
}
queryResult.Rows = append(queryResult.Rows, _v2)
}
}
default:
bson.Skip(buf, kind)
}
}
}

Просмотреть файл

@ -6,6 +6,7 @@
package sqltypes package sqltypes
import ( import (
"bytes"
"encoding/base64" "encoding/base64"
"encoding/gob" "encoding/gob"
"encoding/json" "encoding/json"
@ -13,6 +14,8 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
"github.com/youtube/vitess/go/hack" "github.com/youtube/vitess/go/hack"
) )
@ -123,6 +126,30 @@ func (v Value) EncodeAscii(b BinWriter) {
} }
} }
func (v Value) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
if key == "" {
lenWriter := bson.NewLenWriter(buf)
defer lenWriter.Close()
key = bson.MAGICTAG
}
if v.IsNull() {
bson.EncodePrefix(buf, bson.Null, key)
} else {
bson.EncodeBinary(buf, key, v.Raw())
}
}
func (v *Value) UnmarshalBson(buf *bytes.Buffer, kind byte) {
if kind == bson.EOO {
bson.Next(buf, 4)
kind = bson.NextByte(buf)
bson.ReadCString(buf)
}
if kind != bson.Null {
*v = MakeString(bson.DecodeBinary(buf, kind))
}
}
func (v Value) IsNull() bool { func (v Value) IsNull() bool {
return v.Inner == nil return v.Inner == nil
} }

Просмотреть файл

@ -0,0 +1,142 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
// DO NOT EDIT.
// FILE GENERATED BY BSONGEN.
import (
"bytes"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
)
// MarshalBson bson-encodes QueryResult.
func (queryResult *QueryResult) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
// []mproto.Field
if queryResult.Fields == nil {
bson.EncodePrefix(buf, bson.Null, "Fields")
} else {
bson.EncodePrefix(buf, bson.Array, "Fields")
lenWriter := bson.NewLenWriter(buf)
for _i, _v1 := range queryResult.Fields {
_v1.MarshalBson(buf, bson.Itoa(_i))
}
lenWriter.Close()
}
bson.EncodeUint64(buf, "RowsAffected", queryResult.RowsAffected)
bson.EncodeUint64(buf, "InsertId", queryResult.InsertId)
// [][]sqltypes.Value
if queryResult.Rows == nil {
bson.EncodePrefix(buf, bson.Null, "Rows")
} else {
bson.EncodePrefix(buf, bson.Array, "Rows")
lenWriter := bson.NewLenWriter(buf)
for _i, _v2 := range queryResult.Rows {
// []sqltypes.Value
if _v2 == nil {
bson.EncodePrefix(buf, bson.Null, bson.Itoa(_i))
} else {
bson.EncodePrefix(buf, bson.Array, bson.Itoa(_i))
lenWriter := bson.NewLenWriter(buf)
for _i, _v3 := range _v2 {
_v3.MarshalBson(buf, bson.Itoa(_i))
}
lenWriter.Close()
}
}
lenWriter.Close()
}
// *Session
if queryResult.Session == nil {
bson.EncodePrefix(buf, bson.Null, "Session")
} else {
(*queryResult.Session).MarshalBson(buf, "Session")
}
bson.EncodeString(buf, "Error", queryResult.Error)
lenWriter.Close()
}
// UnmarshalBson bson-decodes into QueryResult.
func (queryResult *QueryResult) UnmarshalBson(buf *bytes.Buffer, kind byte) {
switch kind {
case bson.EOO, bson.Object:
// valid
case bson.Null:
return
default:
panic(bson.NewBsonError("unexpected kind %v for QueryResult", kind))
}
bson.Next(buf, 4)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
switch bson.ReadCString(buf) {
case "Fields":
// []mproto.Field
if kind != bson.Null {
if kind != bson.Array {
panic(bson.NewBsonError("unexpected kind %v for queryResult.Fields", kind))
}
bson.Next(buf, 4)
queryResult.Fields = make([]mproto.Field, 0, 8)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
bson.SkipIndex(buf)
var _v1 mproto.Field
_v1.UnmarshalBson(buf, kind)
queryResult.Fields = append(queryResult.Fields, _v1)
}
}
case "RowsAffected":
queryResult.RowsAffected = bson.DecodeUint64(buf, kind)
case "InsertId":
queryResult.InsertId = bson.DecodeUint64(buf, kind)
case "Rows":
// [][]sqltypes.Value
if kind != bson.Null {
if kind != bson.Array {
panic(bson.NewBsonError("unexpected kind %v for queryResult.Rows", kind))
}
bson.Next(buf, 4)
queryResult.Rows = make([][]sqltypes.Value, 0, 8)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
bson.SkipIndex(buf)
var _v2 []sqltypes.Value
// []sqltypes.Value
if kind != bson.Null {
if kind != bson.Array {
panic(bson.NewBsonError("unexpected kind %v for _v2", kind))
}
bson.Next(buf, 4)
_v2 = make([]sqltypes.Value, 0, 8)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
bson.SkipIndex(buf)
var _v3 sqltypes.Value
_v3.UnmarshalBson(buf, kind)
_v2 = append(_v2, _v3)
}
}
queryResult.Rows = append(queryResult.Rows, _v2)
}
}
case "Session":
// *Session
if kind != bson.Null {
queryResult.Session = new(Session)
(*queryResult.Session).UnmarshalBson(buf, kind)
}
case "Error":
queryResult.Error = bson.DecodeString(buf, kind)
default:
bson.Skip(buf, kind)
}
}
}

Просмотреть файл

@ -216,58 +216,6 @@ func PopulateQueryResult(in *mproto.QueryResult, out *QueryResult) {
out.Rows = in.Rows out.Rows = in.Rows
} }
// MarshalBson marshals QueryResult into buf.
func (qr *QueryResult) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
mproto.EncodeFieldsBson(qr.Fields, "Fields", buf)
bson.EncodeUint64(buf, "RowsAffected", qr.RowsAffected)
bson.EncodeUint64(buf, "InsertId", qr.InsertId)
mproto.EncodeRowsBson(qr.Rows, "Rows", buf)
if qr.Session != nil {
qr.Session.MarshalBson(buf, "Session")
}
if qr.Error != "" {
bson.EncodeString(buf, "Error", qr.Error)
}
lenWriter.Close()
}
// UnmarshalBson unmarshals QueryResult from buf.
func (qr *QueryResult) UnmarshalBson(buf *bytes.Buffer, kind byte) {
bson.VerifyObject(kind)
bson.Next(buf, 4)
kind = bson.NextByte(buf)
for kind != bson.EOO {
keyName := bson.ReadCString(buf)
switch keyName {
case "Fields":
qr.Fields = mproto.DecodeFieldsBson(buf, kind)
case "RowsAffected":
qr.RowsAffected = bson.DecodeUint64(buf, kind)
case "InsertId":
qr.InsertId = bson.DecodeUint64(buf, kind)
case "Rows":
qr.Rows = mproto.DecodeRowsBson(buf, kind)
case "Session":
if kind != bson.Null {
qr.Session = new(Session)
qr.Session.UnmarshalBson(buf, kind)
}
case "Error":
qr.Error = bson.DecodeString(buf, kind)
default:
bson.Skip(buf, kind)
}
kind = bson.NextByte(buf)
}
}
// BatchQueryShard represents a batch query request // BatchQueryShard represents a batch query request
// for the specified shards. // for the specified shards.
type BatchQueryShard struct { type BatchQueryShard struct {

Просмотреть файл

@ -150,9 +150,9 @@ class TabletConnection(object):
results = [] results = []
try: try:
response = self.client.call('SqlQuery.Execute', req) response = self.client.call('SqlQuery.Execute', req)
reply = response.reply reply = _fix_reply(response.reply)
for field in (reply['Fields'] or []): for field in reply['Fields']:
fields.append((field['Name'], field['Type'])) fields.append((field['Name'], field['Type']))
conversions.append(field_types.conversions.get(field['Type'])) conversions.append(field_types.conversions.get(field['Type']))
@ -187,8 +187,9 @@ class TabletConnection(object):
conversions = [] conversions = []
results = [] results = []
rowcount = 0 rowcount = 0
reply = _fix_reply(reply)
for field in (reply['Fields'] or []): for field in reply['Fields']:
fields.append((field['Name'], field['Type'])) fields.append((field['Name'], field['Type']))
conversions.append(field_types.conversions.get(field['Type'])) conversions.append(field_types.conversions.get(field['Type']))
@ -221,9 +222,9 @@ class TabletConnection(object):
try: try:
self.client.stream_call('SqlQuery.StreamExecute', req) self.client.stream_call('SqlQuery.StreamExecute', req)
first_response = self.client.stream_next() first_response = self.client.stream_next()
reply = first_response.reply reply = _fix_reply(first_response.reply)
for field in (reply['Fields'] or []): for field in reply['Fields']:
self._stream_fields.append((field['Name'], field['Type'])) self._stream_fields.append((field['Name'], field['Type']))
self._stream_conversions.append(field_types.conversions.get(field['Type'])) self._stream_conversions.append(field_types.conversions.get(field['Type']))
except gorpc.GoRpcError as e: except gorpc.GoRpcError as e:
@ -251,6 +252,7 @@ class TabletConnection(object):
logging.exception('gorpc low-level error') logging.exception('gorpc low-level error')
raise raise
self._stream_result.reply = _fix_reply(self._stream_result.reply)
row = tuple(_make_row(self._stream_result.reply['Rows'][self._stream_result_index], self._stream_conversions)) row = tuple(_make_row(self._stream_result.reply['Rows'][self._stream_result_index], self._stream_conversions))
# If we are reading the last row, set us up to read more data. # If we are reading the last row, set us up to read more data.
@ -261,6 +263,10 @@ class TabletConnection(object):
return row return row
def _fix_reply(reply):
reply['Fields'] = reply['Fields'] or []
reply['Rows'] = reply['Rows'] or []
return reply
def _make_row(row, conversions): def _make_row(row, conversions):
converted_row = [] converted_row = []

Просмотреть файл

@ -112,7 +112,7 @@ class VtgateConnection(object):
req['Session'] = self.session req['Session'] = self.session
def _update_session(self, response): def _update_session(self, response):
if 'Session' in response.reply: if 'Session' in response.reply and response.reply['Session']:
self.session = response.reply['Session'] self.session = response.reply['Session']
def _execute(self, sql, bind_variables): def _execute(self, sql, bind_variables):
@ -133,10 +133,12 @@ class VtgateConnection(object):
response = self.client.call('VTGate.ExecuteShard', req) response = self.client.call('VTGate.ExecuteShard', req)
self._update_session(response) self._update_session(response)
reply = response.reply reply = response.reply
if 'Error' in response.reply: # TODO(sougou): Simplify this check after all servers are deployed
if 'Error' in response.reply and response.reply['Error']:
raise gorpc.AppError(response.reply['Error'], 'VTGate.ExecuteShard') raise gorpc.AppError(response.reply['Error'], 'VTGate.ExecuteShard')
reply = _fix_reply(reply)
for field in (reply['Fields'] or []): for field in reply['Fields']:
fields.append((field['Name'], field['Type'])) fields.append((field['Name'], field['Type']))
conversions.append(field_types.conversions.get(field['Type'])) conversions.append(field_types.conversions.get(field['Type']))
@ -172,15 +174,16 @@ class VtgateConnection(object):
self._add_session(req) self._add_session(req)
response = self.client.call('VTGate.ExecuteBatchShard', req) response = self.client.call('VTGate.ExecuteBatchShard', req)
self._update_session(response) self._update_session(response)
if 'Error' in response.reply: if 'Error' in response.reply and response.reply['Error']:
raise gorpc.AppError(response.reply['Error'], 'VTGate.ExecuteBatchShard') raise gorpc.AppError(response.reply['Error'], 'VTGate.ExecuteBatchShard')
for reply in response.reply['List']: for reply in response.reply['List']:
fields = [] fields = []
conversions = [] conversions = []
results = [] results = []
rowcount = 0 rowcount = 0
reply = _fix_reply(reply)
for field in (reply['Fields'] or []): for field in reply['Fields']:
fields.append((field['Name'], field['Type'])) fields.append((field['Name'], field['Type']))
conversions.append(field_types.conversions.get(field['Type'])) conversions.append(field_types.conversions.get(field['Type']))
@ -218,9 +221,9 @@ class VtgateConnection(object):
try: try:
self.client.stream_call('VTGate.StreamExecuteShard', req) self.client.stream_call('VTGate.StreamExecuteShard', req)
first_response = self.client.stream_next() first_response = self.client.stream_next()
reply = first_response.reply reply = _fix_reply(first_response.reply)
for field in (reply['Fields'] or []): for field in reply['Fields']:
self._stream_fields.append((field['Name'], field['Type'])) self._stream_fields.append((field['Name'], field['Type']))
self._stream_conversions.append(field_types.conversions.get(field['Type'])) self._stream_conversions.append(field_types.conversions.get(field['Type']))
except gorpc.GoRpcError as e: except gorpc.GoRpcError as e:
@ -243,7 +246,9 @@ class VtgateConnection(object):
self._stream_result_index = None self._stream_result_index = None
return None return None
# A session message, if any comes separately with no rows # A session message, if any comes separately with no rows
if 'Session' in self._stream_result.reply: # TODO(sougou) get rid of this check. After all the server
# changes, there will always be a 'Session' in the reply.
if 'Session' in self._stream_result.reply and self._stream_result.reply['Session']:
self.session = self._stream_result.reply['Session'] self.session = self._stream_result.reply['Session']
self._stream_result = None self._stream_result = None
continue continue
@ -253,6 +258,7 @@ class VtgateConnection(object):
logging.exception('gorpc low-level error') logging.exception('gorpc low-level error')
raise raise
self._stream_result.reply = _fix_reply(self._stream_result.reply)
row = tuple(_make_row(self._stream_result.reply['Rows'][self._stream_result_index], self._stream_conversions)) row = tuple(_make_row(self._stream_result.reply['Rows'][self._stream_result_index], self._stream_conversions))
# If we are reading the last row, set us up to read more data. # If we are reading the last row, set us up to read more data.
@ -264,6 +270,11 @@ class VtgateConnection(object):
return row return row
def _fix_reply(reply):
reply['Fields'] = reply['Fields'] or []
reply['Rows'] = reply['Rows'] or []
return reply
def _make_row(row, conversions): def _make_row(row, conversions):
converted_row = [] converted_row = []
for conversion_func, field_data in izip(conversions, row): for conversion_func, field_data in izip(conversions, row):