VReplication: Handle DECIMAL 0 Value Edge Case (#11212)

* Handle DECIMAL 0 val edge case

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add testing

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add vplayer/vcopier test for decimal with 0 scale. Use different logic for handling zero values with 0 scale

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Minor changes after self review

Signed-off-by: Matt Lord <mattalord@gmail.com>

Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Co-authored-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Matt Lord 2022-09-13 15:21:45 -04:00 коммит произвёл GitHub
Родитель b87c56f735
Коммит eb59094db8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 43 добавлений и 8 удалений

Просмотреть файл

@ -710,6 +710,14 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, field *querypb.F
// now see if we have a fraction
if scale == 0 {
// When the field is a DECIMAL using a scale of 0, e.g.
// DECIMAL(5,0), a binlogged value of 0 is almost treated
// like the NULL byte and we get a 0 byte length value.
// In this case let's return the correct value of 0.
if txt.Len() == 0 {
txt.WriteRune('0')
}
return sqltypes.MakeTrusted(querypb.Type_DECIMAL,
txt.Bytes()), l, nil
}

Просмотреть файл

@ -36,7 +36,7 @@ var (
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4;
create table customer(cid int, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', primary key(cid,typ)) CHARSET=utf8mb4;
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), primary key(cid,typ)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
create table orders(oid int, cid int, pid int, mname varchar(128), price int, qty int, total int as (qty * price), total2 int as (qty * price) stored, primary key(oid)) CHARSET=utf8;

Просмотреть файл

@ -446,6 +446,8 @@ func insertInitialData(t *testing.T) {
})
}
// insertMoreCustomers creates additional customers.
// Note: this will only work when the customer sequence is in place.
func insertMoreCustomers(t *testing.T, numCustomers int) {
sql := "insert into customer (name) values "
i := 0
@ -501,9 +503,23 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet
productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet
// Wait to finish the copy phase for all tables
catchup(t, customerTab1, workflow, "MoveTables")
catchup(t, customerTab2, workflow, "MoveTables")
// Confirm that the 0 scale decimal field, dec80, is replicated correctly
dec80Replicated := false
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
if res := execVtgateQuery(t, vtgateConn, shardTarget, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, vtgateConn, shardTarget, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
dec80Replicated = true
}
}
require.Equal(t, true, dec80Replicated)
query := "select cid from customer"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", query, query))
insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')"

Просмотреть файл

@ -449,9 +449,9 @@ func TestPlayerCopyTables(t *testing.T) {
defer deleteTablet(addTablet(100))
execStatements(t, []string{
"create table src1(id int, val varbinary(128), primary key(id))",
"insert into src1 values(2, 'bbb'), (1, 'aaa')",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), primary key(id))", vrepldb),
"create table src1(id int, val varbinary(128), d decimal(8,0), primary key(id))",
"insert into src1 values(2, 'bbb', 1), (1, 'aaa', 0)",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), d decimal(8,0), primary key(id))", vrepldb),
"create table yes(id int, val varbinary(128), primary key(id))",
fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb),
"create table no(id int, val varbinary(128), primary key(id))",
@ -468,7 +468,7 @@ func TestPlayerCopyTables(t *testing.T) {
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "dst1",
Filter: "select id, val, val as val2 from src1",
Filter: "select id, val, val as val2, d from src1",
}, {
Match: "/yes",
}},
@ -504,7 +504,7 @@ func TestPlayerCopyTables(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val,val2) values (1,'aaa','aaa'), (2,'bbb','bbb')",
"insert into dst1(id,val,val2,d) values (1,'aaa','aaa',0), (2,'bbb','bbb',1)",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
@ -519,8 +519,8 @@ func TestPlayerCopyTables(t *testing.T) {
"/update _vt.vreplication set state='Running'",
})
expectData(t, "dst1", [][]string{
{"1", "aaa", "aaa"},
{"2", "bbb", "bbb"},
{"1", "aaa", "aaa", "0"},
{"2", "bbb", "bbb", "1"},
})
expectData(t, "yes", [][]string{})
validateCopyRowCountStat(t, 2)

Просмотреть файл

@ -1410,6 +1410,8 @@ func TestPlayerTypes(t *testing.T) {
fmt.Sprintf("create table %s.src1(id int, val varbinary(128), primary key(id))", vrepldb),
"create table binary_pk(b binary(4), val varbinary(4), primary key(b))",
fmt.Sprintf("create table %s.binary_pk(b binary(4), val varbinary(4), primary key(b))", vrepldb),
"create table vitess_decimal(id int, d1 decimal(8,0) default null, d2 decimal(8,0) default null, d3 decimal(8,0) default null, d4 decimal(8, 1), d5 decimal(8, 1), d6 decimal(8, 1), primary key(id))",
fmt.Sprintf("create table %s.vitess_decimal(id int, d1 decimal(8,0) default null, d2 decimal(8,0) default null, d3 decimal(8,0) default null, d4 decimal(8, 1), d5 decimal(8, 1), d6 decimal(8, 1), primary key(id))", vrepldb),
})
defer execStatements(t, []string{
"drop table vitess_ints",
@ -1426,6 +1428,8 @@ func TestPlayerTypes(t *testing.T) {
fmt.Sprintf("drop table %s.src1", vrepldb),
"drop table binary_pk",
fmt.Sprintf("drop table %s.binary_pk", vrepldb),
"drop table vitess_decimal",
fmt.Sprintf("drop table %s.vitess_decimal", vrepldb),
})
if enableJSONColumnTesting {
execStatements(t, []string{
@ -1501,6 +1505,13 @@ func TestPlayerTypes(t *testing.T) {
data: [][]string{
{"a\000\000\000", "aaa"},
},
}, {
input: "insert into vitess_decimal values(1, 0, 1, null, 0, 1.1, 1)",
output: "insert into vitess_decimal(id,d1,d2,d3,d4,d5,d6) values (1,0,1,null,.0,1.1,1.0)",
table: "vitess_decimal",
data: [][]string{
{"1", "0", "1", "", "0.0", "1.1", "1.0"},
},
}, {
// Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984
input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",