diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index fa11342189..1d5d382c66 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1083,11 +1083,17 @@ func TestHeartbeat(t *testing.T) { assert.Equal(t, binlogdatapb.VEventType_HEARTBEAT, evs[0].Type) } -func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, postion string) { +func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ch := startStream(ctx, t, filter, postion) + ch := startStream(ctx, t, filter, position) + + // If position is 'current', we wait for a heartbeat to be + // sure the vstreamer has started. + if position == "current" { + <-ch + } for _, tcase := range testcases { switch input := tcase.input.(type) { @@ -1108,6 +1114,8 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan []*binlogdatapb.VEvent, output [][]string) { t.Helper() + timer := time.NewTimer(1 * time.Minute) + defer timer.Stop() for _, wantset := range output { var evs []*binlogdatapb.VEvent for { @@ -1125,6 +1133,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ } case <-ctx.Done(): t.Fatal("stream ended early") + case <-timer.C: + t.Fatalf("timed out waiting for events: %v", wantset) } if len(evs) != 0 { break