From 774b7ddf37baf1c043f24d7491293e9aabe51bbe Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 28 Oct 2020 14:14:26 +0100 Subject: [PATCH] Binary PK: fix bug where padding of binary columns was being done incorrectly Signed-off-by: Rohit Nayak --- go/mysql/binlog_event_rbr.go | 7 +- .../vreplication/framework_test.go | 10 +- .../vreplication/table_plan_builder.go | 29 +++-- .../vreplication/vcopier_test.go | 100 ++++++++++++++++++ .../vreplication/vplayer_flaky_test.go | 66 ++++++++++++ .../tabletmanager/vreplication/vreplicator.go | 33 ++++-- .../tabletserver/vstreamer/vstreamer.go | 1 - .../tabletserver/vstreamer/vstreamer_test.go | 46 +++++++- 8 files changed, 260 insertions(+), 32 deletions(-) diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 89e87c1909..1b37bf5a88 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -872,12 +872,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ l := int(data[pos]) mdata := data[pos+1 : pos+1+l] if sqltypes.IsBinary(styp) { - // Fixed length binaries have to be padded with zeroes - // up to the length of the field. Otherwise, equality checks - // fail against saved data. See https://github.com/vitessio/vitess/issues/3984. - ret := make([]byte, max) - copy(ret, mdata) - return sqltypes.MakeTrusted(querypb.Type_BINARY, ret), l + 1, nil + return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil } return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 145e9b8559..60449db43f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -27,9 +27,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/require" "golang.org/x/net/context" "vitess.io/vitess/go/mysql" @@ -515,7 +514,6 @@ func expectNontxQueries(t *testing.T, queries []string) { retry: select { case got = <-globalDBQueries: - if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) { goto retry } @@ -530,11 +528,9 @@ func expectNontxQueries(t *testing.T, queries []string) { } else { match = (got == query) } - if !match { - t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query) - } + require.True(t, match, "query %d:: got:%s, want:%s", i, got, query) case <-time.After(5 * time.Second): - t.Errorf("no query received, expecting %s", query) + t.Fatalf("no query received, expecting %s", query) failed = true } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 9a7c5bad7d..ee1930abcf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -65,8 +65,10 @@ type colExpr struct { // references contains all the column names referenced in the expression. references map[string]bool - isGrouped bool - isPK bool + isGrouped bool + isPK bool + dataType string + columnType string } // operation is the opcode for the colExpr. @@ -470,12 +472,14 @@ func (tpb *tablePlanBuilder) analyzePK(pkInfoMap map[string][]*PrimaryKeyInfo) e for _, pkcol := range pkcols { cexpr := tpb.findCol(sqlparser.NewColIdent(pkcol.Name)) if cexpr == nil { - return fmt.Errorf("primary key column %s not found in select list", pkcol) + return fmt.Errorf("primary key column %v not found in select list", pkcol) } if cexpr.operation != opExpr { - return fmt.Errorf("primary key column %s is not allowed to reference an aggregate expression", pkcol) + return fmt.Errorf("primary key column %v is not allowed to reference an aggregate expression", pkcol) } cexpr.isPK = true + cexpr.dataType = pkcol.DataType + cexpr.columnType = pkcol.ColumnType tpb.pkCols = append(tpb.pkCols, cexpr) } return nil @@ -662,16 +666,29 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery { return buf.ParsedQuery() } +// For binary(n) column types, the value in the where clause needs to be padded with nulls upto the length of the column +// for MySQL comparison to work properly. This is achieved by casting it to the column type +func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) { + if cexpr.dataType == "binary" { + buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType) + return + } + buf.Myprintf("%v", cexpr.expr) +} + func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { buf.WriteString(" where ") bvf.mode = bvBefore separator := "" for _, cexpr := range tpb.pkCols { if _, ok := cexpr.expr.(*sqlparser.ColName); ok { - buf.Myprintf("%s%v=%v", separator, cexpr.colName, cexpr.expr) + buf.Myprintf("%s%v=", separator, cexpr.colName) + castIfNecessary(buf, cexpr) } else { // Parenthesize non-trivial expressions. - buf.Myprintf("%s%v=(%v)", separator, cexpr.colName, cexpr.expr) + buf.Myprintf("%s%v=(", separator, cexpr.colName) + castIfNecessary(buf, cexpr) + buf.Myprintf(")") } separator = " and " } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index d5f2028a7c..dc3959b62b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -30,6 +30,106 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" ) +func TestPlayerCopyCharPK(t *testing.T) { + defer deleteTablet(addTablet(100)) + + savedPacketSize := *vstreamer.PacketSize + // PacketSize of 1 byte will send at most one row at a time. + *vstreamer.PacketSize = 1 + defer func() { *vstreamer.PacketSize = savedPacketSize }() + + savedCopyTimeout := copyTimeout + // copyTimeout should be low enough to have time to send one row. + copyTimeout = 500 * time.Millisecond + defer func() { copyTimeout = savedCopyTimeout }() + + savedWaitRetryTime := waitRetryTime + // waitRetry time should be very low to cause the wait loop to execute multipel times. + waitRetryTime = 10 * time.Millisecond + defer func() { waitRetryTime = savedWaitRetryTime }() + + execStatements(t, []string{ + "create table src(idc binary(2) , val int, primary key(idc))", + "insert into src values('a', 1), ('c', 2)", + fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table src", + fmt.Sprintf("drop table %s.dst", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + count := 0 + vstreamRowsSendHook = func(ctx context.Context) { + defer func() { count++ }() + // Allow the first two calls to go through: field info and one row. + if count <= 1 { + return + } + // Insert a row with PK which is < the lastPK till now because of the utf8mb4 collation + execStatements(t, []string{ + "update src set val = 3 where idc = 'a\000'", + }) + // Wait for context to expire and then send the row. + // This will cause the copier to abort and go back to catchup mode. + <-ctx.Done() + // Do this no more than once. + vstreamRowsSendHook = nil + } + + vstreamHook = func(context.Context) { + // Sleeping 50ms guarantees that the catchup wait loop executes multiple times. + // This is because waitRetryTime is set to 10ms. + time.Sleep(50 * time.Millisecond) + // Do this no more than once. + vstreamHook = nil + } + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "dst", + Filter: "select * from src", + }}, + } + + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + + query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit, playerEngine.dbName) + qr, err := playerEngine.Exec(query) + if err != nil { + t.Fatal(err) + } + defer func() { + query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) + if _, err := playerEngine.Exec(query); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) + }() + + expectNontxQueries(t, []string{ + "/insert into _vt.vreplication", + "/insert into _vt.copy_state", + "/update _vt.vreplication set state='Copying'", + "insert into dst(idc,val) values ('a\\0',1)", + `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, + `update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`, + "insert into dst(idc,val) values ('c\\0',2)", + `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, + "/delete from _vt.copy_state.*dst", + "/update _vt.vreplication set state='Running'", + }) + expectData(t, "dst", [][]string{ + {"a\000", "3"}, + {"c\000", "2"}, + }) +} + // TestPlayerCopyVarcharPKCaseInsensitive tests the copy/catchup phase for a table with a varchar primary key // which is case insensitive. func TestPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index c3850807a3..443ae2edea 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -34,6 +34,72 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) +func TestCharPK(t *testing.T) { + defer deleteTablet(addTablet(100)) + + execStatements(t, []string{ + "create table t1(id int, val binary(2), primary key(val))", + fmt.Sprintf("create table %s.t1(id int, val binary(2), primary key(val))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t1", + fmt.Sprintf("drop table %s.t1", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + cancel, _ := startVReplication(t, bls, "") + defer cancel() + + testcases := []struct { + input string + output string + table string + data [][]string + }{{ + // Start with all nulls + input: "insert into t1 values(1, 'a')", + output: "insert into t1(id,val) values (1,'a')", + table: "t1", + data: [][]string{ + {"1", "a\000"}, + }, + }, { + // Start with all nulls + input: "update t1 set id = 2 where val = 'a\000'", + output: "update t1 set id=2 where val=cast('a' as binary(2))", + table: "t1", + data: [][]string{ + {"2", "a\000"}, + }, + }} + + for _, tcases := range testcases { + execStatements(t, []string{tcases.input}) + output := []string{ + "begin", + tcases.output, + "/update _vt.vreplication set pos", + "commit", + } + expectDBClientQueries(t, output) + if tcases.table != "" { + expectData(t, tcases.table, tcases.data) + } + } +} + func TestRollup(t *testing.T) { defer deleteTablet(addTablet(100)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 48135c9910..510a27e286 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -193,9 +193,11 @@ func (vr *vreplicator) replicate(ctx context.Context) error { // PrimaryKeyInfo is used to store charset and collation for primary keys where applicable type PrimaryKeyInfo struct { - Name string - CharSet string - Collation string + Name string + CharSet string + Collation string + DataType string + ColumnType string } func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*PrimaryKeyInfo, error) { @@ -203,7 +205,7 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar if err != nil { return nil, err } - queryTemplate := "select character_set_name, collation_name,column_name,data_type from information_schema.columns where table_schema=%s and table_name=%s;" + queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type from information_schema.columns where table_schema=%s and table_name=%s;" pkInfoMap := make(map[string][]*PrimaryKeyInfo) for _, td := range schema.TableDefinitions { @@ -212,8 +214,8 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar if err != nil { return nil, err } - if len(qr.Rows) > 0 && len(qr.Fields) != 4 { - return nil, fmt.Errorf("incorrect result returned for collation query") + if len(qr.Rows) == 0 { + return nil, fmt.Errorf("no data returned from information_schema.columns") } var pks []string @@ -226,6 +228,7 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar for _, pk := range pks { charSet := "" collation := "" + var dataType, columnType string for _, row := range qr.Rows { columnName := row[2].ToString() if strings.EqualFold(columnName, pk) { @@ -236,17 +239,27 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar break } } - if currentField != nil && sqltypes.IsText(currentField.Type) { + if currentField == nil { + continue + } + dataType = row[3].ToString() + columnType = row[4].ToString() + if sqltypes.IsText(currentField.Type) { charSet = row[0].ToString() collation = row[1].ToString() } break } } + if dataType == "" || columnType == "" { + return nil, fmt.Errorf("no dataType/columnType found in information_schema.columns for table %s, column %s", td.Name, pk) + } pkInfos = append(pkInfos, &PrimaryKeyInfo{ - Name: pk, - CharSet: charSet, - Collation: collation, + Name: pk, + CharSet: charSet, + Collation: collation, + DataType: dataType, + ColumnType: columnType, }) } pkInfoMap[td.Name] = pkInfos diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 6d387beee8..411bad9225 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -172,7 +172,6 @@ func (vs *vstreamer) replicate(ctx context.Context) error { return wrapError(err, vs.pos, vs.vse) } defer conn.Close() - events, err := conn.StartBinlogDumpFromPosition(vs.ctx, vs.pos) if err != nil { return wrapError(err, vs.pos, vs.vse) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index baceab92f8..4ea6b4dc1d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -53,6 +53,47 @@ func checkIfOptionIsSupported(t *testing.T, variable string) bool { return false } +func TestCellValuePadding(t *testing.T) { + + execStatements(t, []string{ + "create table t1(id int, val binary(4), primary key(val))", + "create table t2(id int, val char(4), primary key(val))", + }) + defer execStatements(t, []string{ + "drop table t1", + "drop table t2", + }) + engine.se.Reload(context.Background()) + queries := []string{ + "begin", + "insert into t1 values (1, 'aaa')", + "insert into t1 values (2, 'bbb')", + "update t1 set id = 11 where val = 'aaa\000'", + "insert into t2 values (1, 'aaa')", + "insert into t2 values (2, 'bbb')", + "update t2 set id = 11 where val = 'aaa'", + "commit", + } + + testcases := []testcase{{ + input: queries, + output: [][]string{{ + `begin`, + `type:FIELD field_event: fields: > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: after: > > `, + `type:FIELD field_event: fields: > `, + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `type:ROW row_event: after: > > `, + `gtid`, + `commit`, + }}, + }} + runCases(t, nil, testcases, "current", nil) +} + func TestSetStatement(t *testing.T) { if testing.Short() { @@ -558,6 +599,7 @@ func TestSavepoint(t *testing.T) { }} runCases(t, nil, testcases, "current", nil) } + func TestStatements(t *testing.T) { if testing.Short() { t.Skip() @@ -1364,8 +1406,8 @@ func TestTypes(t *testing.T) { output: [][]string{{ `begin`, `type:FIELD field_event: fields: fields: fields: fields: fields: fields: fields: fields: fields: > `, - `type:ROW row_event: > > `, + `type:ROW row_event: > > `, `gtid`, `commit`, }},