зеркало из https://github.com/github/vitess-gh.git
vreplication: Reshard initial cut
Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
fae58cc69a
Коммит
e095c71d09
|
@ -120,6 +120,21 @@ func EvenShardsKeyRange(i, n int) (*topodatapb.KeyRange, error) {
|
|||
return &topodatapb.KeyRange{Start: startBytes, End: endBytes}, nil
|
||||
}
|
||||
|
||||
// KeyRangeAdd adds two adjacent keyranges into a single value.
|
||||
// If the values are not adjacent, it returns false.
|
||||
func KeyRangeAdd(first, second *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
|
||||
if first == nil || second == nil {
|
||||
return nil, false
|
||||
}
|
||||
if len(first.End) != 0 && bytes.Equal(first.End, second.Start) {
|
||||
return &topodatapb.KeyRange{Start: first.Start, End: second.End}, true
|
||||
}
|
||||
if len(second.End) != 0 && bytes.Equal(second.End, first.Start) {
|
||||
return &topodatapb.KeyRange{Start: second.Start, End: first.End}, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// KeyRangeContains returns true if the provided id is in the keyrange.
|
||||
func KeyRangeContains(kr *topodatapb.KeyRange, id []byte) bool {
|
||||
if kr == nil {
|
||||
|
|
|
@ -139,6 +139,107 @@ func TestEvenShardsKeyRange(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKeyRangeAdd(t *testing.T) {
|
||||
testcases := []struct {
|
||||
first string
|
||||
second string
|
||||
out string
|
||||
ok bool
|
||||
}{{
|
||||
first: "",
|
||||
second: "",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "",
|
||||
second: "-80",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "-80",
|
||||
second: "",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "",
|
||||
second: "80-",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "80-",
|
||||
second: "",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "80-",
|
||||
second: "-40",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "-40",
|
||||
second: "80-",
|
||||
out: "",
|
||||
ok: false,
|
||||
}, {
|
||||
first: "-80",
|
||||
second: "80-",
|
||||
out: "-",
|
||||
ok: true,
|
||||
}, {
|
||||
first: "80-",
|
||||
second: "-80",
|
||||
out: "-",
|
||||
ok: true,
|
||||
}, {
|
||||
first: "-40",
|
||||
second: "40-80",
|
||||
out: "-80",
|
||||
ok: true,
|
||||
}, {
|
||||
first: "40-80",
|
||||
second: "-40",
|
||||
out: "-80",
|
||||
ok: true,
|
||||
}, {
|
||||
first: "40-80",
|
||||
second: "80-c0",
|
||||
out: "40-c0",
|
||||
ok: true,
|
||||
}, {
|
||||
first: "80-c0",
|
||||
second: "40-80",
|
||||
out: "40-c0",
|
||||
ok: true,
|
||||
}}
|
||||
stringToKeyRange := func(spec string) *topodatapb.KeyRange {
|
||||
if spec == "" {
|
||||
return nil
|
||||
}
|
||||
parts := strings.Split(spec, "-")
|
||||
if len(parts) != 2 {
|
||||
panic("invalid spec")
|
||||
}
|
||||
kr, err := ParseKeyRangeParts(parts[0], parts[1])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return kr
|
||||
}
|
||||
keyRangeToString := func(kr *topodatapb.KeyRange) string {
|
||||
if kr == nil {
|
||||
return ""
|
||||
}
|
||||
return KeyRangeString(kr)
|
||||
}
|
||||
for _, tcase := range testcases {
|
||||
first := stringToKeyRange(tcase.first)
|
||||
second := stringToKeyRange(tcase.second)
|
||||
out, ok := KeyRangeAdd(first, second)
|
||||
assert.Equal(t, tcase.out, keyRangeToString(out))
|
||||
assert.Equal(t, tcase.ok, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvenShardsKeyRange_Error(t *testing.T) {
|
||||
testCases := []struct {
|
||||
i, n int
|
||||
|
|
|
@ -17,14 +17,66 @@ limitations under the License.
|
|||
package topotools
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"vitess.io/vitess/go/vt/key"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
"vitess.io/vitess/go/vt/topo"
|
||||
)
|
||||
|
||||
// ValidateForReshard returns an error if sourceShards cannot reshard into
|
||||
// targetShards.
|
||||
func ValidateForReshard(sourceShards, targetShards []*topo.ShardInfo) error {
|
||||
for _, source := range sourceShards {
|
||||
for _, target := range targetShards {
|
||||
if key.KeyRangeEqual(source.KeyRange, target.KeyRange) {
|
||||
return fmt.Errorf("same keyrange is present in source and target: %v", key.KeyRangeString(source.KeyRange))
|
||||
}
|
||||
}
|
||||
}
|
||||
sourcekr, err := combineKeyRanges(sourceShards)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
targetkr, err := combineKeyRanges(targetShards)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !key.KeyRangeEqual(sourcekr, targetkr) {
|
||||
return fmt.Errorf("source and target keyranges don't match: %v vs %v", key.KeyRangeString(sourcekr), key.KeyRangeString(targetkr))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func combineKeyRanges(shards []*topo.ShardInfo) (*topodatapb.KeyRange, error) {
|
||||
if len(shards) == 0 {
|
||||
return nil, fmt.Errorf("there are no shards to combine")
|
||||
}
|
||||
result := shards[0].KeyRange
|
||||
krmap := make(map[string]*topodatapb.KeyRange)
|
||||
for _, si := range shards[1:] {
|
||||
krmap[si.ShardName()] = si.KeyRange
|
||||
}
|
||||
for len(krmap) != 0 {
|
||||
foundOne := false
|
||||
for k, kr := range krmap {
|
||||
newkr, ok := key.KeyRangeAdd(result, kr)
|
||||
if ok {
|
||||
foundOne = true
|
||||
result = newkr
|
||||
delete(krmap, k)
|
||||
}
|
||||
}
|
||||
if !foundOne {
|
||||
return nil, errors.New("shards don't form a contiguous keyrange")
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// OverlappingShards contains sets of shards that overlap which each-other.
|
||||
// With this library, there is no guarantee of which set will be left or right.
|
||||
type OverlappingShards struct {
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/hex"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"vitess.io/vitess/go/vt/topo"
|
||||
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
|
@ -94,6 +95,64 @@ func compareResultLists(t *testing.T, os []*OverlappingShards, expected []expect
|
|||
}
|
||||
}
|
||||
|
||||
func TestValidateForReshard(t *testing.T) {
|
||||
testcases := []struct {
|
||||
sources []string
|
||||
targets []string
|
||||
out string
|
||||
}{{
|
||||
sources: []string{"-80", "80-"},
|
||||
targets: []string{"-40", "40-"},
|
||||
out: "",
|
||||
}, {
|
||||
sources: []string{"80-", "-80"},
|
||||
targets: []string{"-40", "40-"},
|
||||
out: "",
|
||||
}, {
|
||||
sources: []string{"-40", "40-80", "80-"},
|
||||
targets: []string{"-30", "30-"},
|
||||
out: "",
|
||||
}, {
|
||||
sources: []string{"0"},
|
||||
targets: []string{"-40", "40-"},
|
||||
out: "",
|
||||
}, {
|
||||
sources: []string{"-40", "40-80", "80-"},
|
||||
targets: []string{"-40", "40-"},
|
||||
out: "same keyrange is present in source and target: -40",
|
||||
}, {
|
||||
sources: []string{"-30", "30-80"},
|
||||
targets: []string{"-40", "40-"},
|
||||
out: "source and target keyranges don't match: -80 vs -",
|
||||
}, {
|
||||
sources: []string{"-30", "20-80"},
|
||||
targets: []string{"-40", "40-"},
|
||||
out: "shards don't form a contiguous keyrange",
|
||||
}}
|
||||
buildShards := func(shards []string) []*topo.ShardInfo {
|
||||
sis := make([]*topo.ShardInfo, 0, len(shards))
|
||||
for _, shard := range shards {
|
||||
_, kr, err := topo.ValidateShardName(shard)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sis = append(sis, topo.NewShardInfo("", shard, &topodatapb.Shard{KeyRange: kr}, nil))
|
||||
}
|
||||
return sis
|
||||
}
|
||||
|
||||
for _, tcase := range testcases {
|
||||
sources := buildShards(tcase.sources)
|
||||
targets := buildShards(tcase.targets)
|
||||
err := ValidateForReshard(sources, targets)
|
||||
if tcase.out == "" {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.EqualError(t, err, tcase.out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindOverlappingShardsNoOverlap(t *testing.T) {
|
||||
var shardMap map[string]*topo.ShardInfo
|
||||
var os []*OverlappingShards
|
||||
|
|
|
@ -307,6 +307,9 @@ var commands = []commandGroup{
|
|||
{"ValidateKeyspace", commandValidateKeyspace,
|
||||
"[-ping-tablets] <keyspace name>",
|
||||
"Validates that all nodes reachable from the specified keyspace are consistent."},
|
||||
{"Reshard", commandReshard,
|
||||
"<keyspace.workflow> <source_shards> <target_shards>",
|
||||
"Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"},
|
||||
{"SplitClone", commandSplitClone,
|
||||
"<keyspace> <from_shards> <to_shards>",
|
||||
"Start the SplitClone process to perform horizontal resharding. Example: SplitClone ks '0' '-80,80-'"},
|
||||
|
@ -1784,6 +1787,22 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag
|
|||
return wr.ValidateKeyspace(ctx, keyspace, *pingTablets)
|
||||
}
|
||||
|
||||
func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
if err := subFlags.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if subFlags.NArg() != 3 {
|
||||
return fmt.Errorf("three arguments are required: <keyspace.workflow>, source_shards, target_shards")
|
||||
}
|
||||
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
source := strings.Split(subFlags.Arg(1), ",")
|
||||
target := strings.Split(subFlags.Arg(2), ",")
|
||||
return wr.Reshard(ctx, workflow, keyspace, source, target)
|
||||
}
|
||||
|
||||
func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
|
||||
if err := subFlags.Parse(args); err != nil {
|
||||
return err
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"vitess.io/vitess/go/vt/topotools"
|
||||
"vitess.io/vitess/go/vt/topotools/events"
|
||||
"vitess.io/vitess/go/vt/vterrors"
|
||||
"vitess.io/vitess/go/vt/vtgate/vindexes"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -90,6 +91,72 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard
|
|||
return wr.ts.UpdateKeyspace(ctx, ki)
|
||||
}
|
||||
|
||||
// Reshard initiates a resharding workflow.
|
||||
func (wr *Wrangler) Reshard(ctx context.Context, workflow, keyspace string, sources, targets []string) error {
|
||||
var sourceShards, targetShards []*topo.ShardInfo
|
||||
for _, shard := range sources {
|
||||
si, err := wr.ts.GetShard(ctx, keyspace, shard)
|
||||
if err != nil {
|
||||
return vterrors.Wrapf(err, "GetShard(%s) failed", shard)
|
||||
}
|
||||
sourceShards = append(sourceShards, si)
|
||||
}
|
||||
for _, shard := range targets {
|
||||
si, err := wr.ts.GetShard(ctx, keyspace, shard)
|
||||
if err != nil {
|
||||
return vterrors.Wrapf(err, "GetShard(%s) failed", shard)
|
||||
}
|
||||
targetShards = append(targetShards, si)
|
||||
}
|
||||
if err := topotools.ValidateForReshard(sourceShards, targetShards); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Exclude all reference tables.
|
||||
vschema, err := wr.ts.GetVSchema(ctx, keyspace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var excludeRules []*binlogdatapb.Rule
|
||||
for tableName, ti := range vschema.Tables {
|
||||
if ti.Type == vindexes.TypeReference {
|
||||
excludeRules = append(excludeRules, &binlogdatapb.Rule{
|
||||
Match: tableName,
|
||||
Filter: "exclude",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
for _, dest := range targetShards {
|
||||
master, err := wr.ts.GetTablet(ctx, dest.MasterAlias)
|
||||
if err != nil {
|
||||
return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias)
|
||||
}
|
||||
for _, source := range sourceShards {
|
||||
if !key.KeyRangesIntersect(dest.KeyRange, source.KeyRange) {
|
||||
continue
|
||||
}
|
||||
filter := &binlogdatapb.Filter{
|
||||
Rules: append(excludeRules, &binlogdatapb.Rule{
|
||||
Match: "/.*",
|
||||
Filter: key.KeyRangeString(dest.KeyRange),
|
||||
}),
|
||||
}
|
||||
bls := &binlogdatapb.BinlogSource{
|
||||
Keyspace: keyspace,
|
||||
Shard: source.ShardName(),
|
||||
Filter: filter,
|
||||
}
|
||||
// TODO(sougou): do this in two phases.
|
||||
cmd := binlogplayer.CreateVReplicationState(workflow, bls, "", binlogplayer.BlpRunning, master.DbName())
|
||||
if _, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil {
|
||||
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
|
||||
}
|
||||
}
|
||||
}
|
||||
return wr.refreshMasters(ctx, targetShards)
|
||||
}
|
||||
|
||||
// SplitClone initiates a SplitClone workflow.
|
||||
func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to []string) error {
|
||||
var fromShards, toShards []*topo.ShardInfo
|
||||
|
|
|
@ -85,9 +85,6 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if sourceKeyRange == nil {
|
||||
sourceKeyRange = &topodatapb.KeyRange{}
|
||||
}
|
||||
tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange)
|
||||
}
|
||||
for _, shard := range targetShards {
|
||||
|
@ -98,9 +95,6 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if targetKeyRange == nil {
|
||||
targetKeyRange = &topodatapb.KeyRange{}
|
||||
}
|
||||
tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange)
|
||||
}
|
||||
|
||||
|
@ -209,9 +203,6 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if sourceKeyRange == nil {
|
||||
sourceKeyRange = &topodatapb.KeyRange{}
|
||||
}
|
||||
tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange)
|
||||
}
|
||||
for _, shard := range targetShards {
|
||||
|
@ -222,9 +213,6 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if targetKeyRange == nil {
|
||||
targetKeyRange = &topodatapb.KeyRange{}
|
||||
}
|
||||
tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange)
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче