Improved code readability and improved support for Ordered results

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
This commit is contained in:
Florent Poinsard 2021-09-22 17:02:28 +02:00
Родитель cf04d29e90
Коммит 4217916f2f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 87A9DEBFB0824A2D
4 изменённых файлов: 93 добавлений и 43 удалений

Просмотреть файл

@ -191,21 +191,20 @@ func ResultsEqualUnordered(r1, r2 []Result) bool {
return false
}
// allRows is a hash map that contains a row hashed as a key and
// the number of occurrence as the value. we use this map to ensure
// equality between the two result sets. when analyzing r1, we
// increment each key's value by one for each row's occurrence, and
// then we decrement it by one each time we see the same key in r2.
// if one of the key's value is not equal to zero, then r1 and r2 do
// not match.
allRows := map[string]int{}
countRows := 0
for _, r := range r1 {
for _, row := range r.Rows {
newHash := hashCodeForRow(row)
allRows[newHash] += 1
}
countRows += int(r.RowsAffected)
saveRowsAnalysis(r, allRows, &countRows, true)
}
for _, r := range r2 {
for _, row := range r.Rows {
newHash := hashCodeForRow(row)
allRows[newHash] -= 1
}
countRows -= int(r.RowsAffected)
saveRowsAnalysis(r, allRows, &countRows, false)
}
if countRows != 0 {
return false
@ -218,6 +217,22 @@ func ResultsEqualUnordered(r1, r2 []Result) bool {
return true
}
func saveRowsAnalysis(r Result, allRows map[string]int, totalRows *int, increment bool) {
for _, row := range r.Rows {
newHash := hashCodeForRow(row)
if increment {
allRows[newHash] += 1
} else {
allRows[newHash] -= 1
}
}
if increment {
*totalRows += int(r.RowsAffected)
} else {
*totalRows -= int(r.RowsAffected)
}
}
func hashCodeForRow(val []Value) string {
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%v", val)))

Просмотреть файл

@ -741,6 +741,18 @@ func TestRenameFieldsOnOLAP(t *testing.T) {
assert.Equal(t, `[[VARBINARY("OLAP")]]`, fmt.Sprintf("%v", qr.Rows))
}
func TestSimpleOrderBy(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()
defer exec(t, conn, `delete from t1`)
exec(t, conn, "insert into t1(id1, id2) values (0,10),(1,9),(2,8),(3,7),(4,6),(5,5)")
assertMatches(t, conn, `SELECT id2 FROM t1 ORDER BY id2 ASC`, `[[INT64(5)] [INT64(6)] [INT64(7)] [INT64(8)] [INT64(9)] [INT64(10)]]`)
}
func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)

Просмотреть файл

@ -24,55 +24,62 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)
// Gen4CompareV3 is a Primitive used to compare V3 and Gen4's plans.
type Gen4CompareV3 struct {
V3, Gen4 Primitive
HasOrderBy, IsNextVal bool
V3, Gen4 Primitive
HasOrderBy bool
}
var _ Primitive = (*Gen4CompareV3)(nil)
var _ Gen4Comparer = (*Gen4CompareV3)(nil)
// GetGen4Primitive implements the Gen4Comparer interface
func (c *Gen4CompareV3) GetGen4Primitive() Primitive {
return c.Gen4
}
// RouteType implements the Primitive interface
func (c *Gen4CompareV3) RouteType() string {
return c.Gen4.RouteType()
}
// GetKeyspaceName implements the Primitive interface
func (c *Gen4CompareV3) GetKeyspaceName() string {
return c.Gen4.GetKeyspaceName()
}
// GetTableName implements the Primitive interface
func (c *Gen4CompareV3) GetTableName() string {
return c.Gen4.GetTableName()
}
// GetFields implements the Primitive interface
func (c *Gen4CompareV3) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return c.Gen4.GetFields(vcursor, bindVars)
}
// NeedsTransaction implements the Primitive interface
func (c *Gen4CompareV3) NeedsTransaction() bool {
return c.Gen4.NeedsTransaction()
}
// TryExecute implements the Primitive interface
func (c *Gen4CompareV3) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
gen4Result, gen4Err := c.Gen4.TryExecute(vcursor, bindVars, wantfields)
// we are not executing the plan a second time if the query is a select next val,
// since the first execution incremented the `next` value, results will always
// mismatch between v3 and Gen4.
if c.IsNextVal {
return gen4Result, gen4Err
}
v3Result, v3Err := c.V3.TryExecute(vcursor, bindVars, wantfields)
err := CompareV3AndGen4Errors(v3Err, gen4Err)
if err != nil {
return nil, err
}
match := sqltypes.ResultsEqualUnordered([]sqltypes.Result{*v3Result}, []sqltypes.Result{*gen4Result})
var match bool
if c.HasOrderBy {
match = sqltypes.ResultsEqual([]sqltypes.Result{*v3Result}, []sqltypes.Result{*gen4Result})
} else {
match = sqltypes.ResultsEqualUnordered([]sqltypes.Result{*v3Result}, []sqltypes.Result{*gen4Result})
}
if !match {
log.Infof("%T mismatch", c)
log.Infof("V3 got: %s", v3Result.Rows)
log.Infof("Gen4 got: %s", gen4Result.Rows)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "results did not match")
@ -80,42 +87,45 @@ func (c *Gen4CompareV3) TryExecute(vcursor VCursor, bindVars map[string]*querypb
return gen4Result, nil
}
// TryStreamExecute implements the Primitive interface
func (c *Gen4CompareV3) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
v3Result, gen4Result := &sqltypes.Result{}, &sqltypes.Result{}
gen4Error := c.Gen4.TryStreamExecute(vcursor, bindVars, wantfields, func(result *sqltypes.Result) error {
gen4Result.AppendResult(result)
return nil
})
// we are not executing the plan a second time if the query is a select next val,
// since the first execution incremented the `next` value, results will always
// mismatch between v3 and Gen4.
if c.IsNextVal {
if gen4Error != nil {
return gen4Error
}
return callback(gen4Result)
}
v3Err := c.V3.TryStreamExecute(vcursor, bindVars, wantfields, func(result *sqltypes.Result) error {
v3Result.AppendResult(result)
return nil
})
err := CompareV3AndGen4Errors(v3Err, gen4Error)
if err != nil {
return err
}
match := sqltypes.ResultsEqualUnordered([]sqltypes.Result{*v3Result}, []sqltypes.Result{*gen4Result})
var match bool
if c.HasOrderBy {
match = sqltypes.ResultsEqual([]sqltypes.Result{*v3Result}, []sqltypes.Result{*gen4Result})
} else {
match = sqltypes.ResultsEqualUnordered([]sqltypes.Result{*v3Result}, []sqltypes.Result{*gen4Result})
}
if !match {
log.Infof("%T mismatch", c)
log.Infof("V3 got: %s", v3Result.Rows)
log.Infof("Gen4 got: %s", gen4Result.Rows)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "results did not match")
}
return callback(gen4Result)
}
// Inputs implements the Primitive interface
func (c *Gen4CompareV3) Inputs() []Primitive {
return c.Gen4.Inputs()
}
// Description implements the Primitive interface
func (c *Gen4CompareV3) Description() PrimitiveDescription {
return c.Gen4.Description()
}

Просмотреть файл

@ -23,36 +23,49 @@ import (
func gen4CompareV3Planner(query string) func(sqlparser.Statement, *sqlparser.ReservedVars, ContextVSchema) (engine.Primitive, error) {
return func(statement sqlparser.Statement, vars *sqlparser.ReservedVars, schema ContextVSchema) (engine.Primitive, error) {
// we will be switching the planner version to Gen4 and V3 in order to
// create instructions using them, thus we make sure to switch back to
// the Gen4CompareV3 planner before exiting this method.
defer schema.SetPlannerVersion(Gen4CompareV3)
primitive := &engine.Gen4CompareV3{}
onlyGen4 := false
gen4Primitive, gen4Err := planWithPlannerVersion(statement, vars, schema, query, Gen4)
// preliminary checks on the given statement
switch s := statement.(type) {
case *sqlparser.Insert:
// we insert data only once using the gen4 planner to avoid duplicated rows in tables.
return gen4Primitive, gen4Err
// we want to insert data only once into the database,
// for this reason, we will only use Gen4's instructions.
onlyGen4 = true
case *sqlparser.Select:
primitive.HasOrderBy = len(s.OrderBy) > 0
for _, expr := range s.SelectExprs {
// we are not executing the plan a second time if the query is a select next val,
// since the first execution might increment the `next` value, results will almost
// always be different between v3 and Gen4.
if _, nextVal := expr.(*sqlparser.Nextval); nextVal {
primitive.IsNextVal = true
onlyGen4 = true
break
}
}
}
// since lock primitives can imply creation and deletion of new locks,
// we execute them only once using Gen4 to avoid the duplicated locks
// and double releases.
if hasLockPrimitive(gen4Primitive) {
// plan statement using Gen4
gen4Primitive, gen4Err := planWithPlannerVersion(statement, vars, schema, query, Gen4)
// if onlyGen4 is set to true or Gen4's instruction contain a lock primitive,
// we use only Gen4's primitive and exit early without using V3's.
// since lock primitives can imply the creation or deletion of locks,
// we want to execute them once using Gen4 to avoid the duplicated locks
// or double lock-releases.
if onlyGen4 || hasLockPrimitive(gen4Primitive) {
return gen4Primitive, gen4Err
}
// get V3's plan
v3Primitive, v3Err := planWithPlannerVersion(statement, vars, schema, query, V3)
// check errors
// check potential errors from Gen4 and V3
err := engine.CompareV3AndGen4Errors(v3Err, gen4Err)
if err != nil {
return nil, err