Flakes: Use waits instead of checks in vrepl e2e tests (#11048)

* Use waits instead of checks in vrepl e2e tests

This should alleviate some flakiness, with the TestMigrate
one having been the worst one.

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Replace (almost) all time.Sleep's

Added a new waitForTabletThrottlingStatus function

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Replace last sleeps with new waitForNoWorkflowLag function

And correct/improve the waitForTabletThrottlingStatus function

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Bug fixes

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Fix moar bugs

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Use NewTimer so that we can immediately stop them

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Trying to track down failures...

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Ticker+Timer in same for+select loop is non-deterministic

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Always show last seen value and quote strings when it adds clarity

Signed-off-by: Matt Lord <mattalord@gmail.com>

* We must wait for the throttler status to be refreshed

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Spelling is herd

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Remove debug print

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Kick the CI ... again ...

Signed-off-by: Matt Lord <mattalord@gmail.com>

Signed-off-by: Matt Lord <mattalord@gmail.com>
This commit is contained in:
Matt Lord 2022-08-28 05:28:51 -04:00 коммит произвёл GitHub
Родитель 41bae65505
Коммит 7495817ca8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 274 добавлений и 233 удалений

3
.gitignore поставляемый
Просмотреть файл

@ -4,6 +4,9 @@
# For mac users
.DS_Store
# For direnv users
.envrc
# Produced by yacc
*.output

Просмотреть файл

@ -33,6 +33,7 @@ import (
"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/schema"
@ -43,6 +44,12 @@ import (
"vitess.io/vitess/go/sqltypes"
)
const (
// defaultTick is how long the polling helpers sleep between attempts.
defaultTick = 1 * time.Second
// defaultTimeout is the duration used for the polling helpers' deadline timers.
defaultTimeout = 30 * time.Second
// workflowStartTimeout is the deadline used by waitForWorkflowToStart.
workflowStartTimeout = 5 * time.Second
)
func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) {
queries := strings.Split(lines, "\n")
for _, query := range queries {
@ -92,22 +99,74 @@ func checkHealth(t *testing.T, url string) bool {
return true
}
func waitForQueryToExecute(t *testing.T, conn *mysql.Conn, database string, query string, want string) {
done := false
ticker := time.NewTicker(10 * time.Millisecond)
func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query string, want string) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
qr := execVtgateQuery(t, conn, database, query)
require.NotNil(t, qr)
if want == fmt.Sprintf("%v", qr.Rows) {
return
}
select {
case <-ticker.C:
if done {
return
}
qr := execVtgateQuery(t, conn, database, query)
require.NotNil(t, qr)
if want == fmt.Sprintf("%v", qr.Rows) {
done = true
}
case <-time.After(5 * time.Second):
require.FailNow(t, "query %s.%s did not execute in time", database, query)
case <-timer.C:
require.FailNow(t, fmt.Sprintf("query %q on database %q did not return the expected result of %v before the timeout of %s; last seen result: %v",
query, database, want, defaultTimeout, qr.Rows))
default:
time.Sleep(defaultTick)
}
}
}
// waitForTabletThrottlingStatus polls the tablet's throttler self check for
// appName until the "StatusCode" field of the response equals wantCode. The
// test is failed immediately if the self check errors, if the response cannot
// be parsed, or if the wanted code is not seen before defaultTimeout elapses.
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, appName string, wantCode int64) {
	deadline := time.NewTimer(defaultTimeout)
	defer deadline.Stop()

	for {
		_, body, err := throttlerCheckSelf(tablet, appName)
		require.NoError(t, err)
		require.NotNil(t, body)

		status, err := jsonparser.GetInt([]byte(body), "StatusCode")
		require.NoError(t, err)

		if status == wantCode {
			// Pause briefly so that any cached check values are cleared and
			// the new status is in effect everywhere before we return.
			time.Sleep(500 * time.Millisecond)
			return
		}

		// Non-blocking look at the deadline: fail if it has fired, otherwise
		// sleep for one tick and poll again.
		select {
		case <-deadline.C:
			failure := fmt.Sprintf("tablet %q did not return expected status of %d for application %q before the timeout of %s; last seen status: %d",
				tablet.Name, wantCode, appName, defaultTimeout, status)
			require.FailNow(t, failure)
		default:
			time.Sleep(defaultTick)
		}
	}
}
// waitForNoWorkflowLag polls the output of `Workflow -- <keyspace>.<workflow> show`
// until its MaxVReplicationTransactionLag value is 0. The test is failed
// immediately if the command errors, if the value cannot be parsed from the
// output, or if the lag is still non-zero once defaultTimeout has elapsed.
func waitForNoWorkflowLag(t *testing.T, vc *VitessCluster, keyspace, workflow string) {
	ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
	timer := time.NewTimer(defaultTimeout)
	defer timer.Stop()
	for {
		output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", ksWorkflow, "show")
		require.NoError(t, err)
		lag, err := jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
		require.NoError(t, err)
		if lag == 0 {
			return
		}
		// Non-blocking check of the deadline; when it has not fired yet we
		// sleep for one tick and poll again.
		select {
		case <-timer.C:
			require.FailNow(t, fmt.Sprintf("workflow %q did not eliminate VReplication lag before the timeout of %s; last seen MaxVReplicationTransactionLag: %d",
				ksWorkflow, defaultTimeout, lag))
		default:
			time.Sleep(defaultTick)
		}
	}
}
@ -120,28 +179,51 @@ func verifyNoInternalTables(t *testing.T, conn *mysql.Conn, keyspaceShard string
require.NotNil(t, qr.Rows)
for _, row := range qr.Rows {
tableName := row[0].ToString()
assert.False(t, schema.IsInternalOperationTableName(tableName), "found internal table %s in shard %s", tableName, keyspaceShard)
assert.False(t, schema.IsInternalOperationTableName(tableName), "found internal table %q in shard %q", tableName, keyspaceShard)
}
}
// validateCount runs a single `select count(*)` against table through vtgate
// and fails the test immediately unless the result renders as exactly one
// INT64 row equal to want. It does not retry.
func validateCount(t *testing.T, conn *mysql.Conn, database string, table string, want int) {
	countQuery := fmt.Sprintf("select count(*) from %s", table)
	result := execVtgateQuery(t, conn, database, countQuery)
	require.NotNil(t, result)
	require.NotNil(t, result.Rows)

	expected := fmt.Sprintf("[[INT64(%d)]]", want)
	actual := fmt.Sprintf("%v", result.Rows)
	require.Equal(t, expected, actual)
}
// validateQuery runs query once through vtgate and fails the test immediately
// unless the %v rendering of the returned rows equals want. It does not retry.
func validateQuery(t *testing.T, conn *mysql.Conn, database string, query string, want string) {
	result := execVtgateQuery(t, conn, database, query)
	require.NotNil(t, result)

	got := fmt.Sprintf("%v", result.Rows)
	require.Equal(t, want, got)
}
func validateCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, database string, table string, want int) {
func waitForRowCount(t *testing.T, conn *mysql.Conn, database string, table string, want int) {
query := fmt.Sprintf("select count(*) from %s", table)
qr, err := vttablet.QueryTablet(query, database, true)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("[[INT64(%d)]]", want), fmt.Sprintf("%v", qr.Rows))
wantRes := fmt.Sprintf("[[INT64(%d)]]", want)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
qr := execVtgateQuery(t, conn, database, query)
require.NotNil(t, qr)
if wantRes == fmt.Sprintf("%v", qr.Rows) {
return
}
select {
case <-timer.C:
require.FailNow(t, fmt.Sprintf("table %q did not reach the expected number of rows (%d) before the timeout of %s; last seen result: %v",
table, want, defaultTimeout, qr.Rows))
default:
time.Sleep(defaultTick)
}
}
}
// waitForRowCountInTablet polls `select count(*)` for table directly on the
// given tablet until the result renders as a single INT64 row equal to want.
// The test is failed immediately if a query errors or if the expected count
// has not been seen once defaultTimeout has elapsed.
func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, database string, table string, want int) {
	expected := fmt.Sprintf("[[INT64(%d)]]", want)
	countQuery := fmt.Sprintf("select count(*) from %s", table)

	deadline := time.NewTimer(defaultTimeout)
	defer deadline.Stop()

	for {
		result, err := vttablet.QueryTablet(countQuery, database, true)
		require.NoError(t, err)
		require.NotNil(t, result)

		if got := fmt.Sprintf("%v", result.Rows); got == expected {
			return
		}

		// Non-blocking look at the deadline: fail if it has fired, otherwise
		// sleep for one tick and poll again.
		select {
		case <-deadline.C:
			failure := fmt.Sprintf("table %q did not reach the expected number of rows (%d) on tablet %q before the timeout of %s; last seen result: %v",
				table, want, vttablet.Name, defaultTimeout, result.Rows)
			require.FailNow(t, failure)
		default:
			time.Sleep(defaultTick)
		}
	}
}
func validateThatQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) bool {
@ -152,6 +234,45 @@ func validateThatQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *c
return newCount == count+1
}
// waitForWorkflowToStart polls `Workflow <ksWorkflow> show` until every entry
// found under a "PrimaryReplicationStatuses" key inside the output's
// "ShardStatuses" reports a State of "Running". The test is failed
// immediately if the command errors or if the streams are not all running
// once workflowStartTimeout has elapsed.
//
// NOTE(review): when the output has no "ShardStatuses" entries nothing is
// iterated and the workflow is treated as started — confirm this is intended.
func waitForWorkflowToStart(t *testing.T, vc *VitessCluster, ksWorkflow string) {
	timer := time.NewTimer(workflowStartTimeout)
	defer timer.Stop()
	log.Infof("Waiting for workflow %s to start", ksWorkflow)
	for {
		output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
		require.NoError(t, err)

		// Assume started until we find a stream that is not yet running.
		started := true
		gjson.Get(output, "ShardStatuses").ForEach(func(_, tabletStreams gjson.Result) bool { // for each participating tablet
			tabletStreams.ForEach(func(streamID, streamInfos gjson.Result) bool { // for each stream
				if streamID.String() != "PrimaryReplicationStatuses" {
					return true
				}
				streamInfos.ForEach(func(_, attributeValue gjson.Result) bool { // for each attribute in the stream
					if attributeValue.Get("State").String() != "Running" {
						started = false // we need to wait for all streams to start
					}
					return true
				})
				return true
			})
			return true
		})
		if started {
			log.Infof("Workflow %s has started", ksWorkflow)
			return
		}

		// Non-blocking check of the deadline; when it has not fired yet we
		// sleep for one tick and poll again.
		select {
		case <-timer.C:
			// FailNow with a pre-formatted message: FailNowf would treat the
			// first string as a fixed failure message and not expand its verbs.
			require.FailNow(t, fmt.Sprintf("workflow %q did not fully start before the timeout of %s",
				ksWorkflow, workflowStartTimeout))
		default:
			time.Sleep(defaultTick)
		}
	}
}
func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
@ -296,7 +417,7 @@ func checkIfDenyListExists(t *testing.T, vc *VitessCluster, ksShard string, tabl
func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, workflow string, database string, want int) {
query := fmt.Sprintf("select count(*) from _vt.vreplication where workflow='%s';", workflow)
validateQuery(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}
func printShardPositions(vc *VitessCluster, ksShards []string) {

Просмотреть файл

@ -92,8 +92,8 @@ func testShardedMaterialize(t *testing.T) {
tab := vc.getPrimaryTablet(t, ks2, "0")
catchup(t, tab, "wf1", "Materialize")
validateCount(t, vtgateConn, ks2, "tx", 2)
validateQuery(t, vtgateConn, "ks2:0", "select id, val from tx",
waitForRowCount(t, vtgateConn, ks2, "tx", 2)
waitForQueryResult(t, vtgateConn, "ks2:0", "select id, val from tx",
`[[INT64(3) VARBINARY("def")] [INT64(5) VARBINARY("def")]]`)
}
@ -214,17 +214,17 @@ func testMaterialize(t *testing.T) {
catchup(t, ks2Primary, "wf1", "Materialize")
// validate data after the copy phase
validateCount(t, vtgateConn, targetKs, "mat2", 2)
waitForRowCount(t, vtgateConn, targetKs, "mat2", 2)
want := `[[INT64(1) VARBINARY("abc") TIMESTAMP("2021-10-09 16:17:36") INT32(9) INT32(10) INT32(3)] [INT64(2) VARBINARY("def") TIMESTAMP("2021-11-10 16:17:36") INT32(10) INT32(11) INT32(6)]]`
validateQuery(t, vtgateConn, targetKs, "select id, val, ts, day, month, x from mat2", want)
waitForQueryResult(t, vtgateConn, targetKs, "select id, val, ts, day, month, x from mat2", want)
// insert data to test the replication phase
execVtgateQuery(t, vtgateConn, sourceKs, "insert into mat(id, val, ts) values (3, 'ghi', '2021-12-11 16:17:36')")
// validate data after the replication phase
waitForQueryToExecute(t, vtgateConn, targetKs, "select count(*) from mat2", "[[INT64(3)]]")
waitForQueryResult(t, vtgateConn, targetKs, "select count(*) from mat2", "[[INT64(3)]]")
want = `[[INT64(1) VARBINARY("abc") TIMESTAMP("2021-10-09 16:17:36") INT32(9) INT32(10) INT32(3)] [INT64(2) VARBINARY("def") TIMESTAMP("2021-11-10 16:17:36") INT32(10) INT32(11) INT32(6)] [INT64(3) VARBINARY("ghi") TIMESTAMP("2021-12-11 16:17:36") INT32(11) INT32(12) INT32(9)]]`
validateQuery(t, vtgateConn, targetKs, "select id, val, ts, day, month, x from mat2", want)
waitForQueryResult(t, vtgateConn, targetKs, "select id, val, ts, day, month, x from mat2", want)
}
// TestMaterialize runs all the individual materialize tests defined above

Просмотреть файл

@ -19,7 +19,6 @@ package vreplication
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
@ -117,15 +116,14 @@ func TestMigrate(t *testing.T) {
"--source=ext1.rating", "create", ksWorkflow); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
time.Sleep(1 * time.Second) // wait for migrate to run
waitForWorkflowToStart(t, vc, ksWorkflow)
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
validateCount(t, vtgateConn, "product:0", "rating", 2)
validateCount(t, vtgateConn, "product:0", "review", 3)
waitForRowCount(t, vtgateConn, "product:0", "rating", 2)
waitForRowCount(t, vtgateConn, "product:0", "review", 3)
execVtgateQuery(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');")
execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);")
time.Sleep(1 * time.Second) // wait for stream to find row
validateCount(t, vtgateConn, "product:0", "rating", 3)
validateCount(t, vtgateConn, "product:0", "review", 4)
waitForRowCount(t, vtgateConn, "product:0", "rating", 3)
waitForRowCount(t, vtgateConn, "product:0", "review", 4)
vdiff1(t, ksWorkflow, "extcell1")
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "complete", ksWorkflow); err != nil {
@ -142,8 +140,8 @@ func TestMigrate(t *testing.T) {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
validateCount(t, vtgateConn, "product:0", "rating", 0)
validateCount(t, vtgateConn, "product:0", "review", 0)
waitForRowCount(t, vtgateConn, "product:0", "rating", 0)
waitForRowCount(t, vtgateConn, "product:0", "review", 0)
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "cancel", ksWorkflow); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}

Просмотреть файл

@ -79,11 +79,11 @@ create table customer(cid int, name varbinary(128), meta json default null, typ
}
})
validateCount(t, vtgateConn, "stress_src:0", "largebin", insertCount)
waitForRowCount(t, vtgateConn, "stress_src:0", "largebin", insertCount)
t.Logf("creating new keysepace '%s'", targetKs)
vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, "0", initialStressVSchema, initialStressSchema, 0, 0, 200, nil)
validateCount(t, vtgateConn, "stress_tgt:0", "largebin", 0)
waitForRowCount(t, vtgateConn, "stress_tgt:0", "largebin", 0)
t.Logf("moving 'largebin' table...")
moveStart := time.Now()
@ -115,5 +115,5 @@ create table customer(cid int, name varbinary(128), meta json default null, typ
}
t.Logf("finished catching up after MoveTables (%v)", time.Since(moveStart))
validateCount(t, vtgateConn, "stress_tgt:0", "largebin", insertCount)
waitForRowCount(t, vtgateConn, "stress_tgt:0", "largebin", insertCount)
}

Просмотреть файл

@ -20,11 +20,9 @@ import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
@ -62,7 +60,7 @@ func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) erro
err := tstWorkflowExec(t, defaultCellName, workflowName, targetKs, targetKs,
"", workflowActionCreate, "", sourceShards, targetShards)
require.NoError(t, err)
time.Sleep(1 * time.Second)
waitForWorkflowToStart(t, vc, ksWorkflow)
catchup(t, targetTab1, workflowName, "Reshard")
catchup(t, targetTab2, workflowName, "Reshard")
vdiff1(t, ksWorkflow, "")
@ -76,9 +74,9 @@ func createMoveTablesWorkflow(t *testing.T, tables string) error {
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowToStart(t, vc, ksWorkflow)
catchup(t, targetTab1, workflowName, "MoveTables")
catchup(t, targetTab2, workflowName, "MoveTables")
time.Sleep(1 * time.Second)
vdiff1(t, ksWorkflow, "")
return nil
}
@ -256,47 +254,6 @@ func TestBasicV2Workflows(t *testing.T) {
log.Flush()
}
const workflowStartTimeout = 5 * time.Second
// waitForWorkflowToStart checks `Workflow <ksWorkflow> show` on every ticker
// tick until every entry found under a "PrimaryReplicationStatuses" key inside
// the output's "ShardStatuses" reports a State of "Running"; it returns on the
// tick after that was observed. The test is failed immediately if the command
// errors or if workflowStartTimeout fires first.
func waitForWorkflowToStart(t *testing.T, ksWorkflow string) {
	done := false
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	timer := time.NewTimer(workflowStartTimeout)
	defer timer.Stop()
	log.Infof("Waiting for workflow %s to start", ksWorkflow)
	for {
		select {
		case <-ticker.C:
			if done {
				log.Infof("Workflow %s has started", ksWorkflow)
				return
			}
			output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
			require.NoError(t, err)
			// Assume started until we find a stream that is not yet running.
			done = true
			result := gjson.Get(output, "ShardStatuses")
			result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each participating tablet
				tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
					if streamId.String() == "PrimaryReplicationStatuses" {
						streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
							if attributeValue.Get("State").String() != "Running" {
								done = false // we need to wait for all streams to start
							}
							return true
						})
					}
					return true
				})
				return true
			})
		case <-timer.C:
			// FailNow with a pre-formatted message: FailNowf would treat the
			// first string as a fixed failure message and not expand its verbs.
			require.FailNow(t, fmt.Sprintf("workflow %s not yet started", ksWorkflow))
		}
	}
}
/*
testVSchemaForSequenceAfterMoveTables checks that the related sequence tag is migrated correctly in the vschema
while moving a table with an auto-increment from sharded to unsharded.
@ -310,7 +267,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
"customer2", workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowToStart(t, "customer.wf2")
waitForWorkflowToStart(t, vc, "customer.wf2")
waitForLowLag(t, "customer", "wf2")
err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
@ -324,7 +281,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product")
require.NoError(t, err)
assert.NotContains(t, output, "customer2\"", "customer2 still found in keyspace product")
validateCount(t, vtgateConn, "customer", "customer2", 3)
waitForRowCount(t, vtgateConn, "customer", "customer2", 3)
// check that customer2 has the sequence tag
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "customer")
@ -336,15 +293,15 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
for i := 0; i < num; i++ {
execVtgateQuery(t, vtgateConn, "customer", "insert into customer2(name) values('a')")
}
validateCount(t, vtgateConn, "customer", "customer2", 3+num)
waitForRowCount(t, vtgateConn, "customer", "customer2", 3+num)
want := fmt.Sprintf("[[INT32(%d)]]", 100+num-1)
validateQuery(t, vtgateConn, "customer", "select max(cid) from customer2", want)
waitForQueryResult(t, vtgateConn, "customer", "select max(cid) from customer2", want)
// use MoveTables to move customer2 back to product. Note that now the table has an associated sequence
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"customer2", workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowToStart(t, "product.wf3")
waitForWorkflowToStart(t, vc, "product.wf3")
waitForLowLag(t, "product", "wf3")
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
@ -368,9 +325,9 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
for i := 0; i < num; i++ {
execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')")
}
validateCount(t, vtgateConn, "product", "customer2", 3+num+num)
waitForRowCount(t, vtgateConn, "product", "customer2", 3+num+num)
want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1)
validateQuery(t, vtgateConn, "product", "select max(cid) from customer2", want)
waitForQueryResult(t, vtgateConn, "product", "select max(cid) from customer2", want)
}
// testReplicatingWithPKEnumCols ensures that we properly apply binlog events
@ -387,11 +344,11 @@ func testReplicatingWithPKEnumCols(t *testing.T) {
// typ is an enum, with soho having a stored and binlogged value of 2
deleteQuery := "delete from customer where cid = 2 and typ = 'soho'"
insertQuery := "insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4))"
execVtgateQuery(t, vtgateConn, "product", deleteQuery)
time.Sleep(2 * time.Second)
execVtgateQuery(t, vtgateConn, sourceKs, deleteQuery)
waitForNoWorkflowLag(t, vc, targetKs, workflowName)
vdiff1(t, ksWorkflow, "")
execVtgateQuery(t, vtgateConn, "product", insertQuery)
time.Sleep(2 * time.Second)
execVtgateQuery(t, vtgateConn, sourceKs, insertQuery)
waitForNoWorkflowLag(t, vc, targetKs, workflowName)
vdiff1(t, ksWorkflow, "")
}

Просмотреть файл

@ -62,15 +62,17 @@ var (
targetThrottlerAppName = "vreplication"
)
// for some tests we keep an open transaction during a SwitchWrites and commit it afterwards, to reproduce https://github.com/vitessio/vitess/issues/9400
// we also then delete the extra row (if) added so that the row counts for the future count comparisons stay the same
const (
// for some tests we keep an open transaction during a SwitchWrites and commit it afterwards, to reproduce https://github.com/vitessio/vitess/issues/9400
// we also then delete the extra row (if) added so that the row counts for the future count comparisons stay the same
openTxQuery = "insert into customer(cid, name, typ, sport, meta) values(4, 'openTxQuery',1,'football,baseball','{}');"
deleteOpenTxQuery = "delete from customer where name = 'openTxQuery'"
merchantKeyspace = "merchant-type"
maxWait = 60 * time.Second
BypassLagCheck = true // temporary fix for flakiness seen only in CI when lag check is introduced
merchantKeyspace = "merchant-type"
maxWait = 60 * time.Second
BypassLagCheck = true // temporary fix for flakiness seen only in CI when lag check is introduced
throttlerStatusThrottled = http.StatusExpectationFailed // 417
throttlerStatusNotThrottled = http.StatusOK // 200
)
func init() {
@ -430,9 +432,9 @@ func insertInitialData(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
log.Infof("Done inserting initial data")
validateCount(t, vtgateConn, "product:0", "product", 2)
validateCount(t, vtgateConn, "product:0", "customer", 3)
validateQuery(t, vtgateConn, "product:0", "select * from merchant",
waitForRowCount(t, vtgateConn, "product:0", "product", 2)
waitForRowCount(t, vtgateConn, "product:0", "customer", 3)
waitForQueryResult(t, vtgateConn, "product:0", "select * from merchant",
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)
})
}
@ -606,15 +608,15 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'")
validateCountInTablet(t, customerTab1, "customer", "customer", 1)
validateCountInTablet(t, customerTab2, "customer", "customer", 2)
validateCount(t, vtgateConn, "customer", "customer.customer", 3)
waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 2)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 3)
query = "insert into customer (name, cid) values('george', 5)"
execVtgateQuery(t, vtgateConn, "customer", query)
validateCountInTablet(t, customerTab1, "customer", "customer", 1)
validateCountInTablet(t, customerTab2, "customer", "customer", 3)
validateCount(t, vtgateConn, "customer", "customer.customer", 4)
waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 3)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 4)
}
})
}
@ -622,43 +624,42 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
func validateRollupReplicates(t *testing.T) {
t.Run("validateRollupReplicates", func(t *testing.T) {
insertMoreProducts(t)
time.Sleep(1 * time.Second)
validateCount(t, vtgateConn, "product", "rollup", 1)
validateQuery(t, vtgateConn, "product:0", "select rollupname, kount from rollup",
waitForRowCount(t, vtgateConn, "product", "rollup", 1)
waitForQueryResult(t, vtgateConn, "product:0", "select rollupname, kount from rollup",
`[[VARCHAR("total") INT32(5)]]`)
})
}
func verifySourceTabletThrottling(t *testing.T, targetKS, workflow string) {
tDuration := time.Duration(15 * time.Second)
ticker := time.NewTicker(5 * time.Second)
timer := time.NewTimer(tDuration)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
ksWorkflow := fmt.Sprintf("%s.%s", targetKS, workflow)
for {
select {
case <-ticker.C:
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
result := gjson.Get(output, "ShardStatuses")
result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each source tablet
tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
if streamId.String() == "PrimaryReplicationStatuses" {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
state := attributeValue.Get("State").String()
if state != "Copying" {
require.FailNowf(t, "Unexpected running workflow stream",
"Initial copy phase for the MoveTables workflow %s started in less than %d seconds when it should have been waiting. Show output: %s",
ksWorkflow, int(tDuration.Seconds()), output)
}
return true // end attribute loop
})
}
return true // end stream loop
})
return true // end tablet loop
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
result := gjson.Get(output, "ShardStatuses")
result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each source tablet
tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
if streamId.String() == "PrimaryReplicationStatuses" {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
state := attributeValue.Get("State").String()
if state != "Copying" {
require.FailNowf(t, "Unexpected running workflow stream",
"Initial copy phase for the MoveTables workflow %s started in less than %s when it should have been waiting. Show output: %s",
ksWorkflow, defaultTimeout, output)
}
return true // end attribute loop
})
}
return true // end stream loop
})
return true // end tablet loop
})
select {
case <-timer.C:
return
default:
time.Sleep(defaultTick)
}
}
}
@ -668,10 +669,10 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str
ksName := "customer"
counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5}
reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil, cells, sourceCellOrAlias)
validateCount(t, vtgateConn, ksName, "customer", 20)
waitForRowCount(t, vtgateConn, ksName, "customer", 20)
query := "insert into customer (name) values('yoko')"
execVtgateQuery(t, vtgateConn, ksName, query)
validateCount(t, vtgateConn, ksName, "customer", 21)
waitForRowCount(t, vtgateConn, ksName, "customer", 21)
})
}
@ -680,10 +681,10 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
ksName := merchantKeyspace
counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0}
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "")
validateCount(t, vtgateConn, ksName, "merchant", 2)
waitForRowCount(t, vtgateConn, ksName, "merchant", 2)
query := "insert into merchant (mname, category) values('amazon', 'electronics')"
execVtgateQuery(t, vtgateConn, ksName, query)
validateCount(t, vtgateConn, ksName, "merchant", 3)
waitForRowCount(t, vtgateConn, ksName, "merchant", 3)
var output string
var err error
@ -726,10 +727,10 @@ func reshardMerchant3to1Merge(t *testing.T) {
ksName := merchantKeyspace
counts := map[string]int{"zone1-2000": 3}
reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "")
validateCount(t, vtgateConn, ksName, "merchant", 3)
waitForRowCount(t, vtgateConn, ksName, "merchant", 3)
query := "insert into merchant (mname, category) values('flipkart', 'electronics')"
execVtgateQuery(t, vtgateConn, ksName, query)
validateCount(t, vtgateConn, ksName, "merchant", 4)
waitForRowCount(t, vtgateConn, ksName, "merchant", 4)
})
}
@ -792,7 +793,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
if tablets[tabletName] == nil {
continue
}
validateCountInTablet(t, tablets[tabletName], ksName, tableName, count)
waitForRowCountInTablet(t, tablets[tabletName], ksName, tableName, count)
}
})
}
@ -817,9 +818,9 @@ func shardOrders(t *testing.T) {
switchReads(t, allCellNames, ksWorkflow)
switchWrites(t, ksWorkflow, false)
dropSources(t, ksWorkflow)
validateCountInTablet(t, customerTab1, "customer", "orders", 1)
validateCountInTablet(t, customerTab2, "customer", "orders", 2)
validateCount(t, vtgateConn, "customer", "orders", 3)
waitForRowCountInTablet(t, customerTab1, "customer", "orders", 1)
waitForRowCountInTablet(t, customerTab2, "customer", "orders", 2)
waitForRowCount(t, vtgateConn, "customer", "orders", 3)
})
}
@ -860,9 +861,9 @@ func shardMerchant(t *testing.T) {
}
dropSources(t, ksWorkflow)
validateCountInTablet(t, merchantTab1, merchantKeyspace, "merchant", 1)
validateCountInTablet(t, merchantTab2, merchantKeyspace, "merchant", 1)
validateCount(t, vtgateConn, merchantKeyspace, "merchant", 2)
waitForRowCountInTablet(t, merchantTab1, merchantKeyspace, "merchant", 1)
waitForRowCountInTablet(t, merchantTab2, merchantKeyspace, "merchant", 1)
waitForRowCount(t, vtgateConn, merchantKeyspace, "merchant", 2)
})
}
@ -881,13 +882,9 @@ func materializeProduct(t *testing.T) {
applyVSchema(t, materializeProductVSchema, keyspace)
materialize(t, materializeProductSpec)
customerTablets := vc.getVttabletsInKeyspace(t, defaultCell, keyspace, "primary")
{
for _, tab := range customerTablets {
catchup(t, tab, workflow, "Materialize")
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 5)
}
for _, tab := range customerTablets {
catchup(t, tab, workflow, "Materialize")
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
}
productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary")
@ -897,27 +894,16 @@ func materializeProduct(t *testing.T) {
_, body, err := throttleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
}
// Wait for throttling to take effect (caching will expire by this time):
time.Sleep(1 * time.Second)
for _, tab := range productTablets {
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "417")
}
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
// Wait for throttling to take effect (caching will expire by this time):
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusThrottled)
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled)
}
insertMoreProductsForSourceThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
time.Sleep(1 * time.Second)
// we expect the additional rows to **not appear** in the materialized view
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 5)
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
@ -926,68 +912,44 @@ func materializeProduct(t *testing.T) {
_, body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
}
// give time for unthrottling to take effect and for target to fetch data
time.Sleep(3 * time.Second)
for _, tab := range productTablets {
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
// give time for unthrottling to take effect and for target to fetch data
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 8)
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
}
})
t.Run("throttle-app-customer", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
// Now, throttle vreplication (vcopier/vapplier) on target tablets, and
// insert some more rows.
for _, tab := range customerTablets {
_, body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
// Wait for throttling to take effect (caching will expire by this time):
time.Sleep(1 * time.Second)
for _, tab := range customerTablets {
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "417")
}
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
// Wait for throttling to take effect (the cached throttler status must expire first):
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled)
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
insertMoreProductsForTargetThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
time.Sleep(1 * time.Second)
// we expect the additional rows to **not appear** in the materialized view
// To be fair to the test, we give the target time to apply the new changes.
// We expect it to NOT get them in the first place, i.e. we expect the additional
// rows to **not appear** in the materialized view.
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 8)
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
// unthrottle on target tablets, and expect the rows to show up
for _, tab := range customerTablets {
_, body, err := unthrottleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
// give time for unthrottling to take effect and for target to fetch data
time.Sleep(3 * time.Second)
for _, tab := range customerTablets {
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 11)
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled)
waitForRowCountInTablet(t, tab, keyspace, workflow, 11)
}
})
})
@ -1001,8 +963,8 @@ func materializeRollup(t *testing.T) {
productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet
materialize(t, materializeRollupSpec)
catchup(t, productTab, workflow, "Materialize")
validateCount(t, vtgateConn, "product", "rollup", 1)
validateQuery(t, vtgateConn, "product:0", "select rollupname, kount from rollup",
waitForRowCount(t, vtgateConn, "product", "rollup", 1)
waitForQueryResult(t, vtgateConn, "product:0", "select rollupname, kount from rollup",
`[[VARCHAR("total") INT32(2)]]`)
})
}
@ -1014,8 +976,8 @@ func materializeSales(t *testing.T) {
materialize(t, materializeSalesSpec)
productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet
catchup(t, productTab, "sales", "Materialize")
validateCount(t, vtgateConn, "product", "sales", 2)
validateQuery(t, vtgateConn, "product:0", "select kount, amount from sales",
waitForRowCount(t, vtgateConn, "product", "sales", 2)
waitForQueryResult(t, vtgateConn, "product:0", "select kount, amount from sales",
`[[INT32(1) INT32(10)] [INT32(2) INT32(35)]]`)
})
}
@ -1028,9 +990,9 @@ func materializeMerchantSales(t *testing.T) {
for _, tab := range merchantTablets {
catchup(t, tab, workflow, "Materialize")
}
validateCountInTablet(t, merchantTablets["zone1-400"], merchantKeyspace, "msales", 1)
validateCountInTablet(t, merchantTablets["zone1-500"], merchantKeyspace, "msales", 1)
validateCount(t, vtgateConn, merchantKeyspace, "msales", 2)
waitForRowCountInTablet(t, merchantTablets["zone1-400"], merchantKeyspace, "msales", 1)
waitForRowCountInTablet(t, merchantTablets["zone1-500"], merchantKeyspace, "msales", 1)
waitForRowCount(t, vtgateConn, merchantKeyspace, "msales", 2)
})
}
@ -1044,9 +1006,9 @@ func materializeMerchantOrders(t *testing.T) {
for _, tab := range merchantTablets {
catchup(t, tab, workflow, "Materialize")
}
validateCountInTablet(t, merchantTablets["zone1-400"], merchantKeyspace, "morders", 2)
validateCountInTablet(t, merchantTablets["zone1-500"], merchantKeyspace, "morders", 1)
validateCount(t, vtgateConn, merchantKeyspace, "morders", 3)
waitForRowCountInTablet(t, merchantTablets["zone1-400"], merchantKeyspace, "morders", 2)
waitForRowCountInTablet(t, merchantTablets["zone1-500"], merchantKeyspace, "morders", 1)
waitForRowCount(t, vtgateConn, merchantKeyspace, "morders", 3)
})
}