зеркало из https://github.com/github/vitess-gh.git
vindexes: alt approach for lookup sorted inserts
Looks like too many callers are affected if the passed in values are sorted by the `Create` function. Instead of changing all the call sites, it's better that `Create` sorts a copy of the values and leave the external behavior unchanged. Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
84a36ab84c
Коммит
3daf82c694
|
@ -213,6 +213,51 @@ func TestConsistentLookupMultiInsert(t *testing.T) {
|
|||
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5)]]"; got != want {
|
||||
t.Errorf("select:\n%v want\n%v", got, want)
|
||||
}
|
||||
exec(t, conn, "delete from t1 where id1=1")
|
||||
exec(t, conn, "delete from t1 where id1=2")
|
||||
exec(t, conn, "delete from t1 where id1=3")
|
||||
exec(t, conn, "delete from t1 where id1=4")
|
||||
exec(t, conn, "delete from t1_id2_idx where id2=4")
|
||||
}
|
||||
|
||||
func TestHashLookupMultiInsertIgnore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
conn, err := mysql.Connect(ctx, &vtParams)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
// conn2 is for queries that target shards.
|
||||
conn2, err := mysql.Connect(ctx, &vtParams)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn2.Close()
|
||||
|
||||
// DB should start out clean
|
||||
qr := exec(t, conn, "select count(*) from t2_id4_idx")
|
||||
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
|
||||
t.Errorf("select:\n%v want\n%v", got, want)
|
||||
}
|
||||
qr = exec(t, conn, "select count(*) from t2")
|
||||
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
|
||||
t.Errorf("select:\n%v want\n%v", got, want)
|
||||
}
|
||||
|
||||
// Try inserting a bunch of ids at once
|
||||
exec(t, conn, "begin")
|
||||
exec(t, conn, "insert ignore into t2(id3, id4) values(50,60), (30,40), (10,20)")
|
||||
exec(t, conn, "commit")
|
||||
|
||||
// Verify
|
||||
qr = exec(t, conn, "select id3, id4 from t2 order by id3")
|
||||
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want {
|
||||
t.Errorf("select:\n%v want\n%v", got, want)
|
||||
}
|
||||
qr = exec(t, conn, "select id3, id4 from t2_id4_idx order by id3")
|
||||
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want {
|
||||
t.Errorf("select:\n%v want\n%v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
|
||||
|
|
|
@ -61,6 +61,20 @@ create table aggr_test(
|
|||
val2 bigint,
|
||||
primary key(id)
|
||||
) Engine=InnoDB;
|
||||
|
||||
create table t2(
|
||||
id3 bigint,
|
||||
id4 bigint,
|
||||
primary key(id3)
|
||||
) Engine=InnoDB;
|
||||
|
||||
create table t2_id4_idx(
|
||||
id bigint not null auto_increment,
|
||||
id4 bigint,
|
||||
id3 bigint,
|
||||
primary key(id),
|
||||
key idx_id4(id4)
|
||||
) Engine=InnoDB;
|
||||
`
|
||||
|
||||
vschema = &vschemapb.Keyspace{
|
||||
|
@ -78,6 +92,16 @@ create table aggr_test(
|
|||
},
|
||||
Owner: "t1",
|
||||
},
|
||||
"t2_id4_idx": {
|
||||
Type: "lookup_hash",
|
||||
Params: map[string]string{
|
||||
"table": "t2_id4_idx",
|
||||
"from": "id4",
|
||||
"to": "id3",
|
||||
"autocommit": "true",
|
||||
},
|
||||
Owner: "t2",
|
||||
},
|
||||
},
|
||||
Tables: map[string]*vschemapb.Table{
|
||||
"t1": {
|
||||
|
@ -95,6 +119,21 @@ create table aggr_test(
|
|||
Name: "hash",
|
||||
}},
|
||||
},
|
||||
"t2": {
|
||||
ColumnVindexes: []*vschemapb.ColumnVindex{{
|
||||
Column: "id3",
|
||||
Name: "hash",
|
||||
}, {
|
||||
Column: "id4",
|
||||
Name: "t2_id4_idx",
|
||||
}},
|
||||
},
|
||||
"t2_id4_idx": {
|
||||
ColumnVindexes: []*vschemapb.ColumnVindex{{
|
||||
Column: "id4",
|
||||
Name: "hash",
|
||||
}},
|
||||
},
|
||||
"vstream_test": {
|
||||
ColumnVindexes: []*vschemapb.ColumnVindex{{
|
||||
Column: "id",
|
||||
|
|
|
@ -210,7 +210,7 @@ func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte
|
|||
|
||||
// Create reserves the id by inserting it into the vindex table.
|
||||
func (lu *clCommon) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
|
||||
err := lu.lkp.createCustom(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
|
||||
err := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ func (ln *LookupNonUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [
|
|||
|
||||
// Create reserves the id by inserting it into the vindex table.
|
||||
func (ln *LookupNonUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
|
||||
return ln.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode)
|
||||
return ln.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode)
|
||||
}
|
||||
|
||||
// Delete deletes the entry from the vindex table.
|
||||
|
@ -261,7 +261,7 @@ func (lu *LookupUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]
|
|||
|
||||
// Create reserves the id by inserting it into the vindex table.
|
||||
func (lu *LookupUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
|
||||
return lu.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode)
|
||||
return lu.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode)
|
||||
}
|
||||
|
||||
// Update updates the entry in the vindex table.
|
||||
|
|
|
@ -155,7 +155,7 @@ func (lh *LookupHash) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value,
|
|||
if err != nil {
|
||||
return fmt.Errorf("lookup.Create.vunhash: %v", err)
|
||||
}
|
||||
return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
|
||||
return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
|
||||
}
|
||||
|
||||
// Update updates the entry in the vindex table.
|
||||
|
@ -309,7 +309,7 @@ func (lhu *LookupHashUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.
|
|||
if err != nil {
|
||||
return fmt.Errorf("lookup.Create.vunhash: %v", err)
|
||||
}
|
||||
return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
|
||||
return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
|
||||
}
|
||||
|
||||
// Delete deletes the entry from the vindex table.
|
||||
|
|
|
@ -107,7 +107,6 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value)
|
|||
|
||||
type sorter struct {
|
||||
rowsColValues [][]sqltypes.Value
|
||||
ksids [][]byte
|
||||
toValues []sqltypes.Value
|
||||
}
|
||||
|
||||
|
@ -133,7 +132,6 @@ func (v *sorter) Less(i, j int) bool {
|
|||
|
||||
func (v *sorter) Swap(i, j int) {
|
||||
v.toValues[i], v.toValues[j] = v.toValues[j], v.toValues[i]
|
||||
v.ksids[i], v.ksids[j] = v.ksids[j], v.ksids[i]
|
||||
v.rowsColValues[i], v.rowsColValues[j] = v.rowsColValues[j], v.rowsColValues[i]
|
||||
}
|
||||
|
||||
|
@ -147,14 +145,14 @@ func (v *sorter) Swap(i, j int) {
|
|||
// If we assume that the primary vindex is on column_c. The call to create will look like this:
|
||||
// Create(vcursor, [[value_a0, value_b0,], [value_a1, value_b1]], [binary(value_c0), binary(value_c1)])
|
||||
// Notice that toValues contains the computed binary value of the keyspace_id.
|
||||
func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool) error {
|
||||
func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool) error {
|
||||
if lkp.Autocommit {
|
||||
return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT)
|
||||
return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT)
|
||||
}
|
||||
return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL)
|
||||
return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL)
|
||||
}
|
||||
|
||||
func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
|
||||
func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
|
||||
if len(rowsColValues) == 0 {
|
||||
// This code is unreachable. It's just a failsafe.
|
||||
return nil
|
||||
|
@ -176,7 +174,10 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty
|
|||
fmt.Fprintf(buf, "%s) values(", lkp.To)
|
||||
|
||||
bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues))
|
||||
sort.Sort(&sorter{rowsColValues: rowsColValues, ksids: ksids, toValues: toValues})
|
||||
// Make a copy before sorting.
|
||||
rowsColValues = append([][]sqltypes.Value(nil), rowsColValues...)
|
||||
toValues = append([]sqltypes.Value(nil), toValues...)
|
||||
sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues})
|
||||
for rowIdx := range toValues {
|
||||
colIds := rowsColValues[rowIdx]
|
||||
if rowIdx != 0 {
|
||||
|
@ -254,7 +255,7 @@ func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, k
|
|||
if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, toValue, vtgatepb.CommitOrder_NORMAL); err != nil {
|
||||
return err
|
||||
}
|
||||
return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, [][]byte{ksid}, []sqltypes.Value{toValue}, false /* ignoreMode */)
|
||||
return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, []sqltypes.Value{toValue}, false /* ignoreMode */)
|
||||
}
|
||||
|
||||
func (lkp *lookupInternal) initDelStmt() string {
|
||||
|
|
|
@ -377,18 +377,10 @@ func TestLookupNonUniqueCreate(t *testing.T) {
|
|||
|
||||
// With ignore.
|
||||
vc.queries = nil
|
||||
rowsColsValues := [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}
|
||||
ksids := [][]byte{[]byte("test2"), []byte("test1")}
|
||||
err = lookupNonUnique.(Lookup).Create(vc, rowsColsValues, ksids, true /* ignoreMode */)
|
||||
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if !reflect.DeepEqual(rowsColsValues, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}) {
|
||||
t.Errorf("inserts not reordered. Lookup table inserts get reordered on a bulk insert to avoid locking")
|
||||
}
|
||||
if !reflect.DeepEqual(ksids, [][]byte{[]byte("test1"), []byte("test2")}) {
|
||||
t.Errorf("keyspace ids not reordered. Keyspace ids must also get reordered on a bulk insert")
|
||||
}
|
||||
|
||||
wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)"
|
||||
if !reflect.DeepEqual(vc.queries, wantqueries) {
|
||||
|
|
|
@ -168,7 +168,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Create(vcursor VCursor, rowsColValues [][]s
|
|||
if err != nil {
|
||||
return fmt.Errorf("lookup.Create.convert: %v", err)
|
||||
}
|
||||
return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
|
||||
return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
|
||||
}
|
||||
|
||||
// Update updates the entry in the vindex table.
|
||||
|
@ -333,7 +333,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Create(vcursor VCursor, rowsColValue
|
|||
if err != nil {
|
||||
return fmt.Errorf("lookup.Create.convert: %v", err)
|
||||
}
|
||||
return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
|
||||
return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
|
||||
}
|
||||
|
||||
// Delete deletes the entry from the vindex table.
|
||||
|
|
Загрузка…
Ссылка в новой задаче