зеркало из https://github.com/github/vitess-gh.git
merge sharding completed in go
Signed-off-by: Ajeet jain <ajeet@planetscale.com>
This commit is contained in:
Родитель
af5e293a88
Коммит
4b4982953e
|
@ -6,7 +6,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
name: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
|
||||
name: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22]
|
||||
|
||||
steps:
|
||||
- name: Set up Go
|
||||
|
|
|
@ -183,16 +183,16 @@ func CheckBinlogPlayerVars(t *testing.T, vttablet cluster.Vttablet, sourceShards
|
|||
replicationSourceObj := reflect.ValueOf(tabletVars["VReplicationSource"])
|
||||
replicationSourceValue := []string{}
|
||||
|
||||
assert.Equal(t,
|
||||
fmt.Sprintf("%v", replicationSourceObj.MapKeys()),
|
||||
fmt.Sprintf("%v", reflect.ValueOf(tabletVars["VReplicationSourceTablet"]).MapKeys()))
|
||||
assert.Equal(t, len(replicationSourceObj.MapKeys()), len(reflect.ValueOf(tabletVars["VReplicationSourceTablet"]).MapKeys()))
|
||||
|
||||
for _, key := range replicationSourceObj.MapKeys() {
|
||||
replicationSourceValue = append(replicationSourceValue,
|
||||
fmt.Sprintf("%v", replicationSourceObj.MapIndex(key)))
|
||||
}
|
||||
|
||||
assert.True(t, reflect.DeepEqual(replicationSourceValue, sourceShards))
|
||||
for _, shard := range sourceShards {
|
||||
assert.Containsf(t, replicationSourceValue, shard, "Source shard is not matched with vReplication shard value")
|
||||
}
|
||||
|
||||
if secondBehindMaster != 0 {
|
||||
secondBehindMaserMaxStr := fmt.Sprintf("%v", reflect.ValueOf(tabletVars["VReplicationSecondsBehindMasterMax"]))
|
||||
|
|
|
@ -56,9 +56,6 @@ var (
|
|||
index by_msg (msg)
|
||||
) Engine=InnoDB;
|
||||
`
|
||||
createViewTemplate = `
|
||||
create view %s (parent_id, id, msg, custom_ksid_col) as select parent_id, id, msg, custom_ksid_col from %s;
|
||||
`
|
||||
fixedParentID = 86
|
||||
tableName = "resharding1"
|
||||
vSchema = `
|
||||
|
@ -95,12 +92,9 @@ var (
|
|||
shard3 = &cluster.Shard{Name: "-80"}
|
||||
|
||||
// Sharding keys
|
||||
key1 uint64 = 1152921504606846976 // Key redirect to shard 0
|
||||
key2 uint64 = 5764607523034234880 // key redirect to shard 1
|
||||
key3 uint64 = 14987979559889010688 // Key redirect to shard 2
|
||||
|
||||
key4 uint64 = 2305843009213693952 // Key redirect to shard 0
|
||||
key5 uint64 = 5764607523034234880 //6917529027641081856 // Key redirect to shard 1
|
||||
key1 uint64 = 1 // Key redirect to shard 0 [-40]
|
||||
key2 uint64 = 3 // key redirect to shard 1 [40-80]
|
||||
key3 uint64 = 4 // Key redirect to shard 2 [80-]
|
||||
)
|
||||
|
||||
// TestMergesharding covers the workflow for a sharding merge.
|
||||
|
@ -255,15 +249,13 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
// Apply Schema
|
||||
err = clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, fmt.Sprintf(createTabletTemplate, "resharding1", shardingColumnType))
|
||||
assert.Nil(t, err)
|
||||
//err = clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, fmt.Sprintf(createViewTemplate, "view1", "resharding3"))
|
||||
//assert.Nil(t, err)
|
||||
|
||||
// Apply VSchema
|
||||
err = clusterInstance.VtctlclientProcess.ApplyVSchema(keyspaceName, vSchema)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Insert Data
|
||||
//insertStartupValues(t)
|
||||
insertStartupValues(t)
|
||||
|
||||
// run a health check on source replicas so they respond to discovery
|
||||
// (for binlog players) and on the source rdonlys (for workers)
|
||||
|
@ -293,37 +285,37 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
// Run vtworker as daemon for the following SplitClone commands. -use_v3_resharding_mode default is true
|
||||
err = clusterInstance.StartVtworker(cell, "--command_display_interval", "10ms")
|
||||
assert.Nil(t, err)
|
||||
/*
|
||||
// Initial clone (online).
|
||||
err = clusterInstance.VtworkerProcess.ExecuteCommand("SplitClone",
|
||||
"--offline=false",
|
||||
"--chunk_count", "10",
|
||||
"--min_rows_per_chunk", "1",
|
||||
"--min_healthy_rdonly_tablets", "1",
|
||||
"--max_tps", "9999",
|
||||
shard3Ks)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Check values in the merge shard
|
||||
checkValues(t, *shard3.MasterTablet(), []string{"INT64(86)", "INT64(1)", `VARCHAR("msg1")`, fmt.Sprintf("UINT64(%d)", key1)},
|
||||
1, true, tableName, fixedParentID, keyspaceName, shardingKeyType, nil)
|
||||
checkValues(t, *shard3.MasterTablet(), []string{"INT64(86)", "INT64(2)", `VARCHAR("msg2")`, fmt.Sprintf("UINT64(%d)", key2)},
|
||||
2, true, tableName, fixedParentID, keyspaceName, shardingKeyType, nil)
|
||||
// Initial clone (online).
|
||||
err = clusterInstance.VtworkerProcess.ExecuteCommand("SplitClone",
|
||||
"--offline=false",
|
||||
"--chunk_count", "10",
|
||||
"--min_rows_per_chunk", "1",
|
||||
"--min_healthy_rdonly_tablets", "1",
|
||||
"--max_tps", "9999",
|
||||
shard3Ks)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Reset vtworker such that we can run the next command.
|
||||
err = clusterInstance.VtworkerProcess.ExecuteCommand("Reset")
|
||||
assert.Nil(t, err)
|
||||
// Check values in the merge shard
|
||||
checkValues(t, *shard3.MasterTablet(), []string{"INT64(86)", "INT64(1)", `VARCHAR("msg1")`, fmt.Sprintf("UINT64(%d)", key1)},
|
||||
1, true, tableName, fixedParentID, keyspaceName, shardingKeyType, nil)
|
||||
checkValues(t, *shard3.MasterTablet(), []string{"INT64(86)", "INT64(2)", `VARCHAR("msg2")`, fmt.Sprintf("UINT64(%d)", key2)},
|
||||
2, true, tableName, fixedParentID, keyspaceName, shardingKeyType, nil)
|
||||
|
||||
// Delete row 2 (provokes an insert).
|
||||
_, err = shard3Master.VttabletProcess.QueryTablet("delete from resharding1 where id=2", keyspaceName, true)
|
||||
assert.Nil(t, err)
|
||||
// Update row 3 (provokes an update).
|
||||
_, err = shard3Master.VttabletProcess.QueryTablet("update resharding1 set msg='msg-not-1' where id=1", keyspaceName, true)
|
||||
assert.Nil(t, err)
|
||||
// Reset vtworker such that we can run the next command.
|
||||
err = clusterInstance.VtworkerProcess.ExecuteCommand("Reset")
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Delete row 2 (provokes an insert).
|
||||
_, err = shard3Master.VttabletProcess.QueryTablet("delete from resharding1 where id=2", keyspaceName, true)
|
||||
assert.Nil(t, err)
|
||||
// Update row 3 (provokes an update).
|
||||
_, err = shard3Master.VttabletProcess.QueryTablet("update resharding1 set msg='msg-not-1' where id=1", keyspaceName, true)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Insert row 4 (provokes a delete).
|
||||
insertValue(t, shard3.MasterTablet(), keyspaceName, tableName, 4, "msg4", key3)
|
||||
|
||||
// Insert row 4 (provokes a delete).
|
||||
insertValue(t, shard3.MasterTablet(), keyspaceName, tableName, 4, "msg4", key3)
|
||||
*/
|
||||
err = clusterInstance.VtworkerProcess.ExecuteCommand(
|
||||
"SplitClone",
|
||||
"--chunk_count", "10",
|
||||
|
@ -344,7 +336,7 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
// Check startup values
|
||||
//checkStartupValues(t, shardingKeyType)
|
||||
checkStartupValues(t, shardingKeyType)
|
||||
|
||||
// check the schema too
|
||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidateSchemaKeyspace", keyspaceName)
|
||||
|
@ -406,9 +398,6 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
shard3Ks)
|
||||
assert.Nil(t, err)
|
||||
|
||||
fmt.Println(err)
|
||||
time.Sleep(10 * time.Minute)
|
||||
|
||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", shard0Rdonly.Alias, "rdonly")
|
||||
assert.Nil(t, err)
|
||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeSlaveType", shard3Rdonly.Alias, "rdonly")
|
||||
|
@ -459,7 +448,7 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
// check srv keyspace
|
||||
expectedPartitions = map[topodata.TabletType][]string{}
|
||||
expectedPartitions[topodata.TabletType_MASTER] = []string{shard0.Name, shard1.Name, shard2.Name}
|
||||
expectedPartitions[topodata.TabletType_RDONLY] = []string{shard2.Name, shard3.Name}
|
||||
expectedPartitions[topodata.TabletType_RDONLY] = []string{shard3.Name, shard2.Name}
|
||||
expectedPartitions[topodata.TabletType_REPLICA] = []string{shard0.Name, shard1.Name, shard2.Name}
|
||||
sharding.CheckSrvKeyspace(t, cell, keyspaceName, "", 0, expectedPartitions, *clusterInstance)
|
||||
|
||||
|
@ -467,15 +456,14 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
sharding.CheckTabletQueryService(t, *shard1Rdonly, "NOT_SERVING", true, *clusterInstance)
|
||||
|
||||
// Now serve replica from the split shards
|
||||
//destinationShards := []cluster.Shard{*shard2, *shard3}
|
||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand(
|
||||
"MigrateServedTypes", shard3Ks, "replica")
|
||||
assert.Nil(t, err)
|
||||
|
||||
expectedPartitions = map[topodata.TabletType][]string{}
|
||||
expectedPartitions[topodata.TabletType_MASTER] = []string{shard0.Name, shard1.Name, shard2.Name}
|
||||
expectedPartitions[topodata.TabletType_RDONLY] = []string{shard2.Name, shard3.Name}
|
||||
expectedPartitions[topodata.TabletType_REPLICA] = []string{shard2.Name, shard3.Name}
|
||||
expectedPartitions[topodata.TabletType_RDONLY] = []string{shard3.Name, shard2.Name}
|
||||
expectedPartitions[topodata.TabletType_REPLICA] = []string{shard3.Name, shard2.Name}
|
||||
sharding.CheckSrvKeyspace(t, cell, keyspaceName, "", 0, expectedPartitions, *clusterInstance)
|
||||
|
||||
// now serve master from the split shards
|
||||
|
@ -484,9 +472,9 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
expectedPartitions = map[topodata.TabletType][]string{}
|
||||
expectedPartitions[topodata.TabletType_MASTER] = []string{shard2.Name, shard3.Name}
|
||||
expectedPartitions[topodata.TabletType_RDONLY] = []string{shard2.Name, shard3.Name}
|
||||
expectedPartitions[topodata.TabletType_REPLICA] = []string{shard2.Name, shard3.Name}
|
||||
expectedPartitions[topodata.TabletType_MASTER] = []string{shard3.Name, shard2.Name}
|
||||
expectedPartitions[topodata.TabletType_RDONLY] = []string{shard3.Name, shard2.Name}
|
||||
expectedPartitions[topodata.TabletType_REPLICA] = []string{shard3.Name, shard2.Name}
|
||||
sharding.CheckSrvKeyspace(t, cell, keyspaceName, "", 0, expectedPartitions, *clusterInstance)
|
||||
|
||||
sharding.CheckTabletQueryService(t, *shard0Master, "NOT_SERVING", true, *clusterInstance)
|
||||
|
@ -521,12 +509,6 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) {
|
|||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", keyspaceName)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// delete the original shard
|
||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteShard", shard0Ks)
|
||||
assert.Nil(t, err)
|
||||
err = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteShard", shard1Ks)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
}
|
||||
|
||||
func insertStartupValues(t *testing.T) {
|
||||
|
@ -546,7 +528,6 @@ func insertValue(t *testing.T, tablet *cluster.Vttablet, keyspaceName string, ta
|
|||
}
|
||||
|
||||
func checkStartupValues(t *testing.T, shardingKeyType querypb.Type) {
|
||||
|
||||
for _, tablet := range shard3.Vttablets {
|
||||
checkValues(t, *tablet, []string{"INT64(86)", "INT64(1)", `VARCHAR("msg1")`, fmt.Sprintf("UINT64(%d)", key1)},
|
||||
1, true, "resharding1", fixedParentID, keyspaceName, shardingKeyType, nil)
|
||||
|
@ -584,7 +565,7 @@ func checkLots(t *testing.T, count uint64, base uint64, table string, keyspaceNa
|
|||
isFound = checkValues(t, shard3Replica, []string{"INT64(86)",
|
||||
fmt.Sprintf("INT64(%d)", 10000+base+i),
|
||||
fmt.Sprintf(`VARCHAR("msg-range0-%d")`, 10000+base+i),
|
||||
fmt.Sprintf("UINT64(%d)", key4)},
|
||||
fmt.Sprintf("UINT64(%d)", key1)},
|
||||
10000+base+i, true, table, fixedParentID, keyspaceName, keyType, dbConn)
|
||||
if isFound {
|
||||
totalFound++
|
||||
|
@ -593,7 +574,7 @@ func checkLots(t *testing.T, count uint64, base uint64, table string, keyspaceNa
|
|||
isFound = checkValues(t, shard3Replica, []string{"INT64(86)",
|
||||
fmt.Sprintf("INT64(%d)", 20000+base+i),
|
||||
fmt.Sprintf(`VARCHAR("msg-range1-%d")`, 20000+base+i),
|
||||
fmt.Sprintf("UINT64(%d)", key4)},
|
||||
fmt.Sprintf("UINT64(%d)", key2)},
|
||||
20000+base+i, true, table, fixedParentID, keyspaceName, keyType, dbConn)
|
||||
if isFound {
|
||||
totalFound++
|
||||
|
@ -640,9 +621,9 @@ func insertLots(t *testing.T, count uint64, base uint64, table string, parentID
|
|||
var i uint64
|
||||
for i = 0; i < count; i++ {
|
||||
query1 = fmt.Sprintf(insertTabletTemplateKsID, table, parentID, 10000+base+i,
|
||||
fmt.Sprintf("msg-range0-%d", 10000+base+i), key4, key4, 10000+base+i)
|
||||
fmt.Sprintf("msg-range0-%d", 10000+base+i), key1, key1, 10000+base+i)
|
||||
query2 = fmt.Sprintf(insertTabletTemplateKsID, table, parentID, 20000+base+i,
|
||||
fmt.Sprintf("msg-range1-%d", 20000+base+i), key5, key5, 20000+base+i)
|
||||
fmt.Sprintf("msg-range1-%d", 20000+base+i), key2, key2, 20000+base+i)
|
||||
|
||||
sharding.InsertToTablet(t, query1, *shard0.MasterTablet(), ks, false)
|
||||
sharding.InsertToTablet(t, query2, *shard1.MasterTablet(), ks, false)
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
Copyright 2019 The Vitess Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
sharding "vitess.io/vitess/go/test/endtoend/sharding/mergesharding"
|
||||
)
|
||||
|
||||
// TestV3MergeShardingString - main tests merge sharding using a Byte column
|
||||
func TestV3MergeShardingString(t *testing.T) {
|
||||
sharding.TestMergesharding(t, true)
|
||||
|
||||
}
|
|
@ -459,8 +459,6 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
|
|||
// source or destination keyrange. If it matches either,
|
||||
// we'll just ask for all the data. If the overlap is a subset,
|
||||
// we'll filter.
|
||||
sdw.wr.Logger().Infof("Left keyrange %v", sdw.shardInfo.KeyRange)
|
||||
sdw.wr.Logger().Infof("Right keyrange %v", sdw.sourceShard.KeyRange)
|
||||
overlap, err := key.KeyRangesOverlap(sdw.shardInfo.KeyRange, sdw.sourceShard.KeyRange)
|
||||
if err != nil {
|
||||
return vterrors.Wrap(err, "Source shard doesn't overlap with destination")
|
||||
|
@ -498,8 +496,6 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
|
|||
// On the source, see if we need a full scan
|
||||
// or a filtered scan.
|
||||
var sourceQueryResultReader *QueryResultReader
|
||||
sdw.wr.Logger().Infof("Left keyrange %v", overlap)
|
||||
sdw.wr.Logger().Infof("Right keyrange %v", sdw.sourceShard.KeyRange)
|
||||
if key.KeyRangeEqual(overlap, sdw.sourceShard.KeyRange) {
|
||||
sourceQueryResultReader, err = TableScan(ctx, sdw.wr.Logger(), sdw.wr.TopoServer(), sdw.sourceAlias, tableDefinition)
|
||||
} else {
|
||||
|
|
|
@ -74,28 +74,6 @@
|
|||
"RetryMax": 0,
|
||||
"Tags": []
|
||||
},
|
||||
"merge_sharding": {
|
||||
"File": "merge_sharding.py",
|
||||
"Args": [],
|
||||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": 4,
|
||||
"RetryMax": 0,
|
||||
"Tags": [
|
||||
"worker_test"
|
||||
]
|
||||
},
|
||||
"merge_sharding_bytes": {
|
||||
"File": "merge_sharding_bytes.py",
|
||||
"Args": [],
|
||||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": 4,
|
||||
"RetryMax": 0,
|
||||
"Tags": [
|
||||
"worker_test"
|
||||
]
|
||||
},
|
||||
"messaging": {
|
||||
"File": "messaging.py",
|
||||
"Args": [],
|
||||
|
@ -318,6 +296,28 @@
|
|||
"site_test"
|
||||
]
|
||||
},
|
||||
"merge_sharding": {
|
||||
"File": "mergesharding_v3_test.go",
|
||||
"Args": ["vitess.io/vitess/go/test/endtoend/sharding/mergesharding/v3"],
|
||||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": 22,
|
||||
"RetryMax": 0,
|
||||
"Tags": [
|
||||
"worker_test"
|
||||
]
|
||||
},
|
||||
"merge_sharding_bytes": {
|
||||
"File": "mergesharding_string_test.go",
|
||||
"Args": ["vitess.io/vitess/go/test/endtoend/sharding/mergesharding/string"],
|
||||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": 22,
|
||||
"RetryMax": 0,
|
||||
"Tags": [
|
||||
"worker_test"
|
||||
]
|
||||
},
|
||||
"mysqlctl": {
|
||||
"File": "mysqlctl.go",
|
||||
"Args": ["vitess.io/vitess/go/test/endtoend/mysqlctl"],
|
||||
|
|
Загрузка…
Ссылка в новой задаче