зеркало из https://github.com/github/vitess-gh.git
Fix bug wrt inline types and large documents. Refactor code. More extensive comments. Add test for reported failure mode and improve other tests
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Родитель
ddda433ca2
Коммит
225df50500
|
@ -57,6 +57,32 @@ func printASCIIBytes(data []byte) {
|
|||
log.Infof("[%s]", s)
|
||||
}
|
||||
|
||||
// only used for logging/debugging
|
||||
var jsonTypeToName = map[uint]string{
|
||||
jsonSmallObject: "sObject",
|
||||
jsonLargeObject: "lObject",
|
||||
jsonSmallArray: "sArray",
|
||||
jsonLargeArray: "lArray",
|
||||
jsonLiteral: "literal",
|
||||
jsonInt16: "int16",
|
||||
jsonUint16: "uint16",
|
||||
jsonInt32: "int32",
|
||||
jsonUint32: "uint32",
|
||||
jsonInt64: "int64",
|
||||
jsonUint64: "uint64",
|
||||
jsonDouble: "double", //0x0b
|
||||
jsonString: "string", //0x0c a utf8mb4 string
|
||||
jsonOpaque: "opaque", //0x0f "custom" data
|
||||
}
|
||||
|
||||
func jsonDataTypeToString(typ uint) string {
|
||||
sType, ok := jsonTypeToName[typ]
|
||||
if !ok {
|
||||
return "undefined"
|
||||
}
|
||||
return sType
|
||||
}
|
||||
|
||||
//endregion
|
||||
|
||||
// provides the single API function, used to convert json from binary format used in binlogs to a string representation
|
||||
|
@ -66,7 +92,7 @@ func getJSONValue(data []byte) (string, error) {
|
|||
if len(data) == 0 {
|
||||
ast = ajson.NullNode("")
|
||||
} else {
|
||||
ast, _, err = binlogJSON.parse(data)
|
||||
ast, err = binlogJSON.parse(data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -88,28 +114,31 @@ func init() {
|
|||
|
||||
//region plugin manager
|
||||
|
||||
// BinlogJSON contains the plugins for all json types and methods for parsing the binary json representation from the binlog
|
||||
// BinlogJSON contains the plugins for all json types and methods for parsing the
|
||||
// binary json representation of a specific type from the binlog
|
||||
type BinlogJSON struct {
|
||||
plugins map[jsonDataType]jsonPlugin
|
||||
}
|
||||
|
||||
func (jh *BinlogJSON) parse(data []byte) (node *ajson.Node, newPos int, err error) {
|
||||
var pos int
|
||||
|
||||
typ := data[0]
|
||||
// parse decodes a value from the binlog. pos is a pointer that keeps track of the offset of the current node being parsed
|
||||
func (jh *BinlogJSON) parse(data []byte) (node *ajson.Node, err error) {
|
||||
pos := 0
|
||||
typ := data[pos]
|
||||
jlog("Top level object is type %s\n", jsonDataTypeToString(uint(typ)))
|
||||
pos++
|
||||
return jh.getNode(jsonDataType(typ), data, pos)
|
||||
}
|
||||
|
||||
// each plugin registers itself in init()s
|
||||
func (jh *BinlogJSON) register(typ jsonDataType, Plugin jsonPlugin) {
|
||||
jh.plugins[typ] = Plugin
|
||||
}
|
||||
|
||||
func (jh *BinlogJSON) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
// gets the node at this position
|
||||
func (jh *BinlogJSON) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
Plugin := jh.plugins[typ]
|
||||
if Plugin == nil {
|
||||
return nil, 0, fmt.Errorf("Plugin not found for type %d", typ)
|
||||
return nil, fmt.Errorf("Plugin not found for type %d", typ)
|
||||
}
|
||||
return Plugin.getNode(typ, data, pos)
|
||||
}
|
||||
|
@ -119,8 +148,8 @@ func (jh *BinlogJSON) getNode(typ jsonDataType, data []byte, pos int) (node *ajs
|
|||
//region enums
|
||||
|
||||
// jsonDataType has the values used in the mysql json binary representation to denote types
|
||||
// we have string, literal(true/false/null), number, object or array types
|
||||
// large object => doc size > 64K, you get pointers instead of inline values
|
||||
// We have string, literal(true/false/null), number, object or array types
|
||||
// large object => doc size > 64K: you get pointers instead of inline values
|
||||
type jsonDataType byte
|
||||
|
||||
// type mapping as defined by the mysql json representation
|
||||
|
@ -141,41 +170,6 @@ const (
|
|||
jsonOpaque = 15 //0x0f "custom" data
|
||||
)
|
||||
|
||||
func jsonDataTypeToString(typ uint) string {
|
||||
switch typ {
|
||||
case jsonSmallObject:
|
||||
return "sObject"
|
||||
case jsonLargeObject:
|
||||
return "lObject"
|
||||
case jsonSmallArray:
|
||||
return "sArray"
|
||||
case jsonLargeArray:
|
||||
return "lArray"
|
||||
case jsonLiteral:
|
||||
return "literal"
|
||||
case jsonInt16:
|
||||
return "int16"
|
||||
case jsonUint16:
|
||||
return "uint16"
|
||||
case jsonInt32:
|
||||
return "int32"
|
||||
case jsonUint32:
|
||||
return "uint32"
|
||||
case jsonInt64:
|
||||
return "int64"
|
||||
case jsonUint64:
|
||||
return "uint64"
|
||||
case jsonDouble:
|
||||
return "double"
|
||||
case jsonString:
|
||||
return "string"
|
||||
case jsonOpaque:
|
||||
return "opaque"
|
||||
default:
|
||||
return "undefined"
|
||||
}
|
||||
}
|
||||
|
||||
// literals in the binary json format can be one of three types: null, true, false
|
||||
type jsonDataLiteral byte
|
||||
|
||||
|
@ -186,34 +180,33 @@ const (
|
|||
jsonFalseLiteral = '\x02'
|
||||
)
|
||||
|
||||
// in objects and arrays some values are inlined, others have offsets into the raw data
|
||||
var inlineTypes = map[jsonDataType]bool{
|
||||
jsonSmallObject: false,
|
||||
jsonLargeObject: false,
|
||||
jsonSmallArray: false,
|
||||
jsonLargeArray: false,
|
||||
jsonLiteral: true,
|
||||
jsonInt16: true,
|
||||
jsonUint16: true,
|
||||
jsonInt32: false,
|
||||
jsonUint32: false,
|
||||
jsonInt64: false,
|
||||
jsonUint64: false,
|
||||
jsonDouble: false,
|
||||
jsonString: false,
|
||||
jsonOpaque: false,
|
||||
}
|
||||
|
||||
//endregion
|
||||
|
||||
//region util funcs
|
||||
|
||||
// readInt returns either 32-bit or a 16-bit int from the passed buffer. Which one it is, depends on whether the document is "large" or not
|
||||
// in objects and arrays some values are inlined, others have offsets into the raw data
|
||||
// for all documents literals (true/false/null) and 16bit integers are inlined
|
||||
// for large documents 32bit integers are also inlined:
|
||||
// principle is that two byte values are inlined in small and four byte in large docs
|
||||
func isInline(typ jsonDataType, large bool) bool {
|
||||
switch typ {
|
||||
case jsonLiteral, jsonInt16, jsonUint16:
|
||||
return true
|
||||
case jsonInt32, jsonUint32:
|
||||
if large {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// readInt returns either a 32-bit or a 16-bit int from the passed buffer. Which one it is,
|
||||
// depends on whether the document is "large" or not
|
||||
// JSON documents stored are considered "large" if the size of the stored json document is
|
||||
// more than 64K bytes. For a large document all types which have their inlineTypes entry as true
|
||||
// are inlined. Others only store the offset in the document
|
||||
// This int is either an offset into the raw data, count of elements or size of the represented data structure
|
||||
// (This design decision allows a fixed number of bytes to be used for representing object keys and array entries)
|
||||
// more than 64K bytes. Values of non-inlined types are stored as offsets into the document
|
||||
// The int returned is either an offset into the raw data, count of elements or size of the represented data structure
|
||||
// (This design decision allows a fixed number of bytes to be used for representing object keys and array indices)
|
||||
// readInt also returns the new position (by advancing the position by the number of bytes read)
|
||||
func readInt(data []byte, pos int, large bool) (int, int) {
|
||||
if large {
|
||||
return int(data[pos]) +
|
||||
|
@ -230,14 +223,15 @@ func readInt(data []byte, pos int, large bool) (int, int) {
|
|||
// of an arbitrarily long string as implemented by the mysql server
|
||||
// https://github.com/mysql/mysql-server/blob/5.7/sql/json_binary.cc#L234
|
||||
// https://github.com/mysql/mysql-server/blob/8.0/sql/json_binary.cc#L283
|
||||
// readVariableLength also returns the new position (by advancing the position by the number of bytes read)
|
||||
func readVariableLength(data []byte, pos int) (int, int) {
|
||||
var bb byte
|
||||
var res int
|
||||
var length int
|
||||
var idx byte
|
||||
for {
|
||||
bb = data[pos]
|
||||
pos++
|
||||
res |= int(bb&0x7f) << (7 * idx)
|
||||
length |= int(bb&0x7f) << (7 * idx)
|
||||
// if the high bit is 1, the integer value of the byte will be negative
|
||||
// high bit of 1 signifies that the next byte is part of the length encoding
|
||||
if int8(bb) >= 0 {
|
||||
|
@ -245,14 +239,48 @@ func readVariableLength(data []byte, pos int) (int, int) {
|
|||
}
|
||||
idx++
|
||||
}
|
||||
return res, pos
|
||||
return length, pos
|
||||
}
|
||||
|
||||
// getElem returns the json value found inside json objects and arrays at provided position
|
||||
func getElem(data []byte, pos int, large bool) (*ajson.Node, int, error) {
|
||||
var elem *ajson.Node
|
||||
var err error
|
||||
var offset int
|
||||
typ := jsonDataType(data[pos])
|
||||
pos++
|
||||
if isInline(typ, large) {
|
||||
elem, err = binlogJSON.getNode(typ, data, pos)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if large {
|
||||
pos += 4
|
||||
} else {
|
||||
pos += 2
|
||||
}
|
||||
} else {
|
||||
offset, pos = readInt(data, pos, large)
|
||||
if offset >= len(data) {
|
||||
log.Errorf("unable to decode element")
|
||||
return nil, 0, fmt.Errorf("unable to decode element: %+v", data)
|
||||
}
|
||||
newData := data[offset:]
|
||||
//newPos ignored because this is an offset into the "extra" section of the buffer
|
||||
elem, err = binlogJSON.getNode(typ, newData, 1)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
return elem, pos, nil
|
||||
}
|
||||
|
||||
//endregion
|
||||
|
||||
// json sub-type interface
|
||||
// one plugin for each sub-type, plugins are stateless and initialized on load via individual init() functions
|
||||
type jsonPlugin interface {
|
||||
getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error)
|
||||
getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error)
|
||||
}
|
||||
|
||||
type jsonPluginInfo struct {
|
||||
|
@ -273,14 +301,13 @@ type intPlugin struct {
|
|||
|
||||
var _ jsonPlugin = (*intPlugin)(nil)
|
||||
|
||||
func (ih intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64, newPos int) {
|
||||
func (ih intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
|
||||
var val uint64
|
||||
var val2 float64
|
||||
size := ih.sizes[typ]
|
||||
for i := 0; i < size; i++ {
|
||||
val = val + uint64(data[pos+i])<<(8*i)
|
||||
}
|
||||
pos += size
|
||||
switch typ {
|
||||
case jsonInt16:
|
||||
val2 = float64(int16(val))
|
||||
|
@ -297,13 +324,13 @@ func (ih intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float6
|
|||
case jsonDouble:
|
||||
val2 = math.Float64frombits(val)
|
||||
}
|
||||
return val2, pos
|
||||
return val2
|
||||
}
|
||||
|
||||
func (ih intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
val, pos := ih.getVal(typ, data, pos)
|
||||
func (ih intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
val := ih.getVal(typ, data, pos)
|
||||
node = ajson.NumericNode("", val)
|
||||
return node, pos, nil
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func newIntPlugin() *intPlugin {
|
||||
|
@ -343,9 +370,8 @@ type literalPlugin struct {
|
|||
|
||||
var _ jsonPlugin = (*literalPlugin)(nil)
|
||||
|
||||
func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
val := jsonDataLiteral(data[pos])
|
||||
pos += 2
|
||||
switch val {
|
||||
case jsonNullLiteral:
|
||||
node = ajson.NullNode("")
|
||||
|
@ -354,9 +380,9 @@ func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *a
|
|||
case jsonFalseLiteral:
|
||||
node = ajson.BoolNode("", false)
|
||||
default:
|
||||
return nil, 0, fmt.Errorf("unknown literal value %v", val)
|
||||
return nil, fmt.Errorf("unknown literal value %v", val)
|
||||
}
|
||||
return node, pos, nil
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func newLiteralPlugin() *literalPlugin {
|
||||
|
@ -384,13 +410,15 @@ type opaquePlugin struct {
|
|||
|
||||
var _ jsonPlugin = (*opaquePlugin)(nil)
|
||||
|
||||
func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
// other types are stored as catch-all opaque types: documentation on these is sketchy
|
||||
// we currently know about and support date/time/datetime/decimal
|
||||
func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
dataType := data[pos]
|
||||
pos++
|
||||
_, pos = readVariableLength(data, pos)
|
||||
start := 3 // account for length of stored value
|
||||
end := start + 8 // all currently supported opaque data types are 8 bytes in size
|
||||
switch dataType {
|
||||
case TypeDate:
|
||||
raw := binary.LittleEndian.Uint64(data[3:11])
|
||||
raw := binary.LittleEndian.Uint64(data[start:end])
|
||||
value := raw >> 24
|
||||
yearMonth := (value >> 22) & 0x01ffff // 17 bits starting at 22nd
|
||||
year := yearMonth / 13
|
||||
|
@ -399,7 +427,7 @@ func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
|
|||
dateString := fmt.Sprintf("%04d-%02d-%02d", year, month, day)
|
||||
node = ajson.StringNode("", dateString)
|
||||
case TypeTime:
|
||||
raw := binary.LittleEndian.Uint64(data[3:11])
|
||||
raw := binary.LittleEndian.Uint64(data[start:end])
|
||||
value := raw >> 24
|
||||
hour := (value >> 12) & 0x03ff // 10 bits starting at 12th
|
||||
minute := (value >> 6) & 0x3f // 6 bits starting at 6th
|
||||
|
@ -408,7 +436,7 @@ func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
|
|||
timeString := fmt.Sprintf("%02d:%02d:%02d.%06d", hour, minute, second, microSeconds)
|
||||
node = ajson.StringNode("", timeString)
|
||||
case TypeDateTime:
|
||||
raw := binary.LittleEndian.Uint64(data[3:11])
|
||||
raw := binary.LittleEndian.Uint64(data[start:end])
|
||||
value := raw >> 24
|
||||
yearMonth := (value >> 22) & 0x01ffff // 17 bits starting at 22nd
|
||||
year := yearMonth / 13
|
||||
|
@ -421,24 +449,23 @@ func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
|
|||
timeString := fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%06d", year, month, day, hour, minute, second, microSeconds)
|
||||
node = ajson.StringNode("", timeString)
|
||||
case TypeNewDecimal:
|
||||
decimalData := data[3:11]
|
||||
decimalData := data[start:end]
|
||||
precision := decimalData[0]
|
||||
scale := decimalData[1]
|
||||
metadata := (uint16(precision) << 8) + uint16(scale)
|
||||
val, _, err := CellValue(decimalData, 2, TypeNewDecimal, metadata, querypb.Type_DECIMAL)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, err
|
||||
}
|
||||
float, err := evalengine.ToFloat64(val)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, err
|
||||
}
|
||||
node = ajson.NumericNode("", float)
|
||||
default:
|
||||
return nil, 0, fmt.Errorf("opaque type %d is not supported yet, data %v", dataType, data[2:])
|
||||
return nil, fmt.Errorf("opaque type %d is not supported yet, data %v", dataType, data[2:])
|
||||
}
|
||||
pos += 8
|
||||
return node, pos, nil
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func newOpaquePlugin() *opaquePlugin {
|
||||
|
@ -466,11 +493,11 @@ type stringPlugin struct {
|
|||
|
||||
var _ jsonPlugin = (*stringPlugin)(nil)
|
||||
|
||||
func (sh stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
func (sh stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
size, pos := readVariableLength(data, pos)
|
||||
node = ajson.StringNode("", string(data[pos:pos+size]))
|
||||
|
||||
return node, pos, nil
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func newStringPlugin() *stringPlugin {
|
||||
|
@ -498,38 +525,28 @@ type arrayPlugin struct {
|
|||
|
||||
var _ jsonPlugin = (*arrayPlugin)(nil)
|
||||
|
||||
func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
jlog("JSON Array %s, len %d, data %+v", jsonDataTypeToString(uint(typ)), len(data), data)
|
||||
//printAsciiBytes(data)
|
||||
// arrays are stored thus:
|
||||
// | type_identifier(2/3) | elem count | obj size | list of offsets+lengths of values | actual values |
|
||||
func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
jlog("JSON Array %s, len %d", jsonDataTypeToString(uint(typ)), len(data))
|
||||
var nodes []*ajson.Node
|
||||
var elem *ajson.Node
|
||||
var elementCount, offset, size int
|
||||
var elementCount, size int
|
||||
large := typ == jsonLargeArray
|
||||
elementCount, pos = readInt(data, pos, large)
|
||||
jlog("Array(%t): elem count: %d\n", large, elementCount)
|
||||
size, pos = readInt(data, pos, large)
|
||||
jlog("Array(%t): elem count: %d, size:%d\n", large, elementCount, size)
|
||||
for i := 0; i < elementCount; i++ {
|
||||
typ = jsonDataType(data[pos])
|
||||
pos++
|
||||
if inlineTypes[typ] {
|
||||
elem, pos, err = binlogJSON.getNode(typ, data, pos)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
} else {
|
||||
offset, pos = readInt(data, pos, large)
|
||||
newData := data[offset:]
|
||||
elem, _, err = binlogJSON.getNode(typ, newData, 1) //newPos ignored because this is an offset into the "extra" section of the buffer
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
elem, pos, err = getElem(data, pos, large)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodes = append(nodes, elem)
|
||||
jlog("Index is %s:%s", i, jsonDataTypeToString(uint(typ)))
|
||||
jlog("Index is %d:%s", i, jsonDataTypeToString(uint(typ)))
|
||||
}
|
||||
node = ajson.ArrayNode("", nodes)
|
||||
return node, pos, nil
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func newArrayPlugin() *arrayPlugin {
|
||||
|
@ -558,54 +575,53 @@ type objectPlugin struct {
|
|||
|
||||
var _ jsonPlugin = (*objectPlugin)(nil)
|
||||
|
||||
func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, newPos int, err error) {
|
||||
jlog("JSON Type is %s, len %d, data %+v", jsonDataTypeToString(uint(typ)), len(data), data)
|
||||
//printAsciiBytes(data)
|
||||
nodes := make(map[string]*ajson.Node)
|
||||
var elem *ajson.Node
|
||||
var elementCount, offset, size int
|
||||
// objects are stored thus:
|
||||
// | type_identifier(0/1) | elem count | obj size | list of offsets+lengths of keys | list of offsets+lengths of values | actual keys | actual values |
|
||||
func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
|
||||
jlog("JSON Type is %s, len %d", jsonDataTypeToString(uint(typ)), len(data))
|
||||
|
||||
// "large" changes storage layout of element count, total object size,
|
||||
// if "large" is true then all of these are 4 bytes. For "small" documents they are 2 bytes
|
||||
var large = typ == jsonLargeObject
|
||||
|
||||
var elementCount int // total number of elements (== keys) in this object map. (element can be another object: recursively handled)
|
||||
var size int // total size of object
|
||||
|
||||
elementCount, pos = readInt(data, pos, large)
|
||||
jlog("Object: elem count: %d\n", elementCount)
|
||||
size, pos = readInt(data, pos, large)
|
||||
jlog("Object: elem count: %d, size %d\n", elementCount, size)
|
||||
keys := make([]string, elementCount)
|
||||
|
||||
keys := make([]string, elementCount) // stores all the keys in this object
|
||||
for i := 0; i < elementCount; i++ {
|
||||
var keyOffset, keyLength int
|
||||
var keyOffset int
|
||||
var keyLength int
|
||||
keyOffset, pos = readInt(data, pos, large)
|
||||
keyLength, pos = readInt(data, pos, false) // keyLength is always a 16-bit int
|
||||
if keyOffset+1 >= len(data) || keyOffset+keyLength+1 > len(data) {
|
||||
log.Errorf("unable to parse json value: %+v", data)
|
||||
return nil, 0, fmt.Errorf("unable to parse json value: %+v", data)
|
||||
}
|
||||
keys[i] = string(data[keyOffset+1 : keyOffset+keyLength+1])
|
||||
}
|
||||
|
||||
for i := 0; i < elementCount; i++ {
|
||||
typ = jsonDataType(data[pos])
|
||||
pos++
|
||||
if inlineTypes[typ] {
|
||||
elem, pos, err = binlogJSON.getNode(typ, data, pos)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
} else {
|
||||
offset, pos = readInt(data, pos, large)
|
||||
if offset >= len(data) {
|
||||
return nil, 0, fmt.Errorf("unable to parse json value: %+v", data)
|
||||
}
|
||||
newData := data[offset:]
|
||||
elem, _, err = binlogJSON.getNode(typ, newData, 1) //newPos ignored because this is an offset into the "extra" section of the buffer
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
keyOffsetStart := keyOffset + 1
|
||||
if keyOffsetStart >= len(data) || keyOffsetStart+keyLength > len(data) { // guards against out-of-bounds if logic is incorrect
|
||||
log.Errorf("unable to decode object elements")
|
||||
return nil, fmt.Errorf("unable to decode object elements: %v", data)
|
||||
}
|
||||
nodes[keys[i]] = elem
|
||||
keys[i] = string(data[keyOffsetStart : keyOffsetStart+keyLength])
|
||||
}
|
||||
jlog("Object keys: %+v", keys)
|
||||
|
||||
object := make(map[string]*ajson.Node)
|
||||
var elem *ajson.Node
|
||||
|
||||
// get the value for each key
|
||||
for i := 0; i < elementCount; i++ {
|
||||
elem, pos, err = getElem(data, pos, large)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
object[keys[i]] = elem
|
||||
jlog("Key is %s:%s", keys[i], jsonDataTypeToString(uint(typ)))
|
||||
}
|
||||
|
||||
node = ajson.ObjectNode("", nodes)
|
||||
return node, pos, nil
|
||||
node = ajson.ObjectNode("", object)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func newObjectPlugin() *objectPlugin {
|
||||
|
|
|
@ -16,9 +16,11 @@ limitations under the License.
|
|||
|
||||
package vreplication
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
var jsonDoc1 = `
|
||||
var jsonSingleDoc = `
|
||||
{
|
||||
"_id": "5f882c85c74593afb7895a16",
|
||||
"index": 0,
|
||||
|
@ -27,6 +29,7 @@ var jsonDoc1 = `
|
|||
"balance": "$2,636.61",
|
||||
"picture": "http://placehold.it/32x32",
|
||||
"age": 36,
|
||||
"id2": 1234567890,
|
||||
"eyeColor": "green",
|
||||
"name": "Stephens Paul",
|
||||
"gender": "male",
|
||||
|
@ -53,7 +56,7 @@ func repeatJSON(jsonDoc string, times int, typ largeDocCollectionType) string {
|
|||
jsonDocs = "["
|
||||
for times > 0 {
|
||||
times--
|
||||
jsonDocs += jsonDoc1
|
||||
jsonDocs += jsonSingleDoc
|
||||
if times != 0 {
|
||||
jsonDocs += ","
|
||||
}
|
||||
|
@ -63,7 +66,7 @@ func repeatJSON(jsonDoc string, times int, typ largeDocCollectionType) string {
|
|||
jsonDocs = "{"
|
||||
for times > 0 {
|
||||
times--
|
||||
jsonDocs += fmt.Sprintf("\"%d\": %s", times, jsonDoc1)
|
||||
jsonDocs += fmt.Sprintf("\"%d\": %s", times, jsonSingleDoc)
|
||||
if times != 0 {
|
||||
jsonDocs += ","
|
||||
}
|
||||
|
@ -74,7 +77,7 @@ func repeatJSON(jsonDoc string, times int, typ largeDocCollectionType) string {
|
|||
return jsonDocs
|
||||
}
|
||||
|
||||
var jsonDoc2 = `
|
||||
var jsonMultipleDocs = `
|
||||
[
|
||||
{
|
||||
"_id": "5f882c85c74593afb7895a16",
|
||||
|
@ -96,6 +99,7 @@ var jsonDoc2 = `
|
|||
"latitude": -31.013461,
|
||||
"longitude": 136.055816,
|
||||
"tags": [
|
||||
"Unicode 木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元木元 Text",
|
||||
"nisi",
|
||||
"tempor",
|
||||
"dolor",
|
||||
|
@ -348,3 +352,40 @@ var jsonDoc2 = `
|
|||
}
|
||||
]
|
||||
`
|
||||
|
||||
// handcrafted test (based on an actual user failure) to isolate error with inline type handling
|
||||
var singleLargeObjectTemplate = `
|
||||
{
|
||||
"user": {
|
||||
"id": 1234567890,
|
||||
"type": 1,
|
||||
"login": "isaac",
|
||||
"created_at": {
|
||||
"seconds": 1618443705
|
||||
}
|
||||
},
|
||||
"repository": {
|
||||
"id": 1234567890,
|
||||
"name": "gravitational theory",
|
||||
"created_at": {
|
||||
"seconds": 1234567890
|
||||
},
|
||||
"updated_at": {
|
||||
"seconds": 1234567890
|
||||
},
|
||||
"visibility": 1,
|
||||
"description": "%s"
|
||||
},
|
||||
"numberOfLaws": 3,
|
||||
"user2": {
|
||||
"id": 1234567890,
|
||||
"type": 1,
|
||||
"login": "isaac",
|
||||
"created_at": {
|
||||
"seconds": 1234567890
|
||||
},
|
||||
"challenge": "falling-apples",
|
||||
"response": "eureka"
|
||||
}
|
||||
}
|
||||
`
|
||||
|
|
|
@ -19,6 +19,7 @@ package vreplication
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -2426,6 +2427,70 @@ func TestTimestamp(t *testing.T) {
|
|||
expectData(t, "t1", [][]string{{"1", want, want}})
|
||||
}
|
||||
|
||||
func TestPlayerJSONTwoColumns(t *testing.T) {
|
||||
defer deleteTablet(addTablet(100))
|
||||
execStatements(t, []string{
|
||||
"create table vitess_json2(id int auto_increment, val json, val2 json, primary key(id))",
|
||||
fmt.Sprintf("create table %s.vitess_json2(id int, val json, val2 json, primary key(id))", vrepldb),
|
||||
})
|
||||
defer execStatements(t, []string{
|
||||
"drop table vitess_json2",
|
||||
fmt.Sprintf("drop table %s.vitess_json2", vrepldb),
|
||||
})
|
||||
|
||||
env.SchemaEngine.Reload(context.Background())
|
||||
|
||||
filter := &binlogdatapb.Filter{
|
||||
Rules: []*binlogdatapb.Rule{{
|
||||
Match: "/.*",
|
||||
}},
|
||||
}
|
||||
bls := &binlogdatapb.BinlogSource{
|
||||
Keyspace: env.KeyspaceName,
|
||||
Shard: env.ShardName,
|
||||
Filter: filter,
|
||||
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
|
||||
}
|
||||
cancel, _ := startVReplication(t, bls, "")
|
||||
defer cancel()
|
||||
type testcase struct {
|
||||
name string
|
||||
input string
|
||||
data [][]string
|
||||
}
|
||||
var testcases []testcase
|
||||
id := 0
|
||||
var addTestCase = func(name, val, val2 string) {
|
||||
id++
|
||||
testcases = append(testcases, testcase{
|
||||
name: name,
|
||||
input: fmt.Sprintf("insert into vitess_json2(val, val2) values (%s, %s)", encodeString(val), encodeString(val2)),
|
||||
data: [][]string{
|
||||
{strconv.Itoa(id), val, val2},
|
||||
},
|
||||
})
|
||||
}
|
||||
longString := strings.Repeat("aa", math.MaxInt16)
|
||||
largeObject := fmt.Sprintf(singleLargeObjectTemplate, longString)
|
||||
addTestCase("twoCols", jsonSingleDoc, largeObject)
|
||||
id = 0
|
||||
for _, tcase := range testcases {
|
||||
t.Run(tcase.name, func(t *testing.T) {
|
||||
id++
|
||||
execStatements(t, []string{tcase.input})
|
||||
want := []string{
|
||||
"begin",
|
||||
"/insert into vitess_json2",
|
||||
"/update _vt.vreplication set pos=",
|
||||
"commit",
|
||||
}
|
||||
expectDBClientQueries(t, want)
|
||||
expectJSON(t, "vitess_json2", tcase.data, id, env.Mysqld.FetchSuperQuery)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestPlayerJSONDocs validates more complex and 'large' json docs. It only validates that the data in the table
|
||||
// TestPlayerTypes, above, also verifies the sql queries applied on the target. It is too painful to test the applied
|
||||
// sql for larger jsons because of the need to escape special characters, so we check larger jsons separately
|
||||
|
@ -2433,7 +2498,7 @@ func TestTimestamp(t *testing.T) {
|
|||
func TestPlayerJSONDocs(t *testing.T) {
|
||||
log.Errorf("TestPlayerJSON: flavor is %s", env.Flavor)
|
||||
skipTest := true
|
||||
flavors := []string{"mysql80", "mysql57"}
|
||||
flavors := []string{"mysql80", "mysql57", "mysql56"}
|
||||
//flavors = append(flavors, "mysql56") // uncomment for local testing, in CI it fails on percona56
|
||||
for _, flavor := range flavors {
|
||||
if strings.EqualFold(env.Flavor, flavor) {
|
||||
|
@ -2442,6 +2507,7 @@ func TestPlayerJSONDocs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
if skipTest {
|
||||
log.Warningf("not running TestPlayerJSONDocs")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -2480,8 +2546,6 @@ func TestPlayerJSONDocs(t *testing.T) {
|
|||
id := 0
|
||||
var addTestCase = func(name, val string) {
|
||||
id++
|
||||
//s := strings.ReplaceAll(val, "\n", "")
|
||||
//s = strings.ReplaceAll(s, " ", "")
|
||||
testcases = append(testcases, testcase{
|
||||
name: name,
|
||||
input: fmt.Sprintf("insert into vitess_json(val) values (%s)", encodeString(val)),
|
||||
|
@ -2490,11 +2554,20 @@ func TestPlayerJSONDocs(t *testing.T) {
|
|||
},
|
||||
})
|
||||
}
|
||||
addTestCase("singleDoc", jsonDoc1)
|
||||
addTestCase("multipleDocs", jsonDoc2)
|
||||
addTestCase("singleDoc", jsonSingleDoc)
|
||||
addTestCase("multipleDocs", jsonMultipleDocs)
|
||||
longString := strings.Repeat("aa", math.MaxInt16)
|
||||
|
||||
largeObject := fmt.Sprintf(singleLargeObjectTemplate, longString)
|
||||
addTestCase("singleLargeObject", largeObject)
|
||||
|
||||
largeArray := fmt.Sprintf(`[1, 1234567890, "a", true, %s]`, largeObject)
|
||||
_ = largeArray
|
||||
addTestCase("singleLargeArray", largeArray)
|
||||
|
||||
// the json doc is repeated multiple times to hit the 64K threshold: 140 is got by trial and error
|
||||
addTestCase("largeArrayDoc", repeatJSON(jsonDoc1, 140, largeJSONArrayCollection))
|
||||
addTestCase("largeObjectDoc", repeatJSON(jsonDoc1, 140, largeJSONObjectCollection))
|
||||
addTestCase("largeArrayDoc", repeatJSON(jsonSingleDoc, 140, largeJSONArrayCollection))
|
||||
addTestCase("largeObjectDoc", repeatJSON(jsonSingleDoc, 140, largeJSONObjectCollection))
|
||||
id = 0
|
||||
for _, tcase := range testcases {
|
||||
t.Run(tcase.name, func(t *testing.T) {
|
||||
|
@ -2567,6 +2640,7 @@ func expectJSON(t *testing.T, table string, values [][]string, id int, exec func
|
|||
want, err := ajson.Unmarshal([]byte(row[1]))
|
||||
require.NoError(t, err)
|
||||
match, err := got.Eq(want)
|
||||
//log.Infof(">>>>>>>> got \n-----\n%s\n------\n, want \n-----\n%s\n------\n", got, want)
|
||||
require.NoError(t, err)
|
||||
require.True(t, match)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче