Merge pull request #8385 from planetscale/rn-scope-reverse-vrep-vindex

VReplication Reverse Workflows: add keyspace scope to vindex while creating reverse vreplication streams
This commit is contained in:
Deepthi Sigireddi 2021-06-28 16:31:21 -07:00 коммит произвёл GitHub
Родитель 06ec6ba114 4a89e94d34
Коммит 7ae102cef0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 10 добавлений и 7 удалений

Просмотреть файл

@ -42,7 +42,7 @@ create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenan
"reverse_bits": {
"type": "reverse_bits"
},
"binary_md5": {
"bmd5": {
"type": "binary_md5"
}
},
@ -75,7 +75,7 @@ create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenan
"column_vindexes": [
{
"column": "tenant_id",
"name": "binary_md5"
"name": "bmd5"
}
]
}

Просмотреть файл

@ -25,9 +25,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
@ -299,6 +298,11 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
printShardPositions(vc, ksShards)
switchWrites(t, reverseKsWorkflow, false)
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
require.Contains(t, output, "'customer.reverse_bits'")
require.Contains(t, output, "'customer.bmd5'")
insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))
// both inserts go into 80-, this tests the edge-case where a stream (-80) has no relevant new events after the previous switch
@ -314,7 +318,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
dropSourcesDryRun(t, ksWorkflow, true, dryRunResultsDropSourcesRenameCustomerShard)
var exists bool
exists, err := checkIfBlacklistExists(t, vc, "product:0", "customer")
exists, err = checkIfBlacklistExists(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting blacklist for customer:0")
require.True(t, exists)
dropSources(t, ksWorkflow)

Просмотреть файл

@ -2737,7 +2737,6 @@ func expectJSON(t *testing.T, table string, values [][]string, id int, exec func
want, err := ajson.Unmarshal([]byte(row[1]))
require.NoError(t, err)
match, err := got.Eq(want)
//log.Infof(">>>>>>>> got \n-----\n%s\n------\n, want \n-----\n%s\n------\n", got, want)
require.NoError(t, err)
require.True(t, match)
}

Просмотреть файл

@ -1128,7 +1128,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
}
// TODO(sougou): handle degenerate cases like sequence, etc.
// We currently assume the primary vindex is the best way to filter, which may not be true.
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vtable.ColumnVindexes[0].Type, key.KeyRangeString(source.GetShard().KeyRange))
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), ts.sourceKeyspace, vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange))
}
filter = fmt.Sprintf("select * from %s%s", rule.Match, inKeyrange)
}