зеркало из https://github.com/github/vitess-gh.git
Merge pull request #5159 from planetscale/ss-vrepl-unify-parsing
vreplication: refactor table_plan_builder
This commit is contained in:
Коммит
7820d4ebd8
|
@ -22,6 +22,7 @@ import (
|
|||
"encoding/hex"
|
||||
"fmt"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
|
@ -296,3 +297,10 @@ func ParseShardingSpec(spec string) ([]*topodatapb.KeyRange, error) {
|
|||
}
|
||||
return ranges, nil
|
||||
}
|
||||
|
||||
var krRegexp = regexp.MustCompile(`^[0-9a-fA-F]*-[0-9a-fA-F]*$`)
|
||||
|
||||
// IsKeyRange returns true if the string represents a keyrange.
|
||||
func IsKeyRange(kr string) bool {
|
||||
return krRegexp.MatchString(kr)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
@ -381,7 +382,7 @@ func BenchmarkUint64KeyString(b *testing.B) {
|
|||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, key := range keys {
|
||||
key.String()
|
||||
_ = key.String()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -435,3 +436,44 @@ func BenchmarkKeyRangesOverlap(b *testing.B) {
|
|||
KeyRangesOverlap(kr1, kr2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsKeyRange(t *testing.T) {
|
||||
testcases := []struct {
|
||||
in string
|
||||
out bool
|
||||
}{{
|
||||
in: "-",
|
||||
out: true,
|
||||
}, {
|
||||
in: "-80",
|
||||
out: true,
|
||||
}, {
|
||||
in: "40-80",
|
||||
out: true,
|
||||
}, {
|
||||
in: "80-",
|
||||
out: true,
|
||||
}, {
|
||||
in: "a0-",
|
||||
out: true,
|
||||
}, {
|
||||
in: "-A0",
|
||||
out: true,
|
||||
}, {
|
||||
in: "",
|
||||
out: false,
|
||||
}, {
|
||||
in: "x-80",
|
||||
out: false,
|
||||
}, {
|
||||
in: "-80x",
|
||||
out: false,
|
||||
}, {
|
||||
in: "select",
|
||||
out: false,
|
||||
}}
|
||||
|
||||
for _, tcase := range testcases {
|
||||
assert.Equal(t, IsKeyRange(tcase.in), tcase.out, tcase.in)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,6 +85,44 @@ func TestBuildPlayerPlan(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, {
|
||||
// Regular with keyrange
|
||||
input: &binlogdatapb.Filter{
|
||||
Rules: []*binlogdatapb.Rule{{
|
||||
Match: "/.*",
|
||||
Filter: "-80",
|
||||
}},
|
||||
},
|
||||
plan: &TestReplicatorPlan{
|
||||
VStreamFilter: &binlogdatapb.Filter{
|
||||
Rules: []*binlogdatapb.Rule{{
|
||||
Match: "t1",
|
||||
Filter: "select * from t1 where in_keyrange('-80')",
|
||||
}},
|
||||
},
|
||||
TargetTables: []string{"t1"},
|
||||
TablePlans: map[string]*TestTablePlan{
|
||||
"t1": {
|
||||
TargetName: "t1",
|
||||
SendRule: "t1",
|
||||
},
|
||||
},
|
||||
},
|
||||
planpk: &TestReplicatorPlan{
|
||||
VStreamFilter: &binlogdatapb.Filter{
|
||||
Rules: []*binlogdatapb.Rule{{
|
||||
Match: "t1",
|
||||
Filter: "select * from t1 where in_keyrange('-80')",
|
||||
}},
|
||||
},
|
||||
TargetTables: []string{"t1"},
|
||||
TablePlans: map[string]*TestTablePlan{
|
||||
"t1": {
|
||||
TargetName: "t1",
|
||||
SendRule: "t1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, {
|
||||
// '*' expression
|
||||
input: &binlogdatapb.Filter{
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
"vitess.io/vitess/go/vt/key"
|
||||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
|
||||
"vitess.io/vitess/go/vt/sqlparser"
|
||||
)
|
||||
|
@ -88,69 +89,64 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, tableKeys map[string][]str
|
|||
TablePlans: make(map[string]*TablePlan),
|
||||
tableKeys: tableKeys,
|
||||
}
|
||||
nextTable:
|
||||
for tableName := range tableKeys {
|
||||
lastpk, ok := copyState[tableName]
|
||||
if ok && lastpk == nil {
|
||||
// Don't replicate uncopied tables.
|
||||
continue
|
||||
}
|
||||
for _, rule := range filter.Rules {
|
||||
switch {
|
||||
case strings.HasPrefix(rule.Match, "/"):
|
||||
expr := strings.Trim(rule.Match, "/")
|
||||
result, err := regexp.MatchString(expr, tableName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !result {
|
||||
continue
|
||||
}
|
||||
sendRule := &binlogdatapb.Rule{
|
||||
Match: tableName,
|
||||
Filter: buildQuery(tableName, rule.Filter),
|
||||
}
|
||||
plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, sendRule)
|
||||
tablePlan := &TablePlan{
|
||||
TargetName: tableName,
|
||||
SendRule: sendRule,
|
||||
Lastpk: lastpk,
|
||||
}
|
||||
plan.TargetTables[tableName] = tablePlan
|
||||
plan.TablePlans[tableName] = tablePlan
|
||||
continue nextTable
|
||||
case rule.Match == tableName:
|
||||
tablePlan, err := buildTablePlan(rule, tableKeys, lastpk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok {
|
||||
continue
|
||||
}
|
||||
plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule)
|
||||
plan.TargetTables[tableName] = tablePlan
|
||||
plan.TablePlans[tablePlan.SendRule.Match] = tablePlan
|
||||
continue nextTable
|
||||
}
|
||||
rule, err := tableMatches(tableName, filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
tablePlan, err := buildTablePlan(tableName, rule.Filter, tableKeys, lastpk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok {
|
||||
continue
|
||||
}
|
||||
plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule)
|
||||
plan.TargetTables[tableName] = tablePlan
|
||||
plan.TablePlans[tablePlan.SendRule.Match] = tablePlan
|
||||
}
|
||||
return plan, nil
|
||||
}
|
||||
|
||||
func buildQuery(tableName, filter string) string {
|
||||
buf := sqlparser.NewTrackedBuffer(nil)
|
||||
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName))
|
||||
if filter != "" {
|
||||
buf.Myprintf(" where in_keyrange(%v)", sqlparser.NewStrVal([]byte(filter)))
|
||||
// tableMatches is similar to the one defined in vstreamer.
|
||||
func tableMatches(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Rule, error) {
|
||||
for _, rule := range filter.Rules {
|
||||
switch {
|
||||
case strings.HasPrefix(rule.Match, "/"):
|
||||
expr := strings.Trim(rule.Match, "/")
|
||||
result, err := regexp.MatchString(expr, tableName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !result {
|
||||
continue
|
||||
}
|
||||
return rule, nil
|
||||
case tableName == rule.Match:
|
||||
return rule, nil
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
|
||||
query := rule.Filter
|
||||
if query == "" {
|
||||
func buildTablePlan(tableName, filter string, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
|
||||
query := filter
|
||||
switch {
|
||||
case filter == "":
|
||||
buf := sqlparser.NewTrackedBuffer(nil)
|
||||
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(rule.Match))
|
||||
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName))
|
||||
query = buf.String()
|
||||
case key.IsKeyRange(filter):
|
||||
buf := sqlparser.NewTrackedBuffer(nil)
|
||||
buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrVal([]byte(filter)))
|
||||
query = buf.String()
|
||||
}
|
||||
sel, fromTable, err := analyzeSelectFrom(query)
|
||||
|
@ -170,7 +166,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last
|
|||
}
|
||||
sendRule.Filter = query
|
||||
tablePlan := &TablePlan{
|
||||
TargetName: rule.Match,
|
||||
TargetName: tableName,
|
||||
SendRule: sendRule,
|
||||
Lastpk: lastpk,
|
||||
}
|
||||
|
@ -178,7 +174,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last
|
|||
}
|
||||
|
||||
tpb := &tablePlanBuilder{
|
||||
name: sqlparser.NewTableIdent(rule.Match),
|
||||
name: sqlparser.NewTableIdent(tableName),
|
||||
sendSelect: &sqlparser.Select{
|
||||
From: sel.From,
|
||||
Where: sel.Where,
|
||||
|
|
|
@ -128,6 +128,7 @@ func mustSendDDL(query mysql.Query, dbname string, filter *binlogdatapb.Filter)
|
|||
return true
|
||||
}
|
||||
|
||||
// tableMatches is similar to the one defined in vreplication.
|
||||
func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool {
|
||||
if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname {
|
||||
return false
|
||||
|
|
Загрузка…
Ссылка в новой задаче