Merge pull request #5642 from planetscale/ss-deflake-vstreamer3

vrepl: yet another vstreamer deflake
This commit is contained in:
Morgan Tocker 2020-01-02 08:23:52 -07:00 коммит произвёл GitHub
Родитель d0bca3cea9 6614afe358
Коммит e25368f64e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 12 добавлений и 2 удалений

Просмотреть файл

@ -1083,11 +1083,17 @@ func TestHeartbeat(t *testing.T) {
assert.Equal(t, binlogdatapb.VEventType_HEARTBEAT, evs[0].Type)
}
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, postion string) {
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := startStream(ctx, t, filter, postion)
ch := startStream(ctx, t, filter, position)
// If position is 'current', we wait for a heartbeat to be
// sure the vstreamer has started.
if position == "current" {
<-ch
}
for _, tcase := range testcases {
switch input := tcase.input.(type) {
@ -1108,6 +1114,8 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p
func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan []*binlogdatapb.VEvent, output [][]string) {
t.Helper()
timer := time.NewTimer(1 * time.Minute)
defer timer.Stop()
for _, wantset := range output {
var evs []*binlogdatapb.VEvent
for {
@ -1125,6 +1133,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
}
case <-ctx.Done():
t.Fatal("stream ended early")
case <-timer.C:
t.Fatalf("timed out waiting for events: %v", wantset)
}
if len(evs) != 0 {
break