зеркало из https://github.com/github/vitess-gh.git
query routing: add table substitutions
The code has also been refactored and cleaned up for improved readability. Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
e0f79ce5cf
Коммит
80bd0c2d6d
|
@ -48,9 +48,6 @@ type builder interface {
|
|||
// execute before this one.
|
||||
Reorder(int)
|
||||
|
||||
// Primitive returns the underlying primitive.
|
||||
Primitive() engine.Primitive
|
||||
|
||||
// First returns the first builder of the tree,
|
||||
// which is usually the left most.
|
||||
First() builder
|
||||
|
@ -103,6 +100,10 @@ type builder interface {
|
|||
// resultColumn, whereas PushSelect guarantees the addition of a new
|
||||
// result column and returns a distinct symbol for it.
|
||||
SupplyCol(col *sqlparser.ColName) (rc *resultColumn, colnum int)
|
||||
|
||||
// Primitve returns the underlying primitive.
|
||||
// This function should only be called after Wireup is finished.
|
||||
Primitive() engine.Primitive
|
||||
}
|
||||
|
||||
// ContextVSchema defines the interface for this package to fetch
|
||||
|
|
|
@ -42,7 +42,7 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del
|
|||
return nil, errors.New("unsupported: multi-table/vindex delete statement in sharded keyspace")
|
||||
}
|
||||
ro := rb.routeOptions[0]
|
||||
edel.Keyspace = ro.ERoute.Keyspace
|
||||
edel.Keyspace = ro.eroute.Keyspace
|
||||
if !edel.Keyspace.Sharded {
|
||||
// We only validate non-table subexpressions because the previous analysis has already validated them.
|
||||
if !pb.validateUnshardedRoute(del.Targets, del.Where, del.OrderBy, del.Limit) {
|
||||
|
@ -65,12 +65,12 @@ func buildDeletePlan(del *sqlparser.Delete, vschema ContextVSchema) (*engine.Del
|
|||
}
|
||||
|
||||
edel.QueryTimeout = queryTimeout(directives)
|
||||
if rb.routeOptions[0].ERoute.TargetDestination != nil {
|
||||
if rb.routeOptions[0].ERoute.TargetTabletType != topodatapb.TabletType_MASTER {
|
||||
if rb.routeOptions[0].eroute.TargetDestination != nil {
|
||||
if rb.routeOptions[0].eroute.TargetTabletType != topodatapb.TabletType_MASTER {
|
||||
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: DELETE statement with a replica target")
|
||||
}
|
||||
edel.Opcode = engine.DeleteByDestination
|
||||
edel.TargetDestination = rb.routeOptions[0].ERoute.TargetDestination
|
||||
edel.TargetDestination = rb.routeOptions[0].eroute.TargetDestination
|
||||
return edel, nil
|
||||
}
|
||||
var err error
|
||||
|
|
|
@ -243,7 +243,7 @@ func hasSubquery(node sqlparser.SQLNode) bool {
|
|||
func (pb *primitiveBuilder) validateUnshardedRoute(nodes ...sqlparser.SQLNode) bool {
|
||||
var keyspace string
|
||||
if rb, ok := pb.bldr.(*route); ok {
|
||||
keyspace = rb.routeOptions[0].ERoute.Keyspace.Name
|
||||
keyspace = rb.routeOptions[0].eroute.Keyspace.Name
|
||||
} else {
|
||||
// This code is unreachable because the caller checks.
|
||||
return false
|
||||
|
|
|
@ -114,11 +114,11 @@ func (pb *primitiveBuilder) processAliasedTable(tableExpr *sqlparser.AliasedTabl
|
|||
vschemaTables := make([]*vindexes.Table, 0, len(subroute.routeOptions))
|
||||
for _, ro := range subroute.routeOptions {
|
||||
vst := &vindexes.Table{
|
||||
Keyspace: ro.ERoute.Keyspace,
|
||||
Keyspace: ro.eroute.Keyspace,
|
||||
}
|
||||
vschemaTables = append(vschemaTables, vst)
|
||||
for _, rc := range subroute.ResultColumns() {
|
||||
vindex, ok := ro.vindexes[rc.column]
|
||||
vindex, ok := ro.vindexMap[rc.column]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
@ -140,8 +140,7 @@ func (pb *primitiveBuilder) processAliasedTable(tableExpr *sqlparser.AliasedTabl
|
|||
return err
|
||||
}
|
||||
for i, ro := range subroute.routeOptions {
|
||||
ro.rb = rb
|
||||
ro.vindexes = vindexMaps[i]
|
||||
ro.SubqueryToTable(rb, vindexMaps[i])
|
||||
}
|
||||
rb.routeOptions = subroute.routeOptions
|
||||
subroute.Redirect = rb
|
||||
|
@ -165,10 +164,7 @@ func (pb *primitiveBuilder) buildTablePrimitive(tableExpr *sqlparser.AliasedTabl
|
|||
return err
|
||||
}
|
||||
rb, st := newRoute(sel)
|
||||
rb.routeOptions = []*routeOption{{
|
||||
rb: rb,
|
||||
ERoute: engine.NewSimpleRoute(engine.SelectDBA, ks),
|
||||
}}
|
||||
rb.routeOptions = []*routeOption{newSimpleRouteOption(rb, engine.NewSimpleRoute(engine.SelectDBA, ks))}
|
||||
pb.bldr, pb.st = rb, st
|
||||
return nil
|
||||
}
|
||||
|
@ -189,6 +185,27 @@ func (pb *primitiveBuilder) buildTablePrimitive(tableExpr *sqlparser.AliasedTabl
|
|||
return err
|
||||
}
|
||||
for i, vst := range vschemaTables {
|
||||
sub := &tableSubstitution{
|
||||
oldExpr: tableExpr,
|
||||
}
|
||||
if tableExpr.As.IsEmpty() {
|
||||
if tableName.Name != vst.Name {
|
||||
// Table name does not match. Change and alias it to old name.
|
||||
sub.newExpr = &sqlparser.AliasedTableExpr{
|
||||
Expr: &sqlparser.TableName{Name: vst.Name},
|
||||
As: tableName.Name,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Table is already aliased.
|
||||
if tableName.Name != vst.Name {
|
||||
// Table name does not match. Change it and reuse existing alias.
|
||||
sub.newExpr = &sqlparser.AliasedTableExpr{
|
||||
Expr: &sqlparser.TableName{Name: vst.Name},
|
||||
As: tableExpr.As,
|
||||
}
|
||||
}
|
||||
}
|
||||
var eroute *engine.Route
|
||||
switch {
|
||||
case !vst.Keyspace.Sharded:
|
||||
|
@ -205,12 +222,7 @@ func (pb *primitiveBuilder) buildTablePrimitive(tableExpr *sqlparser.AliasedTabl
|
|||
eroute.Vindex, _ = vindexes.NewBinary("binary", nil)
|
||||
eroute.Values = []sqltypes.PlanValue{{Value: sqltypes.MakeTrusted(sqltypes.VarBinary, vst.Pinned)}}
|
||||
}
|
||||
rb.routeOptions = append(rb.routeOptions, &routeOption{
|
||||
rb: rb,
|
||||
vschemaTable: vst,
|
||||
vindexes: vindexMaps[i],
|
||||
ERoute: eroute,
|
||||
})
|
||||
rb.routeOptions = append(rb.routeOptions, newRouteOption(rb, vst, sub, vindexMaps[i], eroute))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -267,19 +279,13 @@ func (pb *primitiveBuilder) join(rpb *primitiveBuilder, ajoin *sqlparser.JoinTab
|
|||
}
|
||||
|
||||
// Try merging the routes.
|
||||
isLeftJoin := ajoin != nil && ajoin.Join == sqlparser.LeftJoinStr
|
||||
var mergedRouteOptions []*routeOption
|
||||
outer:
|
||||
for _, lro := range lRoute.routeOptions {
|
||||
for _, rro := range rRoute.routeOptions {
|
||||
if lro.JoinCanMerge(pb, rro, ajoin) {
|
||||
lro.vschemaTable = nil
|
||||
lro.substitutions = append(lro.substitutions, rro.substitutions...)
|
||||
// Add RHS vindexes only if it's not a left join.
|
||||
if ajoin == nil || ajoin.Join != sqlparser.LeftJoinStr {
|
||||
for c, v := range rro.vindexes {
|
||||
lro.vindexes[c] = v
|
||||
}
|
||||
}
|
||||
lro.MergeJoin(rro, isLeftJoin)
|
||||
mergedRouteOptions = append(mergedRouteOptions, lro)
|
||||
continue outer
|
||||
}
|
||||
|
@ -307,8 +313,6 @@ func (pb *primitiveBuilder) mergeRoutes(rpb *primitiveBuilder, routeOptions []*r
|
|||
} else {
|
||||
sel.From = sqlparser.TableExprs{ajoin}
|
||||
}
|
||||
// Redirect before merging the symtabs. Merge will use Redirect
|
||||
// to check if rRoute matches lRoute.
|
||||
rRoute.Redirect = lRoute
|
||||
// Since the routes have merged, set st.singleRoute to point at
|
||||
// the merged route.
|
||||
|
|
|
@ -41,7 +41,7 @@ func buildInsertPlan(ins *sqlparser.Insert, vschema ContextVSchema) (*engine.Ins
|
|||
return nil, fmt.Errorf("inserting into a vindex not allowed: %s", sqlparser.String(ins.Table))
|
||||
}
|
||||
ro := rb.routeOptions[0]
|
||||
if ro.ERoute.TargetDestination != nil {
|
||||
if ro.eroute.TargetDestination != nil {
|
||||
return nil, errors.New("unsupported: INSERT with a target destination")
|
||||
}
|
||||
if !ro.vschemaTable.Keyspace.Sharded {
|
||||
|
|
|
@ -33,6 +33,10 @@ var _ builder = (*route)(nil)
|
|||
// SelectScatter, etc. Portions of the original Select AST
|
||||
// are moved into this node, which will be used to build
|
||||
// the final SQL for this route.
|
||||
// A route can have multiple routeOptions. They are kept
|
||||
// up-to-date as the route improves. Those that don't
|
||||
// qualify are continuously removed from the options.
|
||||
// A single best route is chosen before the Wireup phase.
|
||||
type route struct {
|
||||
order int
|
||||
|
||||
|
@ -86,7 +90,7 @@ func (rb *route) Reorder(order int) {
|
|||
|
||||
// Primitive satisfies the builder interface.
|
||||
func (rb *route) Primitive() engine.Primitive {
|
||||
return rb.routeOptions[0].ERoute
|
||||
return rb.routeOptions[0].eroute
|
||||
}
|
||||
|
||||
// First satisfies the builder interface.
|
||||
|
@ -193,7 +197,7 @@ func (rb *route) PushOrderBy(order *sqlparser.Order) error {
|
|||
Desc: order.Direction == sqlparser.DescScr,
|
||||
}
|
||||
for _, ro := range rb.routeOptions {
|
||||
ro.ERoute.OrderBy = append(ro.ERoute.OrderBy, ob)
|
||||
ro.eroute.OrderBy = append(ro.eroute.OrderBy, ob)
|
||||
}
|
||||
|
||||
rb.Select.AddOrder(order)
|
||||
|
@ -236,7 +240,7 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
|
||||
// Precaution: update ERoute.Values only if it's not set already.
|
||||
ro := rb.routeOptions[0]
|
||||
if ro.ERoute.Values == nil {
|
||||
if ro.eroute.Values == nil {
|
||||
// Resolve values stored in the builder.
|
||||
switch vals := ro.condition.(type) {
|
||||
case *sqlparser.ComparisonExpr:
|
||||
|
@ -244,7 +248,7 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ro.ERoute.Values = []sqltypes.PlanValue{pv}
|
||||
ro.eroute.Values = []sqltypes.PlanValue{pv}
|
||||
vals.Right = sqlparser.ListArg("::" + engine.ListVarName)
|
||||
case nil:
|
||||
// no-op.
|
||||
|
@ -253,7 +257,7 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ro.ERoute.Values = []sqltypes.PlanValue{pv}
|
||||
ro.eroute.Values = []sqltypes.PlanValue{pv}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -261,18 +265,18 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
// we have to request the corresponding weight_string from mysql
|
||||
// and use that value instead. This is because we cannot mimic
|
||||
// mysql's collation behavior yet.
|
||||
for i, orderby := range ro.ERoute.OrderBy {
|
||||
for i, orderby := range ro.eroute.OrderBy {
|
||||
rc := rb.resultColumns[orderby.Col]
|
||||
if sqltypes.IsText(rc.column.typ) {
|
||||
// If a weight string was previously requested (by OrderedAggregator),
|
||||
// reuse it.
|
||||
if colnum, ok := rb.weightStrings[rc]; ok {
|
||||
ro.ERoute.OrderBy[i].Col = colnum
|
||||
ro.eroute.OrderBy[i].Col = colnum
|
||||
continue
|
||||
}
|
||||
|
||||
// len(rb.resultColumns) does not change. No harm using the value multiple times.
|
||||
ro.ERoute.TruncateColumnCount = len(rb.resultColumns)
|
||||
ro.eroute.TruncateColumnCount = len(rb.resultColumns)
|
||||
|
||||
// This code is partially duplicated from SupplyWeightString and PushSelect.
|
||||
// We should not update resultColumns because it's not returned in the result.
|
||||
|
@ -287,7 +291,7 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
}
|
||||
sel := rb.Select.(*sqlparser.Select)
|
||||
sel.SelectExprs = append(sel.SelectExprs, expr)
|
||||
ro.ERoute.OrderBy[i].Col = len(sel.SelectExprs) - 1
|
||||
ro.eroute.OrderBy[i].Col = len(sel.SelectExprs) - 1
|
||||
// We don't really have to update weightStrings, but we're doing it
|
||||
// for good measure.
|
||||
rb.weightStrings[rc] = len(sel.SelectExprs) - 1
|
||||
|
@ -315,6 +319,11 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
return true, nil
|
||||
}, rb.Select)
|
||||
|
||||
// Substitute table names
|
||||
for _, sub := range ro.substitutions {
|
||||
*sub.oldExpr = *sub.newExpr
|
||||
}
|
||||
|
||||
// Generate query while simultaneously resolving values.
|
||||
varFormatter := func(buf *sqlparser.TrackedBuffer, node sqlparser.SQLNode) {
|
||||
switch node := node.(type) {
|
||||
|
@ -336,8 +345,8 @@ func (rb *route) Wireup(bldr builder, jt *jointab) error {
|
|||
}
|
||||
buf := sqlparser.NewTrackedBuffer(varFormatter)
|
||||
varFormatter(buf, rb.Select)
|
||||
ro.ERoute.Query = buf.ParsedQuery().Query
|
||||
ro.ERoute.FieldQuery = rb.generateFieldQuery(rb.Select, jt)
|
||||
ro.eroute.Query = buf.ParsedQuery().Query
|
||||
ro.eroute.FieldQuery = rb.generateFieldQuery(rb.Select, jt)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -482,7 +491,7 @@ outer:
|
|||
for _, lro := range rb.routeOptions {
|
||||
for _, rro := range inner.routeOptions {
|
||||
if lro.SubqueryCanMerge(pb, rro) {
|
||||
lro.substitutions = append(lro.substitutions, rro.substitutions...)
|
||||
lro.MergeSubquery(rro)
|
||||
mergedRouteOptions = append(mergedRouteOptions, lro)
|
||||
continue outer
|
||||
}
|
||||
|
@ -504,8 +513,7 @@ outer:
|
|||
for _, lro := range rb.routeOptions {
|
||||
for _, rro := range right.routeOptions {
|
||||
if lro.UnionCanMerge(rro) {
|
||||
lro.vschemaTable = nil
|
||||
lro.substitutions = append(lro.substitutions, rro.substitutions...)
|
||||
lro.MergeUnion(rro)
|
||||
mergedRouteOptions = append(mergedRouteOptions, lro)
|
||||
continue outer
|
||||
}
|
||||
|
@ -523,7 +531,7 @@ outer:
|
|||
// route. It returns false if no such options exist.
|
||||
func (rb *route) removeMultishardOptions() bool {
|
||||
return rb.removeOptions(func(ro *routeOption) bool {
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectUnsharded, engine.SelectDBA, engine.SelectNext, engine.SelectEqualUnique:
|
||||
return true
|
||||
}
|
||||
|
@ -537,7 +545,7 @@ func (rb *route) removeMultishardOptions() bool {
|
|||
// keyspaces like last_insert_id.
|
||||
func (rb *route) removeShardedOptions() bool {
|
||||
return rb.removeOptions(func(ro *routeOption) bool {
|
||||
return ro.ERoute.Opcode == engine.SelectUnsharded
|
||||
return ro.eroute.Opcode == engine.SelectUnsharded
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -545,7 +553,7 @@ func (rb *route) removeShardedOptions() bool {
|
|||
// the specified keyspace. It returns false if no such options exist.
|
||||
func (rb *route) removeOptionsWithUnmatchedKeyspace(keyspace string) bool {
|
||||
return rb.removeOptions(func(ro *routeOption) bool {
|
||||
return ro.ERoute.Keyspace.Name == keyspace
|
||||
return ro.eroute.Keyspace.Name == keyspace
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -22,33 +22,56 @@ import (
|
|||
"vitess.io/vitess/go/vt/vtgate/vindexes"
|
||||
)
|
||||
|
||||
// routeOption contains all the information for one route option.
|
||||
// A route can have multiple options.
|
||||
type routeOption struct {
|
||||
rb *route
|
||||
|
||||
// vschemaTable is set for DMLs, only if a single table
|
||||
// is referenced in the from clause.
|
||||
// vschemaTable is set only if a single table is referenced
|
||||
// in the from clause. It's used only for DMLs.
|
||||
vschemaTable *vindexes.Table
|
||||
|
||||
// substitutions contains the list of table expressions that
|
||||
// substitutions contain the list of table expressions that
|
||||
// have to be substituted in the route's query.
|
||||
substitutions []*tableSubstitution
|
||||
|
||||
// vindexes is a map of all vindexes that can be used
|
||||
// vindexMap is a map of all vindexMap that can be used
|
||||
// for the routeOption.
|
||||
vindexes map[*column]vindexes.Vindex
|
||||
vindexMap map[*column]vindexes.Vindex
|
||||
|
||||
// condition stores the AST condition that will be used
|
||||
// to resolve the ERoute Values field.
|
||||
condition sqlparser.Expr
|
||||
|
||||
// ERoute is the primitive being built.
|
||||
ERoute *engine.Route
|
||||
// eroute is the primitive being built.
|
||||
eroute *engine.Route
|
||||
}
|
||||
|
||||
type tableSubstitution struct {
|
||||
newExpr, oldExpr *sqlparser.AliasedTableExpr
|
||||
}
|
||||
|
||||
func newSimpleRouteOption(rb *route, eroute *engine.Route) *routeOption {
|
||||
return &routeOption{
|
||||
rb: rb,
|
||||
eroute: eroute,
|
||||
}
|
||||
}
|
||||
|
||||
func newRouteOption(rb *route, vst *vindexes.Table, sub *tableSubstitution, vindexMap map[*column]vindexes.Vindex, eroute *engine.Route) *routeOption {
|
||||
var subs []*tableSubstitution
|
||||
if sub != nil && sub.newExpr != nil {
|
||||
subs = []*tableSubstitution{sub}
|
||||
}
|
||||
return &routeOption{
|
||||
rb: rb,
|
||||
vschemaTable: vst,
|
||||
substitutions: subs,
|
||||
vindexMap: vindexMap,
|
||||
eroute: eroute,
|
||||
}
|
||||
}
|
||||
|
||||
func (ro *routeOption) JoinCanMerge(pb *primitiveBuilder, rro *routeOption, ajoin *sqlparser.JoinTableExpr) bool {
|
||||
return ro.canMerge(rro, func() bool {
|
||||
if ajoin == nil {
|
||||
|
@ -63,11 +86,26 @@ func (ro *routeOption) JoinCanMerge(pb *primitiveBuilder, rro *routeOption, ajoi
|
|||
})
|
||||
}
|
||||
|
||||
func (ro *routeOption) MergeJoin(rro *routeOption, isLeftJoin bool) {
|
||||
ro.vschemaTable = nil
|
||||
ro.substitutions = append(ro.substitutions, rro.substitutions...)
|
||||
if isLeftJoin {
|
||||
return
|
||||
}
|
||||
// Add RHS vindexes only if it's not a left join.
|
||||
for c, v := range rro.vindexMap {
|
||||
if ro.vindexMap == nil {
|
||||
ro.vindexMap = make(map[*column]vindexes.Vindex)
|
||||
}
|
||||
ro.vindexMap[c] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (ro *routeOption) SubqueryCanMerge(pb *primitiveBuilder, inner *routeOption) bool {
|
||||
return ro.canMerge(inner, func() bool {
|
||||
switch vals := inner.condition.(type) {
|
||||
case *sqlparser.ColName:
|
||||
if ro.FindVindex(pb, vals) == inner.ERoute.Vindex {
|
||||
if ro.FindVindex(pb, vals) == inner.eroute.Vindex {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -75,28 +113,42 @@ func (ro *routeOption) SubqueryCanMerge(pb *primitiveBuilder, inner *routeOption
|
|||
})
|
||||
}
|
||||
|
||||
func (ro *routeOption) MergeSubquery(subqueryOption *routeOption) {
|
||||
ro.substitutions = append(ro.substitutions, subqueryOption.substitutions...)
|
||||
}
|
||||
|
||||
func (ro *routeOption) UnionCanMerge(rro *routeOption) bool {
|
||||
return ro.canMerge(rro, func() bool { return false })
|
||||
}
|
||||
|
||||
func (ro *routeOption) MergeUnion(rro *routeOption) {
|
||||
ro.vschemaTable = nil
|
||||
ro.substitutions = append(ro.substitutions, rro.substitutions...)
|
||||
}
|
||||
|
||||
func (ro *routeOption) SubqueryToTable(rb *route, vindexMap map[*column]vindexes.Vindex) {
|
||||
ro.rb = rb
|
||||
ro.vindexMap = vindexMap
|
||||
}
|
||||
|
||||
func (ro *routeOption) canMerge(rro *routeOption, customCheck func() bool) bool {
|
||||
if ro.ERoute.Keyspace.Name != rro.ERoute.Keyspace.Name {
|
||||
if ro.eroute.Keyspace.Name != rro.eroute.Keyspace.Name {
|
||||
return false
|
||||
}
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectUnsharded:
|
||||
if rro.ERoute.Opcode == engine.SelectUnsharded {
|
||||
if rro.eroute.Opcode == engine.SelectUnsharded {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
case engine.SelectDBA:
|
||||
if rro.ERoute.Opcode == engine.SelectDBA {
|
||||
if rro.eroute.Opcode == engine.SelectDBA {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
case engine.SelectEqualUnique:
|
||||
// Check if they target the same shard.
|
||||
if rro.ERoute.Opcode == engine.SelectEqualUnique && ro.ERoute.Vindex == rro.ERoute.Vindex && valEqual(ro.condition, rro.condition) {
|
||||
if rro.eroute.Opcode == engine.SelectEqualUnique && ro.eroute.Vindex == rro.eroute.Vindex && valEqual(ro.condition, rro.condition) {
|
||||
return true
|
||||
}
|
||||
case engine.SelectNext:
|
||||
|
@ -142,7 +194,7 @@ func (ro *routeOption) canMergeOnFilter(pb *primitiveBuilder, rro *routeOption,
|
|||
// We assume that the filter has already been pushed into
|
||||
// the route.
|
||||
func (ro *routeOption) UpdatePlan(pb *primitiveBuilder, filter sqlparser.Expr) {
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA:
|
||||
return
|
||||
}
|
||||
|
@ -150,9 +202,9 @@ func (ro *routeOption) UpdatePlan(pb *primitiveBuilder, filter sqlparser.Expr) {
|
|||
if opcode == engine.SelectScatter {
|
||||
return
|
||||
}
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectEqualUnique:
|
||||
if opcode == engine.SelectEqualUnique && vindex.Cost() < ro.ERoute.Vindex.Cost() {
|
||||
if opcode == engine.SelectEqualUnique && vindex.Cost() < ro.eroute.Vindex.Cost() {
|
||||
ro.updateRoute(opcode, vindex, values)
|
||||
}
|
||||
case engine.SelectEqual:
|
||||
|
@ -160,7 +212,7 @@ func (ro *routeOption) UpdatePlan(pb *primitiveBuilder, filter sqlparser.Expr) {
|
|||
case engine.SelectEqualUnique:
|
||||
ro.updateRoute(opcode, vindex, values)
|
||||
case engine.SelectEqual:
|
||||
if vindex.Cost() < ro.ERoute.Vindex.Cost() {
|
||||
if vindex.Cost() < ro.eroute.Vindex.Cost() {
|
||||
ro.updateRoute(opcode, vindex, values)
|
||||
}
|
||||
}
|
||||
|
@ -169,7 +221,7 @@ func (ro *routeOption) UpdatePlan(pb *primitiveBuilder, filter sqlparser.Expr) {
|
|||
case engine.SelectEqualUnique, engine.SelectEqual:
|
||||
ro.updateRoute(opcode, vindex, values)
|
||||
case engine.SelectIN:
|
||||
if vindex.Cost() < ro.ERoute.Vindex.Cost() {
|
||||
if vindex.Cost() < ro.eroute.Vindex.Cost() {
|
||||
ro.updateRoute(opcode, vindex, values)
|
||||
}
|
||||
}
|
||||
|
@ -182,8 +234,8 @@ func (ro *routeOption) UpdatePlan(pb *primitiveBuilder, filter sqlparser.Expr) {
|
|||
}
|
||||
|
||||
func (ro *routeOption) updateRoute(opcode engine.RouteOpcode, vindex vindexes.Vindex, condition sqlparser.Expr) {
|
||||
ro.ERoute.Opcode = opcode
|
||||
ro.ERoute.Vindex = vindex
|
||||
ro.eroute.Opcode = opcode
|
||||
ro.eroute.Vindex = vindex
|
||||
ro.condition = condition
|
||||
}
|
||||
|
||||
|
@ -245,35 +297,35 @@ func (ro *routeOption) computeINPlan(pb *primitiveBuilder, comparison *sqlparser
|
|||
}
|
||||
|
||||
func (ro *routeOption) isBetterThan(other *routeOption) bool {
|
||||
switch other.ERoute.Opcode {
|
||||
switch other.eroute.Opcode {
|
||||
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA:
|
||||
return false
|
||||
case engine.SelectEqualUnique:
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA:
|
||||
return true
|
||||
case engine.SelectEqualUnique:
|
||||
if ro.ERoute.Vindex.Cost() < other.ERoute.Vindex.Cost() {
|
||||
if ro.eroute.Vindex.Cost() < other.eroute.Vindex.Cost() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
case engine.SelectIN:
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA, engine.SelectEqualUnique:
|
||||
return true
|
||||
case engine.SelectIN:
|
||||
if ro.ERoute.Vindex.Cost() < other.ERoute.Vindex.Cost() {
|
||||
if ro.eroute.Vindex.Cost() < other.eroute.Vindex.Cost() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
case engine.SelectEqual:
|
||||
switch ro.ERoute.Opcode {
|
||||
switch ro.eroute.Opcode {
|
||||
case engine.SelectUnsharded, engine.SelectNext, engine.SelectDBA, engine.SelectEqualUnique, engine.SelectIN:
|
||||
return true
|
||||
case engine.SelectEqual:
|
||||
if ro.ERoute.Vindex.Cost() < other.ERoute.Vindex.Cost() {
|
||||
if ro.eroute.Vindex.Cost() < other.eroute.Vindex.Cost() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -300,7 +352,7 @@ func (ro *routeOption) FindVindex(pb *primitiveBuilder, expr sqlparser.Expr) vin
|
|||
if c.Origin() != ro.rb {
|
||||
return nil
|
||||
}
|
||||
return ro.vindexes[c]
|
||||
return ro.vindexMap[c]
|
||||
}
|
||||
|
||||
// exprIsValue returns true if the expression can be treated as a value
|
||||
|
|
|
@ -80,13 +80,13 @@ func (pb *primitiveBuilder) processSelect(sel *sqlparser.Select, outer *symtab)
|
|||
// TODO(sougou): this can probably be improved.
|
||||
for _, ro := range rb.routeOptions {
|
||||
directives := sqlparser.ExtractCommentDirectives(sel.Comments)
|
||||
ro.ERoute.QueryTimeout = queryTimeout(directives)
|
||||
if ro.ERoute.TargetDestination != nil {
|
||||
ro.eroute.QueryTimeout = queryTimeout(directives)
|
||||
if ro.eroute.TargetDestination != nil {
|
||||
return errors.New("unsupported: SELECT with a target destination")
|
||||
}
|
||||
|
||||
if directives.IsSet(sqlparser.DirectiveScatterErrorsAsWarnings) {
|
||||
ro.ERoute.ScatterErrorsAsWarnings = true
|
||||
ro.eroute.ScatterErrorsAsWarnings = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -229,10 +229,10 @@ func (pb *primitiveBuilder) pushSelectRoutes(selectExprs sqlparser.SelectExprs)
|
|||
return nil, errors.New("unsupported: SELECT NEXT query in cross-shard query")
|
||||
}
|
||||
for _, ro := range rb.routeOptions {
|
||||
if ro.ERoute.Opcode != engine.SelectUnsharded {
|
||||
if ro.eroute.Opcode != engine.SelectUnsharded {
|
||||
return nil, errors.New("NEXT used on a sharded table")
|
||||
}
|
||||
ro.ERoute.Opcode = engine.SelectNext
|
||||
ro.eroute.Opcode = engine.SelectNext
|
||||
}
|
||||
resultColumns = append(resultColumns, rb.PushAnonymous(node))
|
||||
default:
|
||||
|
@ -289,7 +289,7 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St
|
|||
As: col,
|
||||
}
|
||||
}
|
||||
rc, _, err := pb.bldr.PushSelect(expr, t.origin)
|
||||
rc, _, err := pb.bldr.PushSelect(expr, t.Origin())
|
||||
if err != nil {
|
||||
// Unreachable because PushSelect won't fail on ColName.
|
||||
return inrcs, false, err
|
||||
|
@ -316,7 +316,7 @@ func (pb *primitiveBuilder) expandStar(inrcs []*resultColumn, expr *sqlparser.St
|
|||
Qualifier: expr.TableName,
|
||||
},
|
||||
}
|
||||
rc, _, err := pb.bldr.PushSelect(expr, t.origin)
|
||||
rc, _, err := pb.bldr.PushSelect(expr, t.Origin())
|
||||
if err != nil {
|
||||
// Unreachable because PushSelect won't fail on ColName.
|
||||
return inrcs, false, err
|
||||
|
|
|
@ -93,15 +93,14 @@ func (st *symtab) AddVSchemaTable(alias sqlparser.TableName, vschemaTables []*vi
|
|||
origin: rb,
|
||||
}
|
||||
|
||||
for _, vst := range vschemaTables {
|
||||
vindexMaps = make([]map[*column]vindexes.Vindex, len(vschemaTables))
|
||||
for i, vst := range vschemaTables {
|
||||
// If any input is authoritative, we make the table
|
||||
// authoritative.
|
||||
// TODO(sougou): vschema builder should validate that authoritative columns match.
|
||||
if vst.ColumnListAuthoritative {
|
||||
t.isAuthoritative = true
|
||||
}
|
||||
vindexMap := make(map[*column]vindexes.Vindex)
|
||||
vindexMaps = append(vindexMaps, vindexMap)
|
||||
|
||||
for _, col := range vst.Columns {
|
||||
t.addColumn(col.Name, &column{
|
||||
|
@ -111,6 +110,7 @@ func (st *symtab) AddVSchemaTable(alias sqlparser.TableName, vschemaTables []*vi
|
|||
})
|
||||
}
|
||||
|
||||
var vindexMap map[*column]vindexes.Vindex
|
||||
for _, cv := range vst.ColumnVindexes {
|
||||
for i, cvcol := range cv.Columns {
|
||||
col, ok := t.columns[cvcol.Lowered()]
|
||||
|
@ -123,10 +123,14 @@ func (st *symtab) AddVSchemaTable(alias sqlparser.TableName, vschemaTables []*vi
|
|||
}
|
||||
if i == 0 {
|
||||
// For now, only the first column is used for vindex Map functions.
|
||||
if vindexMap == nil {
|
||||
vindexMap = make(map[*column]vindexes.Vindex)
|
||||
}
|
||||
vindexMap[col] = cv.Vindex
|
||||
}
|
||||
}
|
||||
}
|
||||
vindexMaps[i] = vindexMap
|
||||
|
||||
if ai := vst.AutoIncrement; ai != nil {
|
||||
if _, ok := t.columns[ai.Column.Lowered()]; !ok {
|
||||
|
@ -367,7 +371,7 @@ func (st *symtab) searchTables(col *sqlparser.ColName) (*column, error) {
|
|||
return nil, fmt.Errorf("symbol %s not found in table or subquery", sqlparser.String(col))
|
||||
}
|
||||
c = &column{
|
||||
origin: t.origin,
|
||||
origin: t.Origin(),
|
||||
st: st,
|
||||
}
|
||||
t.addColumn(col.Name, c)
|
||||
|
@ -433,10 +437,17 @@ func (t *table) addColumn(alias sqlparser.ColIdent, c *column) {
|
|||
t.columnNames = append(t.columnNames, alias)
|
||||
}
|
||||
|
||||
// Origin returns the route that originates the table.
|
||||
func (t *table) Origin() builder {
|
||||
// If it's a route, we have to resolve it.
|
||||
if rb, ok := t.origin.(*route); ok {
|
||||
return rb.Resolve()
|
||||
}
|
||||
return t.origin
|
||||
}
|
||||
|
||||
// column represents a unique symbol in the query that other
|
||||
// parts can refer to. If a column originates from a sharded
|
||||
// table, and is tied to a vindex, then its Vindex field is
|
||||
// set, which can be used to improve a route's plan.
|
||||
// parts can refer to.
|
||||
// Every column contains the builder it originates from.
|
||||
//
|
||||
// Two columns are equal if their pointer values match.
|
||||
|
|
|
@ -44,7 +44,7 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd
|
|||
return nil, errors.New("unsupported: multi-table/vindex update statement in sharded keyspace")
|
||||
}
|
||||
ro := rb.routeOptions[0]
|
||||
eupd.Keyspace = ro.ERoute.Keyspace
|
||||
eupd.Keyspace = ro.eroute.Keyspace
|
||||
if !eupd.Keyspace.Sharded {
|
||||
// We only validate non-table subexpressions because the previous analysis has already validated them.
|
||||
if !pb.validateUnshardedRoute(upd.Exprs, upd.Where, upd.OrderBy, upd.Limit) {
|
||||
|
@ -73,12 +73,12 @@ func buildUpdatePlan(upd *sqlparser.Update, vschema ContextVSchema) (*engine.Upd
|
|||
}
|
||||
var err error
|
||||
|
||||
if ro.ERoute.TargetDestination != nil {
|
||||
if ro.ERoute.TargetTabletType != topodatapb.TabletType_MASTER {
|
||||
if ro.eroute.TargetDestination != nil {
|
||||
if ro.eroute.TargetTabletType != topodatapb.TabletType_MASTER {
|
||||
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported: UPDATE statement with a replica target")
|
||||
}
|
||||
eupd.Opcode = engine.UpdateByDestination
|
||||
eupd.TargetDestination = ro.ERoute.TargetDestination
|
||||
eupd.TargetDestination = ro.eroute.TargetDestination
|
||||
return eupd, nil
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче