Binary PK: fix bug where padding of binary columns was being done incorrectly

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Rohit Nayak 2020-10-28 14:14:26 +01:00
Родитель 9cab72e09b
Коммит 774b7ddf37
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: BA0A4E9168156524
8 изменённых файлов: 260 добавлений и 32 удалений

Просмотреть файл

@ -872,12 +872,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
l := int(data[pos])
mdata := data[pos+1 : pos+1+l]
if sqltypes.IsBinary(styp) {
// Fixed length binaries have to be padded with zeroes
// up to the length of the field. Otherwise, equality checks
// fail against saved data. See https://github.com/vitessio/vitess/issues/3984.
ret := make([]byte, max)
copy(ret, mdata)
return sqltypes.MakeTrusted(querypb.Type_BINARY, ret), l + 1, nil
return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil

Просмотреть файл

@ -27,9 +27,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"vitess.io/vitess/go/mysql"
@ -515,7 +514,6 @@ func expectNontxQueries(t *testing.T, queries []string) {
retry:
select {
case got = <-globalDBQueries:
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) {
goto retry
}
@ -530,11 +528,9 @@ func expectNontxQueries(t *testing.T, queries []string) {
} else {
match = (got == query)
}
if !match {
t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query)
}
require.True(t, match, "query %d:: got:%s, want:%s", i, got, query)
case <-time.After(5 * time.Second):
t.Errorf("no query received, expecting %s", query)
t.Fatalf("no query received, expecting %s", query)
failed = true
}
}

Просмотреть файл

@ -65,8 +65,10 @@ type colExpr struct {
// references contains all the column names referenced in the expression.
references map[string]bool
isGrouped bool
isPK bool
isGrouped bool
isPK bool
dataType string
columnType string
}
// operation is the opcode for the colExpr.
@ -470,12 +472,14 @@ func (tpb *tablePlanBuilder) analyzePK(pkInfoMap map[string][]*PrimaryKeyInfo) e
for _, pkcol := range pkcols {
cexpr := tpb.findCol(sqlparser.NewColIdent(pkcol.Name))
if cexpr == nil {
return fmt.Errorf("primary key column %s not found in select list", pkcol)
return fmt.Errorf("primary key column %v not found in select list", pkcol)
}
if cexpr.operation != opExpr {
return fmt.Errorf("primary key column %s is not allowed to reference an aggregate expression", pkcol)
return fmt.Errorf("primary key column %v is not allowed to reference an aggregate expression", pkcol)
}
cexpr.isPK = true
cexpr.dataType = pkcol.DataType
cexpr.columnType = pkcol.ColumnType
tpb.pkCols = append(tpb.pkCols, cexpr)
}
return nil
@ -662,16 +666,29 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery {
return buf.ParsedQuery()
}
// castIfNecessary writes cexpr's expression into buf. For binary(n)
// columns the expression is wrapped in a cast to the full column type so
// that MySQL zero-pads the comparison value up to the column length;
// without the cast, equality checks in the where clause fail against the
// padded values stored in the table.
func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) {
	if cexpr.dataType != "binary" {
		buf.Myprintf("%v", cexpr.expr)
		return
	}
	buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType)
}
func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
buf.WriteString(" where ")
bvf.mode = bvBefore
separator := ""
for _, cexpr := range tpb.pkCols {
if _, ok := cexpr.expr.(*sqlparser.ColName); ok {
buf.Myprintf("%s%v=%v", separator, cexpr.colName, cexpr.expr)
buf.Myprintf("%s%v=", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
} else {
// Parenthesize non-trivial expressions.
buf.Myprintf("%s%v=(%v)", separator, cexpr.colName, cexpr.expr)
buf.Myprintf("%s%v=(", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
buf.Myprintf(")")
}
separator = " and "
}

Просмотреть файл

@ -30,6 +30,106 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)
// TestPlayerCopyCharPK exercises the copy/catchup phase for a table whose
// primary key is a fixed-length binary(n) column. A row is modified while
// the copy is in progress; the catchup update must compare the PK using a
// cast to binary(2) so MySQL's zero-padding semantics are honored.
func TestPlayerCopyCharPK(t *testing.T) {
	defer deleteTablet(addTablet(100))
	savedPacketSize := *vstreamer.PacketSize
	// PacketSize of 1 byte will send at most one row at a time.
	*vstreamer.PacketSize = 1
	defer func() { *vstreamer.PacketSize = savedPacketSize }()
	savedCopyTimeout := copyTimeout
	// copyTimeout should be low enough to have time to send one row.
	copyTimeout = 500 * time.Millisecond
	defer func() { copyTimeout = savedCopyTimeout }()
	savedWaitRetryTime := waitRetryTime
	// waitRetryTime should be very low to cause the wait loop to execute multiple times.
	waitRetryTime = 10 * time.Millisecond
	defer func() { waitRetryTime = savedWaitRetryTime }()
	execStatements(t, []string{
		"create table src(idc binary(2) , val int, primary key(idc))",
		"insert into src values('a', 1), ('c', 2)",
		fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb),
	})
	defer execStatements(t, []string{
		"drop table src",
		fmt.Sprintf("drop table %s.dst", vrepldb),
	})
	env.SchemaEngine.Reload(context.Background())
	count := 0
	vstreamRowsSendHook = func(ctx context.Context) {
		defer func() { count++ }()
		// Allow the first two calls to go through: field info and one row.
		if count <= 1 {
			return
		}
		// Update the already-copied row ('a' is zero-padded to 'a\0' by
		// the binary(2) column) so catchup has an event to replay.
		// NOTE(review): original comment said "Insert a row ... because of
		// the utf8mb4 collation" — this is a binary column and an update;
		// kept the statement, corrected the description.
		execStatements(t, []string{
			"update src set val = 3 where idc = 'a\000'",
		})
		// Wait for context to expire and then send the row.
		// This will cause the copier to abort and go back to catchup mode.
		<-ctx.Done()
		// Do this no more than once.
		vstreamRowsSendHook = nil
	}
	vstreamHook = func(context.Context) {
		// Sleeping 50ms guarantees that the catchup wait loop executes multiple times.
		// This is because waitRetryTime is set to 10ms.
		time.Sleep(50 * time.Millisecond)
		// Do this no more than once.
		vstreamHook = nil
	}
	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match:  "dst",
			Filter: "select * from src",
		}},
	}
	bls := &binlogdatapb.BinlogSource{
		Keyspace: env.KeyspaceName,
		Shard:    env.ShardName,
		Filter:   filter,
		OnDdl:    binlogdatapb.OnDDLAction_IGNORE,
	}
	query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit, playerEngine.dbName)
	qr, err := playerEngine.Exec(query)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		// Clean up the vreplication stream created above.
		query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID)
		if _, err := playerEngine.Exec(query); err != nil {
			t.Fatal(err)
		}
		expectDeleteQueries(t)
	}()
	// The catchup update must use cast('a' as binary(2)) in its where
	// clause; this is the behavior under test.
	expectNontxQueries(t, []string{
		"/insert into _vt.vreplication",
		"/insert into _vt.copy_state",
		"/update _vt.vreplication set state='Copying'",
		"insert into dst(idc,val) values ('a\\0',1)",
		`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"a\\\\000\\" > ' where vrepl_id=.*`,
		`update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`,
		"insert into dst(idc,val) values ('c\\0',2)",
		`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"c\\\\000\\" > ' where vrepl_id=.*`,
		"/delete from _vt.copy_state.*dst",
		"/update _vt.vreplication set state='Running'",
	})
	expectData(t, "dst", [][]string{
		{"a\000", "3"},
		{"c\000", "2"},
	})
}
// TestPlayerCopyVarcharPKCaseInsensitive tests the copy/catchup phase for a table with a varchar primary key
// which is case insensitive.
func TestPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {

Просмотреть файл

@ -34,6 +34,72 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
// TestCharPK verifies replication into a table whose primary key is a
// binary(2) column: updates keyed on the PK must be generated with a
// cast to the column type so the comparison matches MySQL's zero-padded
// stored value.
func TestCharPK(t *testing.T) {
	defer deleteTablet(addTablet(100))
	execStatements(t, []string{
		"create table t1(id int, val binary(2), primary key(val))",
		fmt.Sprintf("create table %s.t1(id int, val binary(2), primary key(val))", vrepldb),
	})
	defer execStatements(t, []string{
		"drop table t1",
		fmt.Sprintf("drop table %s.t1", vrepldb),
	})
	env.SchemaEngine.Reload(context.Background())
	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match:  "t1",
			Filter: "select * from t1",
		}},
	}
	bls := &binlogdatapb.BinlogSource{
		Keyspace: env.KeyspaceName,
		Shard:    env.ShardName,
		Filter:   filter,
		OnDdl:    binlogdatapb.OnDDLAction_IGNORE,
	}
	cancel, _ := startVReplication(t, bls, "")
	defer cancel()
	testcases := []struct {
		input  string
		output string
		table  string
		data   [][]string
	}{{
		// Insert a one-byte value; the binary(2) column stores it zero-padded.
		input:  "insert into t1 values(1, 'a')",
		output: "insert into t1(id,val) values (1,'a')",
		table:  "t1",
		data: [][]string{
			{"1", "a\000"},
		},
	}, {
		// Update by PK: the generated where clause must cast to binary(2).
		input:  "update t1 set id = 2 where val = 'a\000'",
		output: "update t1 set id=2 where val=cast('a' as binary(2))",
		table:  "t1",
		data: [][]string{
			{"2", "a\000"},
		},
	}}
	for _, tcases := range testcases {
		execStatements(t, []string{tcases.input})
		output := []string{
			"begin",
			tcases.output,
			"/update _vt.vreplication set pos",
			"commit",
		}
		expectDBClientQueries(t, output)
		if tcases.table != "" {
			expectData(t, tcases.table, tcases.data)
		}
	}
}
func TestRollup(t *testing.T) {
defer deleteTablet(addTablet(100))

Просмотреть файл

@ -193,9 +193,11 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
// PrimaryKeyInfo is used to store charset and collation for primary keys where applicable
type PrimaryKeyInfo struct {
Name string
CharSet string
Collation string
Name string
CharSet string
Collation string
DataType string
ColumnType string
}
func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*PrimaryKeyInfo, error) {
@ -203,7 +205,7 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar
if err != nil {
return nil, err
}
queryTemplate := "select character_set_name, collation_name,column_name,data_type from information_schema.columns where table_schema=%s and table_name=%s;"
queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type from information_schema.columns where table_schema=%s and table_name=%s;"
pkInfoMap := make(map[string][]*PrimaryKeyInfo)
for _, td := range schema.TableDefinitions {
@ -212,8 +214,8 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar
if err != nil {
return nil, err
}
if len(qr.Rows) > 0 && len(qr.Fields) != 4 {
return nil, fmt.Errorf("incorrect result returned for collation query")
if len(qr.Rows) == 0 {
return nil, fmt.Errorf("no data returned from information_schema.columns")
}
var pks []string
@ -226,6 +228,7 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar
for _, pk := range pks {
charSet := ""
collation := ""
var dataType, columnType string
for _, row := range qr.Rows {
columnName := row[2].ToString()
if strings.EqualFold(columnName, pk) {
@ -236,17 +239,27 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar
break
}
}
if currentField != nil && sqltypes.IsText(currentField.Type) {
if currentField == nil {
continue
}
dataType = row[3].ToString()
columnType = row[4].ToString()
if sqltypes.IsText(currentField.Type) {
charSet = row[0].ToString()
collation = row[1].ToString()
}
break
}
}
if dataType == "" || columnType == "" {
return nil, fmt.Errorf("no dataType/columnType found in information_schema.columns for table %s, column %s", td.Name, pk)
}
pkInfos = append(pkInfos, &PrimaryKeyInfo{
Name: pk,
CharSet: charSet,
Collation: collation,
Name: pk,
CharSet: charSet,
Collation: collation,
DataType: dataType,
ColumnType: columnType,
})
}
pkInfoMap[td.Name] = pkInfos

Просмотреть файл

@ -172,7 +172,6 @@ func (vs *vstreamer) replicate(ctx context.Context) error {
return wrapError(err, vs.pos, vs.vse)
}
defer conn.Close()
events, err := conn.StartBinlogDumpFromPosition(vs.ctx, vs.pos)
if err != nil {
return wrapError(err, vs.pos, vs.vse)

Просмотреть файл

@ -53,6 +53,47 @@ func checkIfOptionIsSupported(t *testing.T, variable string) bool {
return false
}
// TestCellValuePadding checks the row events streamed for tables with
// binary(n) and char(n) primary keys: the emitted cell values carry the
// raw (unpadded) contents — e.g. 'aaa' in a binary(4) column streams with
// length 3, not padded to 4.
func TestCellValuePadding(t *testing.T) {
	execStatements(t, []string{
		"create table t1(id int, val binary(4), primary key(val))",
		"create table t2(id int, val char(4), primary key(val))",
	})
	defer execStatements(t, []string{
		"drop table t1",
		"drop table t2",
	})
	engine.se.Reload(context.Background())
	queries := []string{
		"begin",
		"insert into t1 values (1, 'aaa')",
		"insert into t1 values (2, 'bbb')",
		// binary(4) stores the value zero-padded, so the where clause
		// must spell out the trailing \0 to match.
		"update t1 set id = 11 where val = 'aaa\000'",
		"insert into t2 values (1, 'aaa')",
		"insert into t2 values (2, 'bbb')",
		"update t2 set id = 11 where val = 'aaa'",
		"commit",
	}
	testcases := []testcase{{
		input: queries,
		output: [][]string{{
			`begin`,
			`type:FIELD field_event:<table_name:"t1" fields:<name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 > fields:<name:"val" type:BINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:4 charset:63 > > `,
			`type:ROW row_event:<table_name:"t1" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
			`type:ROW row_event:<table_name:"t1" row_changes:<after:<lengths:1 lengths:3 values:"2bbb" > > > `,
			`type:ROW row_event:<table_name:"t1" row_changes:<before:<lengths:1 lengths:3 values:"1aaa" > after:<lengths:2 lengths:3 values:"11aaa" > > > `,
			`type:FIELD field_event:<table_name:"t2" fields:<name:"id" type:INT32 table:"t2" org_table:"t2" database:"vttest" org_name:"id" column_length:11 charset:63 > fields:<name:"val" type:CHAR table:"t2" org_table:"t2" database:"vttest" org_name:"val" column_length:12 charset:33 > > `,
			`type:ROW row_event:<table_name:"t2" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
			`type:ROW row_event:<table_name:"t2" row_changes:<after:<lengths:1 lengths:3 values:"2bbb" > > > `,
			`type:ROW row_event:<table_name:"t2" row_changes:<before:<lengths:1 lengths:3 values:"1aaa" > after:<lengths:2 lengths:3 values:"11aaa" > > > `,
			`gtid`,
			`commit`,
		}},
	}}
	runCases(t, nil, testcases, "current", nil)
}
func TestSetStatement(t *testing.T) {
if testing.Short() {
@ -558,6 +599,7 @@ func TestSavepoint(t *testing.T) {
}}
runCases(t, nil, testcases, "current", nil)
}
func TestStatements(t *testing.T) {
if testing.Short() {
t.Skip()
@ -1364,8 +1406,8 @@ func TestTypes(t *testing.T) {
output: [][]string{{
`begin`,
`type:FIELD field_event:<table_name:"vitess_strings" fields:<name:"vb" type:VARBINARY table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"vb" column_length:16 charset:63 > fields:<name:"c" type:CHAR table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"c" column_length:48 charset:33 > fields:<name:"vc" type:VARCHAR table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"vc" column_length:48 charset:33 > fields:<name:"b" type:BINARY table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"b" column_length:4 charset:63 > fields:<name:"tb" type:BLOB table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"tb" column_length:255 charset:63 > fields:<name:"bl" type:BLOB table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"bl" column_length:65535 charset:63 > fields:<name:"ttx" type:TEXT table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"ttx" column_length:765 charset:33 > fields:<name:"tx" type:TEXT table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"tx" column_length:196605 charset:33 > fields:<name:"en" type:ENUM table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"en" column_length:3 charset:33 > fields:<name:"s" type:SET table:"vitess_strings" org_table:"vitess_strings" database:"vttest" org_name:"s" column_length:9 charset:33 > > `,
`type:ROW row_event:<table_name:"vitess_strings" row_changes:<after:<lengths:1 lengths:1 lengths:1 lengths:4 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 ` +
`values:"abcd\000\000\000efgh13" > > > `,
`type:ROW row_event:<table_name:"vitess_strings" row_changes:<after:<lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 ` +
`values:"abcdefgh13" > > > `,
`gtid`,
`commit`,
}},