worker: SplitClone: Handle keyspace_id changes properly.

If a row already exists on the destination with a different keyspace_id on a different destination shard, we cannot use UPDATE. Instead, we must DELETE the old row from the old destination shard and INSERT the new row to the new destination shard.
This commit is contained in:
Michael Berlin 2016-07-13 15:20:29 +02:00
Родитель 4203424826
Коммит a9136d687a
2 изменённых файлов: 98 добавлений и 2 удалений

Просмотреть файл

@ -181,7 +181,7 @@ func (rd *RowDiffer2) Diff() (DiffReport, error) {
advanceLeft = true
advanceRight = true
// Update the row on the destination.
if err := rd.reconcileRow(left, DiffNotEqual); err != nil {
if err := rd.updateRow(left, right, DiffNotEqual); err != nil {
return dr, err
}
continue
@ -220,7 +220,7 @@ func (rd *RowDiffer2) Diff() (DiffReport, error) {
advanceLeft = true
advanceRight = true
// Update the row on the destination.
if err := rd.reconcileRow(right, DiffNotEqual); err != nil {
if err := rd.updateRow(left, right, DiffNotEqual); err != nil {
return dr, err
}
}
@ -237,7 +237,11 @@ func (rd *RowDiffer2) Diff() (DiffReport, error) {
return dr, nil
}
// reconcileRow is used for the DiffType DiffMissing and DiffExtraneous.
func (rd *RowDiffer2) reconcileRow(row []sqltypes.Value, typ DiffType) error {
if typ == DiffNotEqual {
panic(fmt.Sprintf("reconcileRow() called with wrong type: %v", typ))
}
destShardIndex, err := rd.router.Route(row)
if err != nil {
return fmt.Errorf("failed to route row (%v) to correct shard: %v", row, err)
@ -251,6 +255,44 @@ func (rd *RowDiffer2) reconcileRow(row []sqltypes.Value, typ DiffType) error {
return nil
}
// updateRow handles a row which exists on both sides but differs, i.e. the
// DiffType DiffNotEqual. newRow is the row from the source and oldRow is the
// row from the destination.
//
// Both rows are routed independently because the keyspace_id may have changed
// in the meantime. If both rows map to the same destination shard, newRow is
// added to that shard's aggregator for typ (an in-place update). If they map
// to different shards, an UPDATE is not possible: oldRow is added to the
// DiffExtraneous aggregator of its old shard (DELETE) and newRow is added to
// the DiffMissing aggregator of its new shard (INSERT).
//
// It panics if typ is not DiffNotEqual. It returns an error if routing a row
// or adding it to an aggregator fails.
func (rd *RowDiffer2) updateRow(newRow, oldRow []sqltypes.Value, typ DiffType) error {
	if typ != DiffNotEqual {
		panic(fmt.Sprintf("updateRow() called with wrong type: %v", typ))
	}

	// Route the destination (old) row first, then the source (new) row.
	oldShard, err := rd.router.Route(oldRow)
	if err != nil {
		return fmt.Errorf("failed to route old row (%v) to correct shard: %v", oldRow, err)
	}
	newShard, err := rd.router.Route(newRow)
	if err != nil {
		return fmt.Errorf("failed to route new row (%v) to correct shard: %v", newRow, err)
	}

	if oldShard != newShard {
		// The row moved to a different shard: remove it from the old shard
		// before adding it to the new one.
		if err := rd.aggregators[oldShard][DiffExtraneous].Add(oldRow); err != nil {
			return fmt.Errorf("failed to add row update to RowAggregator (keyspace_id changed, deleting old row): %v", err)
		}
		if err := rd.aggregators[newShard][DiffMissing].Add(newRow); err != nil {
			return fmt.Errorf("failed to add row update to RowAggregator (keyspace_id changed, inserting new row): %v", err)
		}
	} else {
		// Same shard for both rows: the row can be updated in place.
		if err := rd.aggregators[newShard][typ].Add(newRow); err != nil {
			return fmt.Errorf("failed to add row update to RowAggregator (keyspace_id not changed): %v", err)
		}
	}

	// TODO(mberlin): Add more fine granular stats here.
	rd.tableStatusList.addCopiedRows(rd.tableIndex, 1)
	return nil
}
// RowRouter allows to find out which shard's key range contains a given
// keyspace ID.
type RowRouter struct {

Просмотреть файл

@ -488,6 +488,60 @@ primary key (name)
workerclient_proc = utils.run_vtworker_client_bg(['Reset'], worker_rpc_port)
utils.wait_procs([workerclient_proc])
# Test the correct handling of keyspace_id changes which happen after
# the first clone.
# Let row 2 go to shard 3 instead of shard 2.
shard_1_master.mquery('vt_test_keyspace',
'update resharding1 set'
' custom_ksid_col=0xD000000000000000 WHERE id=2',
write=True)
workerclient_proc = utils.run_vtworker_client_bg(
['SplitClone',
'--offline=false',
'--exclude_tables', 'unrelated',
'--min_table_size_for_split', '1',
'--min_healthy_rdonly_tablets', '1',
'--max_tps', '9999',
'test_keyspace/80-'],
worker_rpc_port)
utils.wait_procs([workerclient_proc])
# Row 2 will be deleted from shard 2 and inserted to shard 3.
self.verify_reconciliation_counters(worker_port, 'Online', 'resharding1',
1, 0, 1)
self._check_value(shard_2_master, 'resharding1', 2, 'msg2',
0xD000000000000000, should_be_here=False)
self._check_value(shard_3_master, 'resharding1', 2, 'msg2',
0xD000000000000000)
# Reset vtworker such that we can run the next command.
workerclient_proc = utils.run_vtworker_client_bg(['Reset'], worker_rpc_port)
utils.wait_procs([workerclient_proc])
# Move row 2 back to shard 2 from shard 3 by changing the keyspace_id again.
shard_1_master.mquery('vt_test_keyspace',
'update resharding1 set'
' custom_ksid_col=0x9000000000000000 WHERE id=2',
write=True)
workerclient_proc = utils.run_vtworker_client_bg(
['SplitClone',
'--offline=false',
'--exclude_tables', 'unrelated',
'--min_table_size_for_split', '1',
'--min_healthy_rdonly_tablets', '1',
'--max_tps', '9999',
'test_keyspace/80-'],
worker_rpc_port)
utils.wait_procs([workerclient_proc])
# Row 2 will be deleted from shard 3 and inserted to shard 2.
self.verify_reconciliation_counters(worker_port, 'Online', 'resharding1',
1, 0, 1)
self._check_value(shard_2_master, 'resharding1', 2, 'msg2',
0x9000000000000000)
self._check_value(shard_3_master, 'resharding1', 2, 'msg2',
0x9000000000000000, should_be_here=False)
# Reset vtworker such that we can run the next command.
workerclient_proc = utils.run_vtworker_client_bg(['Reset'], worker_rpc_port)
utils.wait_procs([workerclient_proc])
# Modify the destination shard. SplitClone will revert the changes.
# Delete row 2 (provokes an insert).
shard_2_master.mquery('vt_test_keyspace',