Merge pull request #2650 from alainjobart/rbr

Rbr: adding test with real DB, fixing problems.
This commit is contained in:
Alain Jobart 2017-03-23 08:46:33 -07:00 коммит произвёл GitHub
Родитель de406f2d1c 087d429d83
Коммит e7f2af8bb8
7 изменённых файлов: 904 добавлений и 158 удалений

Просмотреть файл

@ -308,6 +308,11 @@ ssl-key=%v/server-key.pem
testRowReplicationWithRealDatabase(t, &params)
})
// Test RBR types are working properly.
t.Run("RBRTypes", func(t *testing.T) {
testRowReplicationTypesWithRealDatabase(t, &params)
})
// Test Schema queries work as intended.
t.Run("Schema", func(t *testing.T) {
testSchema(t, &params)

Просмотреть файл

@ -117,4 +117,23 @@ But eventually, we probably want to remove it entirely, as it is not
transmitted over the wire. For now, we keep it for backward
compatibility with the C client.
--
Row-based replication:
The following types or constructs are not yet supported by our RBR:
- in MariaDB, the type TIMESTAMP(N) where N>0 is stored in the row the
exact same way as TIMESTAMP(0). So there is no way to get N, except
by knowing the table exact schema. This is such a corner case. MySQL
5.6+ uses TIMESTAMP2, and uses metadata to know the precision, so it
works there very nicely.
From mariaDB source code comment:
'So row-based replication between temporal data types of
different precision is not possible in MariaDB.'
- JSON is stored as an optimized index data blob in the row. We don't
parse it to re-print a text version for re-insertion. Instead, we
just return NULL. So JSOn is not supported.
*/

Просмотреть файл

@ -1,16 +1,21 @@
package replication
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"strconv"
"time"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
// ZeroTimestamp is the special value 0 for a timestamp.
var ZeroTimestamp = []byte("0000-00-00 00:00:00")
// TableMap implements BinlogEvent.TableMap().
//
// Expected format (L = total length of event data):
@ -187,10 +192,8 @@ func cellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) {
return 4, nil
case TypeLongLong, TypeDouble:
return 8, nil
case TypeDate, TypeNewDate:
case TypeDate, TypeTime, TypeNewDate:
return 3, nil
case TypeTime:
return 4, nil
case TypeDateTime:
return 8, nil
case TypeVarchar, TypeVarString:
@ -281,14 +284,11 @@ func cellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) {
// This may do String, Enum, and Set. The type is in
// metadata. If it's a string, then there will be more bits.
// This will give us the maximum length of the field.
max := 0
t := metadata >> 8
if t == TypeEnum || t == TypeSet {
max = int(metadata & 0xff)
} else {
max = int((((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0xff))
return int(metadata & 0xff), nil
}
max := int((((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0xff))
// Length is encoded in 1 or 2 bytes.
if max > 255 {
l := int(uint64(data[pos]) |
@ -303,6 +303,22 @@ func cellLength(data []byte, pos int, typ byte, metadata uint16) (int, error) {
}
}
// printTimestamp is a helper method to append a timestamp into a bytes.Buffer,
// and return the Buffer.
func printTimestamp(v uint32) *bytes.Buffer {
if v == 0 {
return bytes.NewBuffer(ZeroTimestamp)
}
t := time.Unix(int64(v), 0).UTC()
year, month, day := t.Date()
hour, minute, second := t.Clock()
result := &bytes.Buffer{}
fmt.Fprintf(result, "%04d-%02d-%02d %02d:%02d:%02d", year, int(month), day, hour, minute, second)
return result
}
// CellValue returns the data for a cell as a sqltypes.Value, and how
// many bytes it takes. It only uses the querypb.Type value for the
// signed flag.
@ -316,6 +332,11 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
return sqltypes.MakeTrusted(querypb.Type_UINT8,
strconv.AppendUint(nil, uint64(data[pos]), 10)), 1, nil
case TypeYear:
val := data[pos]
if val == 0 {
return sqltypes.MakeTrusted(querypb.Type_YEAR,
[]byte{'0', '0', '0', '0'}), 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_YEAR,
strconv.AppendUint(nil, uint64(data[pos])+1900, 10)), 1, nil
case TypeShort:
@ -362,8 +383,9 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
strconv.AppendFloat(nil, fval, 'E', -1, 64)), 8, nil
case TypeTimestamp:
val := binary.LittleEndian.Uint32(data[pos : pos+4])
txt := printTimestamp(val)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
strconv.AppendUint(nil, uint64(val), 10)), 4, nil
txt.Bytes()), 4, nil
case TypeLongLong:
val := binary.LittleEndian.Uint64(data[pos : pos+8])
if sqltypes.IsSigned(styp) {
@ -382,12 +404,26 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
return sqltypes.MakeTrusted(querypb.Type_DATE,
[]byte(fmt.Sprintf("%04d-%02d-%02d", year, month, day))), 3, nil
case TypeTime:
val := binary.LittleEndian.Uint32(data[pos : pos+4])
hour := val / 10000
minute := (val % 10000) / 100
second := val % 100
var hour, minute, second int32
if data[pos+2]&128 > 0 {
// Negative number, have to extend the sign.
val := int32(uint32(data[pos]) +
uint32(data[pos+1])<<8 +
uint32(data[pos+2])<<16 +
uint32(255)<<24)
hour = val / 10000
minute = -((val % 10000) / 100)
second = -(val % 100)
} else {
val := int32(data[pos]) +
int32(data[pos+1])<<8 +
int32(data[pos+2])<<16
hour = val / 10000
minute = (val % 10000) / 100
second = val % 100
}
return sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte(fmt.Sprintf("%02d:%02d:%02d", hour, minute, second))), 4, nil
[]byte(fmt.Sprintf("%02d:%02d:%02d", hour, minute, second))), 3, nil
case TypeDateTime:
val := binary.LittleEndian.Uint64(data[pos : pos+8])
d := val / 1000000
@ -418,47 +454,54 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
return sqltypes.MakeTrusted(querypb.Type_BIT,
data[pos:pos+l]), l, nil
case TypeTimestamp2:
second := binary.LittleEndian.Uint32(data[pos : pos+4])
second := binary.BigEndian.Uint32(data[pos : pos+4])
txt := printTimestamp(second)
switch metadata {
case 1:
decimals := int(data[pos+4])
fmt.Fprintf(txt, ".%01d", decimals/10)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.%01d", second, decimals))), 5, nil
txt.Bytes()), 5, nil
case 2:
decimals := int(data[pos+4])
fmt.Fprintf(txt, ".%02d", decimals)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.%02d", second, decimals))), 5, nil
txt.Bytes()), 5, nil
case 3:
decimals := int(data[pos+4]) +
int(data[pos+5])<<8
decimals := int(data[pos+4])<<8 +
int(data[pos+5])
fmt.Fprintf(txt, ".%03d", decimals/10)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.%03d", second, decimals))), 6, nil
txt.Bytes()), 6, nil
case 4:
decimals := int(data[pos+4]) +
int(data[pos+5])<<8
decimals := int(data[pos+4])<<8 +
int(data[pos+5])
fmt.Fprintf(txt, ".%04d", decimals)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.%04d", second, decimals))), 6, nil
txt.Bytes()), 6, nil
case 5:
decimals := int(data[pos+4]) +
decimals := int(data[pos+4])<<16 +
int(data[pos+5])<<8 +
int(data[pos+6])<<16
int(data[pos+6])
fmt.Fprintf(txt, ".%05d", decimals/10)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.%05d", second, decimals))), 7, nil
txt.Bytes()), 7, nil
case 6:
decimals := int(data[pos+4]) +
decimals := int(data[pos+4])<<16 +
int(data[pos+5])<<8 +
int(data[pos+6])<<16
int(data[pos+6])
fmt.Fprintf(txt, ".%06d", decimals)
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.%.6d", second, decimals))), 7, nil
txt.Bytes()), 7, nil
}
return sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
strconv.AppendUint(nil, uint64(second), 10)), 4, nil
txt.Bytes()), 4, nil
case TypeDateTime2:
ymdhms := (uint64(data[pos]) |
uint64(data[pos+1])<<8 |
ymdhms := (uint64(data[pos])<<32 |
uint64(data[pos+1])<<24 |
uint64(data[pos+2])<<16 |
uint64(data[pos+3])<<24 |
uint64(data[pos+4])<<32) - uint64(0x8000000000)
uint64(data[pos+3])<<8 |
uint64(data[pos+4])) - uint64(0x8000000000)
ymd := ymdhms >> 17
ym := ymd >> 5
hms := ymdhms % (1 << 17)
@ -471,46 +514,53 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
minute := (hms >> 6) % (1 << 6)
hour := hms >> 12
datetime := fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second)
txt := &bytes.Buffer{}
fmt.Fprintf(txt, "%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second)
switch metadata {
case 1:
decimals := int(data[pos+5])
fmt.Fprintf(txt, ".%01d", decimals/10)
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%v.%01d", datetime, decimals))), 6, nil
txt.Bytes()), 6, nil
case 2:
decimals := int(data[pos+5])
fmt.Fprintf(txt, ".%02d", decimals)
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%v.%02d", datetime, decimals))), 6, nil
txt.Bytes()), 6, nil
case 3:
decimals := int(data[pos+5]) +
int(data[pos+6])<<8
decimals := int(data[pos+5])<<8 +
int(data[pos+6])
fmt.Fprintf(txt, ".%03d", decimals/10)
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%v.%03d", datetime, decimals))), 7, nil
txt.Bytes()), 7, nil
case 4:
decimals := int(data[pos+5]) +
int(data[pos+6])<<8
decimals := int(data[pos+5])<<8 +
int(data[pos+6])
fmt.Fprintf(txt, ".%04d", decimals)
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%v.%04d", datetime, decimals))), 7, nil
txt.Bytes()), 7, nil
case 5:
decimals := int(data[pos+5]) +
decimals := int(data[pos+5])<<16 +
int(data[pos+6])<<8 +
int(data[pos+7])<<16
int(data[pos+7])
fmt.Fprintf(txt, ".%05d", decimals/10)
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%v.%05d", datetime, decimals))), 8, nil
txt.Bytes()), 8, nil
case 6:
decimals := int(data[pos+5]) +
decimals := int(data[pos+5])<<16 +
int(data[pos+6])<<8 +
int(data[pos+7])<<16
int(data[pos+7])
fmt.Fprintf(txt, ".%06d", decimals)
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(fmt.Sprintf("%v.%.6d", datetime, decimals))), 8, nil
txt.Bytes()), 8, nil
}
return sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte(datetime)), 5, nil
txt.Bytes()), 5, nil
case TypeTime2:
hms := (int64(data[pos]) |
hms := (int64(data[pos])<<16 |
int64(data[pos+1])<<8 |
int64(data[pos+2])<<16) - 0x800000
int64(data[pos+2])) - 0x800000
sign := ""
if hms < 0 {
hms = -hms
@ -534,34 +584,34 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
}
fracStr = fmt.Sprintf(".%.2d", frac)
case 3:
frac := int(data[pos+3]) |
int(data[pos+4])<<8
frac := int(data[pos+3])<<8 |
int(data[pos+4])
if sign == "-" && frac != 0 {
hms--
frac = 0x10000 - frac
}
fracStr = fmt.Sprintf(".%.3d", frac/10)
case 4:
frac := int(data[pos+3]) |
int(data[pos+4])<<8
frac := int(data[pos+3])<<8 |
int(data[pos+4])
if sign == "-" && frac != 0 {
hms--
frac = 0x10000 - frac
}
fracStr = fmt.Sprintf(".%.4d", frac)
case 5:
frac := int(data[pos+3]) |
frac := int(data[pos+3])<<16 |
int(data[pos+4])<<8 |
int(data[pos+5])<<16
int(data[pos+5])
if sign == "-" && frac != 0 {
hms--
frac = 0x1000000 - frac
}
fracStr = fmt.Sprintf(".%.5d", frac/10)
case 6:
frac := int(data[pos+3]) |
frac := int(data[pos+3])<<16 |
int(data[pos+4])<<8 |
int(data[pos+5])<<16
int(data[pos+5])
if sign == "-" && frac != 0 {
hms--
frac = 0x1000000 - frac
@ -576,14 +626,18 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
[]byte(fmt.Sprintf("%v%02d:%02d:%02d%v", sign, hour, minute, second, fracStr))), 3 + (int(metadata)+1)/2, nil
case TypeJSON:
l := int(uint64(data[pos]) |
uint64(data[pos+1])<<8)
// length in encoded in 'meta' bytes, but at least 2,
// and the value cannot be > 64k, so just read 2 bytes.
// (meta also should have '2' as value).
// (this weird logic is what event printing does).
l := int(uint64(data[pos]) |
uint64(data[pos+1])<<8)
return sqltypes.MakeTrusted(querypb.Type_JSON,
data[pos+int(metadata):pos+int(metadata)+l]), l + int(metadata), nil
// TODO(alainjobart) the binary data for JSON should
// be parsed, and re-printed as JSON. This is a large
// project, as the binary version of the data is
// somewhat complex. For now, just return NULL.
return sqltypes.NULL, l + int(metadata), nil
case TypeNewDecimal:
precision := int(metadata >> 8) // total digits number
@ -601,12 +655,13 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
d := make([]byte, l)
copy(d, data[pos:pos+l])
result := []byte{}
txt := &bytes.Buffer{}
isNegative := (d[0] & 0x80) == 0
d[0] ^= 0x80 // First bit is inverted.
if isNegative {
// Negative numbers are just inverted bytes.
result = append(result, '-')
txt.WriteByte('-')
for i := range d {
d[i] ^= 0xff
}
@ -638,55 +693,52 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
}
pos = dig2bytes[intg0x]
if val > 0 {
result = strconv.AppendUint(result, uint64(val), 10)
txt.Write(strconv.AppendUint(nil, uint64(val), 10))
}
// now the full digits, 32 bits each, 9 digits
for i := 0; i < intg0; i++ {
val = binary.BigEndian.Uint32(d[pos : pos+4])
t := fmt.Sprintf("%9d", val)
result = append(result, []byte(t)...)
fmt.Fprintf(txt, "%9d", val)
pos += 4
}
// now see if we have a fraction
if scale == 0 {
return sqltypes.MakeTrusted(querypb.Type_DECIMAL,
result), l, nil
txt.Bytes()), l, nil
}
result = append(result, '.')
txt.WriteByte('.')
// now the full fractional digits
for i := 0; i < frac0; i++ {
val = binary.BigEndian.Uint32(d[pos : pos+4])
t := fmt.Sprintf("%9d", val)
result = append(result, []byte(t)...)
fmt.Fprintf(txt, "%9d", val)
pos += 4
}
// then the partial fractional digits
t := ""
switch dig2bytes[frac0x] {
case 0:
// Nothing to do
return sqltypes.MakeTrusted(querypb.Type_DECIMAL,
result), l, nil
txt.Bytes()), l, nil
case 1:
// one byte, 1 or 2 digits
val = uint32(d[pos])
if frac0x == 1 {
t = fmt.Sprintf("%1d", val)
fmt.Fprintf(txt, "%1d", val)
} else {
t = fmt.Sprintf("%2d", val)
fmt.Fprintf(txt, "%2d", val)
}
case 2:
// two bytes, 3 or 4 digits
val = uint32(d[pos])<<8 +
uint32(d[pos+1])
if frac0x == 3 {
t = fmt.Sprintf("%3d", val)
fmt.Fprintf(txt, "%3d", val)
} else {
t = fmt.Sprintf("%4d", val)
fmt.Fprintf(txt, "%4d", val)
}
case 3:
// 3 bytes, 5 or 6 digits
@ -694,9 +746,9 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
uint32(d[pos+1])<<8 +
uint32(d[pos+2])
if frac0x == 5 {
t = fmt.Sprintf("%5d", val)
fmt.Fprintf(txt, "%5d", val)
} else {
t = fmt.Sprintf("%6d", val)
fmt.Fprintf(txt, "%6d", val)
}
case 4:
// 4 bytes, 7 or 8 digits (9 digits would be a full)
@ -705,15 +757,14 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
uint32(d[pos+2])<<8 +
uint32(d[pos+3])
if frac0x == 7 {
t = fmt.Sprintf("%7d", val)
fmt.Fprintf(txt, "%7d", val)
} else {
t = fmt.Sprintf("%8d", val)
fmt.Fprintf(txt, "%8d", val)
}
}
result = append(result, []byte(t)...)
return sqltypes.MakeTrusted(querypb.Type_DECIMAL,
result), l, nil
txt.Bytes()), l, nil
case TypeEnum:
switch metadata & 0xff {
@ -766,24 +817,32 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
// metadata. If it's a string, then there will be more bits.
t := metadata >> 8
if t == TypeEnum {
// We don't know the string values. So just use the
// numbers.
switch metadata & 0xff {
case 1:
// One byte storage.
return sqltypes.MakeTrusted(querypb.Type_ENUM,
return sqltypes.MakeTrusted(querypb.Type_UINT8,
strconv.AppendUint(nil, uint64(data[pos]), 10)), 1, nil
case 2:
// Two bytes storage.
val := binary.LittleEndian.Uint16(data[pos : pos+2])
return sqltypes.MakeTrusted(querypb.Type_ENUM,
return sqltypes.MakeTrusted(querypb.Type_UINT16,
strconv.AppendUint(nil, uint64(val), 10)), 2, nil
default:
return sqltypes.NULL, 0, fmt.Errorf("unexpected enum size: %v", metadata&0xff)
}
}
if t == TypeSet {
// We don't know the set values. So just use the
// numbers.
l := int(metadata & 0xff)
return sqltypes.MakeTrusted(querypb.Type_BIT,
data[pos:pos+l]), l, nil
var val uint64
for i := 0; i < l; i++ {
val += uint64(data[pos+i]) << (uint(i) * 8)
}
return sqltypes.MakeTrusted(querypb.Type_UINT64,
strconv.AppendUint(nil, uint64(val), 10)), l, nil
}
// This is a real string. The length is weird.
max := int((((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0xff))

Просмотреть файл

@ -82,10 +82,11 @@ func TestCellLengthAndData(t *testing.T) {
out: sqltypes.MakeTrusted(querypb.Type_FLOAT64,
[]byte("3.1415926535E+00")),
}, {
// 0x58d137c5 = 1490106309 = 2017-03-21 14:25:09
typ: TypeTimestamp,
data: []byte{0x84, 0x83, 0x82, 0x81},
data: []byte{0xc5, 0x37, 0xd1, 0x58},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v", 0x81828384))),
[]byte("2017-03-21 14:25:09")),
}, {
typ: TypeLongLong,
styp: querypb.Type_UINT64,
@ -112,8 +113,8 @@ func TestCellLengthAndData(t *testing.T) {
[]byte("2010-10-03")),
}, {
typ: TypeTime,
// 154532 = 0x00025ba4
data: []byte{0xa4, 0x5b, 0x02, 0x00},
// 154532 = 0x025ba4
data: []byte{0xa4, 0x5b, 0x02},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("15:45:32")),
}, {
@ -141,98 +142,99 @@ func TestCellLengthAndData(t *testing.T) {
out: sqltypes.MakeTrusted(querypb.Type_BIT,
[]byte{3, 1}),
}, {
// 0x58d137c5 = 1490106309 = 2017-03-21 14:25:09
typ: TypeTimestamp2,
metadata: 0,
data: []byte{0x84, 0x83, 0x82, 0x81},
data: []byte{0x58, 0xd1, 0x37, 0xc5},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v", 0x81828384))),
[]byte("2017-03-21 14:25:09")),
}, {
typ: TypeTimestamp2,
metadata: 1,
data: []byte{0x84, 0x83, 0x82, 0x81, 7},
data: []byte{0x58, 0xd1, 0x37, 0xc5, 70},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.7", 0x81828384))),
[]byte("2017-03-21 14:25:09.7")),
}, {
typ: TypeTimestamp2,
metadata: 2,
data: []byte{0x84, 0x83, 0x82, 0x81, 76},
data: []byte{0x58, 0xd1, 0x37, 0xc5, 76},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.76", 0x81828384))),
[]byte("2017-03-21 14:25:09.76")),
}, {
typ: TypeTimestamp2,
metadata: 3,
// 765 = 0x02fd
data: []byte{0x84, 0x83, 0x82, 0x81, 0xfd, 0x02},
// 7650 = 0x1de2
data: []byte{0x58, 0xd1, 0x37, 0xc5, 0x1d, 0xe2},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.765", 0x81828384))),
[]byte("2017-03-21 14:25:09.765")),
}, {
typ: TypeTimestamp2,
metadata: 4,
// 7654 = 0x1de6
data: []byte{0x84, 0x83, 0x82, 0x81, 0xe6, 0x1d},
data: []byte{0x58, 0xd1, 0x37, 0xc5, 0x1d, 0xe6},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.7654", 0x81828384))),
[]byte("2017-03-21 14:25:09.7654")),
}, {
typ: TypeTimestamp2,
metadata: 5,
// 76543 = 0x012aff
data: []byte{0x84, 0x83, 0x82, 0x81, 0xff, 0x2a, 0x01},
// 76540 = 0x0badf6
data: []byte{0x58, 0xd1, 0x37, 0xc5, 0x0b, 0xad, 0xf6},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.76543", 0x81828384))),
[]byte("2017-03-21 14:25:09.76543")),
}, {
typ: TypeTimestamp2,
metadata: 6,
// 765432 = 0x0badf8
data: []byte{0x84, 0x83, 0x82, 0x81, 0xf8, 0xad, 0x0b},
data: []byte{0x58, 0xd1, 0x37, 0xc5, 0x0b, 0xad, 0xf8},
out: sqltypes.MakeTrusted(querypb.Type_TIMESTAMP,
[]byte(fmt.Sprintf("%v.765432", 0x81828384))),
[]byte("2017-03-21 14:25:09.765432")),
}, {
typ: TypeDateTime2,
metadata: 0,
// (2012 * 13 + 6) << 22 + 21 << 17 + 15 << 12 + 45 << 6 + 17)
// = 109734198097 = 0x198caafb51
// Then have to add 0x8000000000 = 0x998caafb51
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99},
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17")),
}, {
typ: TypeDateTime2,
metadata: 1,
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99, 7},
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51, 70},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17.7")),
}, {
typ: TypeDateTime2,
metadata: 2,
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99, 76},
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51, 76},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17.76")),
}, {
typ: TypeDateTime2,
metadata: 3,
// 765 = 0x02fd
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99, 0xfd, 0x02},
// 7650 = 0x1de2
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51, 0x1d, 0xe2},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17.765")),
}, {
typ: TypeDateTime2,
metadata: 4,
// 7654 = 0x1de6
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99, 0xe6, 0x1d},
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51, 0x1d, 0xe6},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17.7654")),
}, {
typ: TypeDateTime2,
metadata: 5,
// 76543 = 0x012aff
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99, 0xff, 0x2a, 0x01},
// 765430 = 0x0badf6
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51, 0x0b, 0xad, 0xf6},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17.76543")),
}, {
typ: TypeDateTime2,
metadata: 6,
// 765432 = 0x0badf8
data: []byte{0x51, 0xfb, 0xaa, 0x8c, 0x99, 0xf8, 0xad, 0x0b},
data: []byte{0x99, 0x8c, 0xaa, 0xfb, 0x51, 0x0b, 0xad, 0xf8},
out: sqltypes.MakeTrusted(querypb.Type_DATETIME,
[]byte("2012-06-21 15:45:17.765432")),
}, {
@ -248,130 +250,130 @@ func TestCellLengthAndData(t *testing.T) {
// 7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960
typ: TypeTime2,
metadata: 2,
data: []byte{0x00, 0x00, 0x80, 0x00},
data: []byte{0x80, 0x00, 0x00, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("00:00:00.00")),
}, {
typ: TypeTime2,
metadata: 2,
data: []byte{0xff, 0xff, 0x7f, 0xff},
data: []byte{0x7f, 0xff, 0xff, 0xff},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:00.01")),
}, {
typ: TypeTime2,
metadata: 2,
data: []byte{0xff, 0xff, 0x7f, 0x9d},
data: []byte{0x7f, 0xff, 0xff, 0x9d},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:00.99")),
}, {
typ: TypeTime2,
metadata: 2,
data: []byte{0xff, 0xff, 0x7f, 0x00},
data: []byte{0x7f, 0xff, 0xff, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.00")),
}, {
typ: TypeTime2,
metadata: 2,
data: []byte{0xfe, 0xff, 0x7f, 0xff},
data: []byte{0x7f, 0xff, 0xfe, 0xff},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.01")),
}, {
typ: TypeTime2,
metadata: 2,
data: []byte{0xfe, 0xff, 0x7f, 0xf6},
data: []byte{0x7f, 0xff, 0xfe, 0xf6},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.10")),
}, {
// Similar tests for 4 decimals.
typ: TypeTime2,
metadata: 4,
data: []byte{0x00, 0x00, 0x80, 0x00, 0x00},
data: []byte{0x80, 0x00, 0x00, 0x00, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("00:00:00.0000")),
}, {
typ: TypeTime2,
metadata: 4,
data: []byte{0xff, 0xff, 0x7f, 0xff, 0xff},
data: []byte{0x7f, 0xff, 0xff, 0xff, 0xff},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:00.0001")),
}, {
typ: TypeTime2,
metadata: 4,
data: []byte{0xff, 0xff, 0x7f, 0x9d, 0xff},
data: []byte{0x7f, 0xff, 0xff, 0xff, 0x9d},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:00.0099")),
}, {
typ: TypeTime2,
metadata: 4,
data: []byte{0xff, 0xff, 0x7f, 0x00, 0x00},
data: []byte{0x7f, 0xff, 0xff, 0x00, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.0000")),
}, {
typ: TypeTime2,
metadata: 4,
data: []byte{0xfe, 0xff, 0x7f, 0xff, 0xff},
data: []byte{0x7f, 0xff, 0xfe, 0xff, 0xff},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.0001")),
}, {
typ: TypeTime2,
metadata: 4,
data: []byte{0xfe, 0xff, 0x7f, 0xf6, 0xff},
data: []byte{0x7f, 0xff, 0xfe, 0xff, 0xf6},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.0010")),
}, {
// Similar tests for 6 decimals.
typ: TypeTime2,
metadata: 6,
data: []byte{0x00, 0x00, 0x80, 0x00, 0x00, 0x00},
data: []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("00:00:00.000000")),
}, {
typ: TypeTime2,
metadata: 6,
data: []byte{0xff, 0xff, 0x7f, 0xff, 0xff, 0xff},
data: []byte{0x7f, 0xff, 0xff, 0xff, 0xff, 0xff},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:00.000001")),
}, {
typ: TypeTime2,
metadata: 6,
data: []byte{0xff, 0xff, 0x7f, 0x9d, 0xff, 0xff},
data: []byte{0x7f, 0xff, 0xff, 0xff, 0xff, 0x9d},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:00.000099")),
}, {
typ: TypeTime2,
metadata: 6,
data: []byte{0xff, 0xff, 0x7f, 0x00, 0x00, 0x00},
data: []byte{0x7f, 0xff, 0xff, 0x00, 0x00, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.000000")),
}, {
typ: TypeTime2,
metadata: 6,
data: []byte{0xfe, 0xff, 0x7f, 0xff, 0xff, 0xff},
data: []byte{0x7f, 0xff, 0xfe, 0xff, 0xff, 0xff},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.000001")),
}, {
typ: TypeTime2,
metadata: 6,
data: []byte{0xfe, 0xff, 0x7f, 0xf6, 0xff, 0xff},
data: []byte{0x7f, 0xff, 0xfe, 0xff, 0xff, 0xf6},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("-00:00:01.000010")),
}, {
// Few more tests.
typ: TypeTime2,
metadata: 0,
data: []byte{0x00, 0x00, 0x80},
data: []byte{0x80, 0x00, 0x00},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("00:00:00")),
}, {
typ: TypeTime2,
metadata: 1,
data: []byte{0x01, 0x00, 0x80, 0x0a},
data: []byte{0x80, 0x00, 0x01, 0x0a},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("00:00:01.1")),
}, {
typ: TypeTime2,
metadata: 2,
data: []byte{0x01, 0x00, 0x80, 0x0a},
data: []byte{0x80, 0x00, 0x01, 0x0a},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("00:00:01.10")),
}, {
@ -379,15 +381,14 @@ func TestCellLengthAndData(t *testing.T) {
metadata: 0,
// 15 << 12 + 34 << 6 + 54 = 63670 = 0x00f8b6
// and need to add 0x800000
data: []byte{0xb6, 0xf8, 0x80},
data: []byte{0x80, 0xf8, 0xb6},
out: sqltypes.MakeTrusted(querypb.Type_TIME,
[]byte("15:34:54")),
}, {
typ: TypeJSON,
metadata: 2,
data: []byte{0x03, 0x00, 'a', 'b', 'c'},
out: sqltypes.MakeTrusted(querypb.Type_JSON,
[]byte("abc")),
out: sqltypes.NULL,
}, {
typ: TypeEnum,
metadata: 1,

Просмотреть файл

@ -1,6 +1,8 @@
package mysqlconn
import (
"bytes"
"fmt"
"reflect"
"strings"
"sync"
@ -11,6 +13,9 @@ import (
"github.com/youtube/vitess/go/mysqlconn/replication"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
func TestComBinlogDump(t *testing.T) {
@ -602,3 +607,627 @@ func testRowReplicationWithRealDatabase(t *testing.T, params *sqldb.ConnParams)
}
}
// testRowReplicationTypesWithRealDatabase creates a table wih all
// supported data types. Then we insert a row in it. then we re-build
// the SQL for the values, re-insert these. Then we select from the
// database and make sure both rows are identical.
func testRowReplicationTypesWithRealDatabase(t *testing.T, params *sqldb.ConnParams) {
// testcases are ordered by the types numbers in constants.go.
// Number are always unsigned, as we don't pass in sqltypes.Type.
testcases := []struct {
name string
createType string
createValue string
}{{
// TINYINT
name: "tinytiny",
createType: "TINYINT UNSIGNED",
createValue: "145",
}, {
// SMALLINT
name: "smallish",
createType: "SMALLINT UNSIGNED",
createValue: "40000",
}, {
// INT
name: "regular_int",
createType: "INT UNSIGNED",
createValue: "4000000000",
}, {
// FLOAT
name: "floating",
createType: "FLOAT",
createValue: "-3.14159E-22",
}, {
// DOUBLE
name: "doubling",
createType: "DOUBLE",
createValue: "-3.14159265359E+12",
}, {
// TIMESTAMP (zero value)
name: "timestamp_zero",
createType: "TIMESTAMP",
createValue: "'0000-00-00 00:00:00'",
}, {
// TIMESTAMP (day precision)
name: "timestamp_day",
createType: "TIMESTAMP",
createValue: "'2012-11-10 00:00:00'",
}, {
// BIGINT
name: "big_int",
createType: "BIGINT UNSIGNED",
createValue: "10000000000000000000",
}, {
// MEDIUMINT
name: "mediumish",
createType: "MEDIUMINT UNSIGNED",
createValue: "10000000",
}, {
// DATE
name: "date_regular",
createType: "DATE",
createValue: "'1920-10-24'",
}, {
// TIME
name: "time_regular",
createType: "TIME",
createValue: "'120:44:58'",
}, {
// TIME
name: "time_neg",
createType: "TIME",
createValue: "'-212:44:58'",
}, {
// DATETIME
name: "datetime0",
createType: "DATETIME",
createValue: "'1020-08-23 12:44:58'",
}, {
// YEAR zero
name: "year0",
createType: "YEAR",
createValue: "0",
}, {
// YEAR
name: "year_nonzero",
createType: "YEAR",
createValue: "2052",
}, {
// VARCHAR 8 bits
name: "shortvc",
createType: "VARCHAR(30)",
createValue: "'short varchar'",
}, {
// VARCHAR 16 bits
name: "longvc",
createType: "VARCHAR(1000)",
createValue: "'long varchar'",
}, {
// BIT
name: "bit1",
createType: "BIT",
createValue: "b'1'",
}, {
// BIT
name: "bit6",
createType: "BIT(6)",
createValue: "b'100101'",
}, {
// BIT
name: "bit8",
createType: "BIT(8)",
createValue: "b'10100101'",
}, {
// BIT
name: "bit14",
createType: "BIT(14)",
createValue: "b'10100101000111'",
}, {
// BIT
name: "bit55",
createType: "BIT(55)",
createValue: "b'1010010100110100101001101001010011010010100110100101001'",
}, {
// BIT
name: "bit64",
createType: "BIT(64)",
createValue: "b'1111111111010010100110100101001101001010011010010100110100101001'",
}, {
// DECIMAL
name: "decimal2_1",
createType: "DECIMAL(2,1)",
createValue: "1.2",
}, {
// DECIMAL neg
name: "decimal2_1_neg",
createType: "DECIMAL(2,1)",
createValue: "-5.6",
}, {
// DECIMAL
name: "decimal4_2",
createType: "DECIMAL(4,2)",
createValue: "61.52",
}, {
// DECIMAL neg
name: "decimal4_2_neg",
createType: "DECIMAL(4,2)",
createValue: "-78.94",
}, {
// DECIMAL
name: "decimal6_3",
createType: "DECIMAL(6,3)",
createValue: "611.542",
}, {
// DECIMAL neg
name: "decimal6_3_neg",
createType: "DECIMAL(6,3)",
createValue: "-478.394",
}, {
// DECIMAL
name: "decimal8_4",
createType: "DECIMAL(8,4)",
createValue: "6311.5742",
}, {
// DECIMAL neg
name: "decimal8_4_neg",
createType: "DECIMAL(8,4)",
createValue: "-4778.3894",
}, {
// DECIMAL
name: "decimal10_5",
createType: "DECIMAL(10,5)",
createValue: "63711.57342",
}, {
// DECIMAL neg
name: "decimal10_5_neg",
createType: "DECIMAL(10,5)",
createValue: "-47378.38594",
}, {
// DECIMAL
name: "decimal12_6",
createType: "DECIMAL(12,6)",
createValue: "637311.557342",
}, {
// DECIMAL neg
name: "decimal12_6_neg",
createType: "DECIMAL(12,6)",
createValue: "-473788.385794",
}, {
// DECIMAL
name: "decimal14_7",
createType: "DECIMAL(14,7)",
createValue: "6375311.5574342",
}, {
// DECIMAL neg
name: "decimal14_7_neg",
createType: "DECIMAL(14,7)",
createValue: "-4732788.3853794",
}, {
// DECIMAL
name: "decimal16_8",
createType: "DECIMAL(16,8)",
createValue: "63375311.54574342",
}, {
// DECIMAL neg
name: "decimal16_8_neg",
createType: "DECIMAL(16,8)",
createValue: "-47327788.38533794",
}, {
// DECIMAL
name: "decimal18_9",
createType: "DECIMAL(18,9)",
createValue: "633075311.545714342",
}, {
// DECIMAL neg
name: "decimal18_9_neg",
createType: "DECIMAL(18,9)",
createValue: "-473327788.385033794",
}, {
// DECIMAL
name: "decimal20_10",
createType: "DECIMAL(20,10)",
createValue: "6330375311.5405714342",
}, {
// DECIMAL neg
name: "decimal20_10_neg",
createType: "DECIMAL(20,10)",
createValue: "-4731327788.3850337294",
}, {
// DECIMAL lots of left digits
name: "decimal34_0",
createType: "DECIMAL(34,0)",
createValue: "8765432345678987654345432123456786",
}, {
// DECIMAL lots of left digits neg
name: "decimal34_0_neg",
createType: "DECIMAL(34,0)",
createValue: "-8765432345678987654345432123456786",
}, {
// DECIMAL lots of right digits
name: "decimal34_30",
createType: "DECIMAL(34,30)",
createValue: "8765.432345678987654345432123456786",
}, {
// DECIMAL lots of right digits neg
name: "decimal34_30_neg",
createType: "DECIMAL(34,30)",
createValue: "-8765.432345678987654345432123456786",
}, {
// ENUM
name: "tshirtsize",
createType: "ENUM('x-small', 'small', 'medium', 'large', 'x-larg')",
createValue: "'large'",
}, {
// SET
name: "setnumbers",
createType: "SET('one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten')",
createValue: "'two,three,ten'",
}, {
// TINYBLOB
name: "tiny_blob",
createType: "TINYBLOB",
createValue: "'ab\\'cd'",
}, {
// BLOB
name: "bloby",
createType: "BLOB",
createValue: "'ab\\'cd'",
}, {
// MEDIUMBLOB
name: "medium_blob",
createType: "MEDIUMBLOB",
createValue: "'ab\\'cd'",
}, {
// LONGBLOB
name: "long_blob",
createType: "LONGBLOB",
createValue: "'ab\\'cd'",
}, {
// CHAR 8 bits
name: "shortchar",
createType: "CHAR(30)",
createValue: "'short char'",
}, {
// CHAR 9 bits (100 * 3 = 300, 256<=300<512)
name: "mediumchar",
createType: "CHAR(100)",
createValue: "'medium char'",
}, {
// CHAR 10 bits (250 * 3 = 750, 512<=750<124)
name: "longchar",
createType: "CHAR(250)",
createValue: "'long char'",
}, {
// GEOMETRY
name: "geo_stuff",
createType: "GEOMETRY",
createValue: "ST_GeomFromText('POINT(1 1)')",
}}
conn, isMariaDB, f := connectForReplication(t, params, true /* rbr */)
defer conn.Close()
// MariaDB timestamp(N) is not supported by our RBR. See doc.go.
if !isMariaDB {
testcases = append(testcases, []struct {
name string
createType string
createValue string
}{{
// TIMESTAMP (second precision)
name: "timestamp_second",
createType: "TIMESTAMP",
createValue: "'2012-11-10 15:34:56'",
}, {
// TIMESTAMP (100 millisecond precision)
name: "timestamp_100millisecond",
createType: "TIMESTAMP(1)",
createValue: "'2012-11-10 15:34:56.6'",
}, {
// TIMESTAMP (10 millisecond precision)
name: "timestamp_10millisecond",
createType: "TIMESTAMP(2)",
createValue: "'2012-11-10 15:34:56.01'",
}, {
// TIMESTAMP (millisecond precision)
name: "timestamp_millisecond",
createType: "TIMESTAMP(3)",
createValue: "'2012-11-10 15:34:56.012'",
}, {
// TIMESTAMP (100 microsecond precision)
name: "timestamp_100microsecond",
createType: "TIMESTAMP(4)",
createValue: "'2012-11-10 15:34:56.0123'",
}, {
// TIMESTAMP (10 microsecond precision)
name: "timestamp_10microsecond",
createType: "TIMESTAMP(5)",
createValue: "'2012-11-10 15:34:56.01234'",
}, {
// TIMESTAMP (microsecond precision)
name: "timestamp_microsecond",
createType: "TIMESTAMP(6)",
createValue: "'2012-11-10 15:34:56.012345'",
}, {
// TIMESTAMP (0 with microsecond precision)
name: "timestamp_microsecond_z",
createType: "TIMESTAMP(6)",
createValue: "'0000-00-00 00:00:00.000000'",
}, {
// TIME
name: "time_100milli",
createType: "TIME(1)",
createValue: "'12:44:58.3'",
}, {
// TIME
name: "time_10milli",
createType: "TIME(2)",
createValue: "'412:44:58.01'",
}, {
// TIME
name: "time_milli",
createType: "TIME(3)",
createValue: "'-12:44:58.012'",
}, {
// TIME
name: "time_100micro",
createType: "TIME(4)",
createValue: "'12:44:58.0123'",
}, {
// TIME
name: "time_10micro",
createType: "TIME(5)",
createValue: "'12:44:58.01234'",
}, {
// TIME
name: "time_micro",
createType: "TIME(6)",
createValue: "'-12:44:58.012345'",
}, {
// DATETIME
name: "datetime1",
createType: "DATETIME(1)",
createValue: "'1020-08-23 12:44:58.8'",
}, {
// DATETIME
name: "datetime2",
createType: "DATETIME(2)",
createValue: "'1020-08-23 12:44:58.01'",
}, {
// DATETIME
name: "datetime3",
createType: "DATETIME(3)",
createValue: "'1020-08-23 12:44:58.012'",
}, {
// DATETIME
name: "datetime4",
createType: "DATETIME(4)",
createValue: "'1020-08-23 12:44:58.0123'",
}, {
// DATETIME
name: "datetime5",
createType: "DATETIME(5)",
createValue: "'1020-08-23 12:44:58.01234'",
}, {
// DATETIME
name: "datetime6",
createType: "DATETIME(6)",
createValue: "'1020-08-23 12:44:58.012345'",
}}...)
}
// JSON is only supported by MySQL 5.7+
// However the binary format is not just the text version.
// So it doesn't work as expected.
if false && strings.HasPrefix(conn.ServerVersion, "5.7") {
testcases = append(testcases, struct {
name string
createType string
createValue string
}{
// JSON
name: "json1",
createType: "JSON",
createValue: "'{\"a\":\"b\"}'",
})
}
ctx := context.Background()
dConn, err := Connect(ctx, params)
if err != nil {
t.Fatal(err)
}
defer dConn.Close()
// Set the connection time zone for execution of the
// statements to PST. That way we're sure to test the
// conversion for the TIMESTAMP types.
if _, err := dConn.ExecuteFetch("SET time_zone = '+08:00'", 0, false); err != nil {
t.Fatal(err)
}
// Create the table with all fields.
createTable := "create table replicationtypes(id int"
for _, tcase := range testcases {
createTable += fmt.Sprintf(", %v %v", tcase.name, tcase.createType)
}
createTable += ", primary key(id))"
if _, err := dConn.ExecuteFetch(createTable, 0, false); err != nil {
t.Fatal(err)
}
// Insert the value with all fields.
insert := "insert into replicationtypes set id=1"
for _, tcase := range testcases {
insert += fmt.Sprintf(", %v=%v", tcase.name, tcase.createValue)
}
result, err := dConn.ExecuteFetch(insert, 0, false)
if err != nil {
t.Fatalf("insert failed: %v", err)
}
if result.RowsAffected != 1 || len(result.Rows) != 0 {
t.Errorf("unexpected result for insert: %v", result)
}
// Get the new events from the binlogs.
// Only care about the Write event.
var tableID uint64
var tableMap *replication.TableMap
var values []sqltypes.Value
for values == nil {
data, err := conn.ReadPacket()
if err != nil {
t.Fatalf("ReadPacket failed: %v", err)
}
// Make sure it's a replication packet.
switch data[0] {
case OKPacket:
// What we expect, handled below.
case ErrPacket:
err := parseErrorPacket(data)
t.Fatalf("ReadPacket returned an error packet: %v", err)
default:
// Very unexpected.
t.Fatalf("ReadPacket returned a weird packet: %v", data)
}
// See what we got, strip the checksum.
be := newBinlogEvent(isMariaDB, data)
if !be.IsValid() {
t.Fatalf("read an invalid packet: %v", be)
}
be, _, err = be.StripChecksum(f)
if err != nil {
t.Fatalf("StripChecksum failed: %v", err)
}
switch {
case be.IsTableMap():
tableID = be.TableID(f) // This would be 0x00ffffff for an event to clear all table map entries.
var err error
tableMap, err = be.TableMap(f)
if err != nil {
t.Fatalf("TableMap event is broken: %v", err)
}
t.Logf("Got Table Map event: %v %v", tableID, tableMap)
if tableMap.Database != "vttest" ||
tableMap.Name != "replicationtypes" ||
len(tableMap.Types) != len(testcases)+1 ||
tableMap.CanBeNull.Bit(0) {
t.Errorf("got wrong TableMap: %v", tableMap)
}
case be.IsWriteRows():
if got := be.TableID(f); got != tableID {
t.Fatalf("WriteRows event got table ID %v but was expecting %v", got, tableID)
}
wr, err := be.Rows(f, tableMap)
if err != nil {
t.Fatalf("Rows event is broken: %v", err)
}
// Check it has the right values
values, err = valuesForTests(t, &wr, tableMap, 0)
if err != nil {
t.Fatalf("valuesForTests is broken: %v", err)
}
t.Logf("Got WriteRows event data: %v %v", wr, values)
if len(values) != len(testcases)+1 {
t.Fatalf("Got wrong length %v for values, was expecting %v", len(values), len(testcases)+1)
}
default:
t.Logf("Got unrelated event: %v", be)
}
}
// Insert a second row with the same data.
var sql bytes.Buffer
sql.WriteString("insert into replicationtypes set id=2")
for i, tcase := range testcases {
sql.WriteString(", ")
sql.WriteString(tcase.name)
sql.WriteString(" = ")
if values[i+1].Type() == querypb.Type_TIMESTAMP && !bytes.HasPrefix(values[i+1].Raw(), replication.ZeroTimestamp) {
// Values in the binary log are UTC. Let's convert them
// to whatever timezone the connection is using,
// so MySQL properly converts them back to UTC.
sql.WriteString("convert_tz(")
values[i+1].EncodeSQL(&sql)
sql.WriteString(", '+00:00', @@session.time_zone)")
} else {
values[i+1].EncodeSQL(&sql)
}
}
result, err = dConn.ExecuteFetch(sql.String(), 0, false)
if err != nil {
t.Fatalf("insert '%v' failed: %v", sql.String(), err)
}
if result.RowsAffected != 1 || len(result.Rows) != 0 {
t.Errorf("unexpected result for insert: %v", result)
}
t.Logf("Insert after getting event is: %v", sql.String())
// Re-select both rows, make sure all columns are the same.
stmt := "select id"
for _, tcase := range testcases {
stmt += ", " + tcase.name
}
stmt += " from replicationtypes"
result, err = dConn.ExecuteFetch(stmt, 2, false)
if err != nil {
t.Fatalf("select failed: %v", err)
}
if len(result.Rows) != 2 {
t.Fatalf("unexpected result for select: %v", result)
}
for i, tcase := range testcases {
if !reflect.DeepEqual(result.Rows[0][i+1], result.Rows[1][i+1]) {
t.Errorf("Field %v is not the same, got %v(%v) and %v(%v)", tcase.name, result.Rows[0][i+1], result.Rows[0][i+1].Type, result.Rows[1][i+1], result.Rows[1][i+1].Type)
}
}
// Drop the table, we're done.
if _, err := dConn.ExecuteFetch("drop table replicationtypes", 0, false); err != nil {
t.Fatal(err)
}
}
// valuesForTests is a helper method to return the sqltypes.Value
// of all columns in a row in a Row. Only use it in tests, as the
// returned values cannot be interpreted correctly without the schema.
// We assume everything is unsigned in this method.
func valuesForTests(t *testing.T, rs *replication.Rows, tm *replication.TableMap, rowIndex int) ([]sqltypes.Value, error) {
var result []sqltypes.Value
valueIndex := 0
data := rs.Rows[rowIndex].Data
pos := 0
for c := 0; c < rs.DataColumns.Count(); c++ {
if !rs.DataColumns.Bit(c) {
continue
}
if rs.Rows[rowIndex].NullColumns.Bit(valueIndex) {
// This column is represented, but its value is NULL.
result = append(result, sqltypes.NULL)
valueIndex++
continue
}
// We have real data
value, l, err := replication.CellValue(data, pos, tm.Types[c], tm.Metadata[c], querypb.Type_UINT64)
if err != nil {
return nil, err
}
result = append(result, value)
t.Logf(" %v: type=%v data=%v metadata=%v -> %v", c, tm.Types[c], data[pos:pos+l], tm.Metadata[c], value)
pos += l
valueIndex++
}
return result, nil
}

Просмотреть файл

@ -264,6 +264,11 @@ func TestServer(t *testing.T) {
// TestClearTextServer creates a Server that needs clear text passwords from the client.
func TestClearTextServer(t *testing.T) {
// If the database we're using is MariaDB, the client
// is also the MariaDB client, that does support
// clear text by default.
isMariaDB := os.Getenv("MYSQL_FLAVOR") == "MariaDB"
th := &testHandler{}
authServer := NewAuthServerConfig()
@ -292,24 +297,34 @@ func TestClearTextServer(t *testing.T) {
Pass: "password1",
}
// Run a 'select rows' command with results.
// This should fail as clear text is not enabled by default on the client.
// Run a 'select rows' command with results. This should fail
// as clear text is not enabled by default on the client
// (except MariaDB).
l.AllowClearTextWithoutTLS = true
output, ok := runMysql(t, params, "select rows")
sql := "select rows"
output, ok := runMysql(t, params, sql)
if ok {
t.Fatalf("mysql should have failed but returned: %v", output)
}
if strings.Contains(output, "No such file or directory") {
t.Logf("skipping mysql clear text tests, as the clear text plugin cannot be loaded: %v", err)
return
}
if !strings.Contains(output, "plugin not enabled") {
t.Errorf("Unexpected output for 'select rows': %v", output)
if isMariaDB {
t.Logf("mysql should have failed but returned: %v\nbut letting it go on MariaDB", output)
} else {
t.Fatalf("mysql should have failed but returned: %v", output)
}
} else {
if strings.Contains(output, "No such file or directory") {
t.Logf("skipping mysql clear text tests, as the clear text plugin cannot be loaded: %v", err)
return
}
if !strings.Contains(output, "plugin not enabled") {
t.Errorf("Unexpected output for 'select rows': %v", output)
}
}
// Now enable clear text plugin in client, but server requires SSL.
l.AllowClearTextWithoutTLS = false
output, ok = runMysql(t, params, enableCleartextPluginPrefix+"select rows")
if !isMariaDB {
sql = enableCleartextPluginPrefix + sql
}
output, ok = runMysql(t, params, sql)
if ok {
t.Fatalf("mysql should have failed but returned: %v", output)
}
@ -319,7 +334,7 @@ func TestClearTextServer(t *testing.T) {
// Now enable clear text plugin, it should now work.
l.AllowClearTextWithoutTLS = true
output, ok = runMysql(t, params, enableCleartextPluginPrefix+"select rows")
output, ok = runMysql(t, params, sql)
if !ok {
t.Fatalf("mysql failed: %v", output)
}
@ -331,7 +346,7 @@ func TestClearTextServer(t *testing.T) {
// Change password, make sure server rejects us.
params.Pass = ""
output, ok = runMysql(t, params, enableCleartextPluginPrefix+"select rows")
output, ok = runMysql(t, params, sql)
if ok {
t.Fatalf("mysql should have failed but returned: %v", output)
}

Просмотреть файл

@ -733,7 +733,16 @@ func writeValuesAsSQL(sql *bytes.Buffer, tce *tableCacheEntry, rs *replication.R
if err != nil {
return keyspaceIDCell, nil, err
}
value.EncodeSQL(sql)
if value.Type() == querypb.Type_TIMESTAMP && !bytes.HasPrefix(value.Raw(), replication.ZeroTimestamp) {
// Values in the binary log are UTC. Let's convert them
// to whatever timezone the connection is using,
// so MySQL properly converts them back to UTC.
sql.WriteString("convert_tz(")
value.EncodeSQL(sql)
sql.WriteString(", '+00:00', @@session.time_zone)")
} else {
value.EncodeSQL(sql)
}
if c == tce.keyspaceIDIndex {
keyspaceIDCell = value
}
@ -785,7 +794,16 @@ func writeIdentifiesAsSQL(sql *bytes.Buffer, tce *tableCacheEntry, rs *replicati
if err != nil {
return keyspaceIDCell, nil, err
}
value.EncodeSQL(sql)
if value.Type() == querypb.Type_TIMESTAMP && !bytes.HasPrefix(value.Raw(), replication.ZeroTimestamp) {
// Values in the binary log are UTC. Let's convert them
// to whatever timezone the connection is using,
// so MySQL properly converts them back to UTC.
sql.WriteString("convert_tz(")
value.EncodeSQL(sql)
sql.WriteString(", '+00:00', @@session.time_zone)")
} else {
value.EncodeSQL(sql)
}
if c == tce.keyspaceIDIndex {
keyspaceIDCell = value
}