From 52f8a0123b158a51e0a9a9883cb674fb9d703fed Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 31 Aug 2020 17:03:42 +0200 Subject: [PATCH 1/4] Check for zero position and return a better error message Signed-off-by: Rohit Nayak --- go/vt/wrangler/vdiff.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index d3f8063f03..2ff98ad0ae 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -71,6 +71,9 @@ type vdiff struct { // The source and target keyspaces are pulled from ts. sources map[string]*shardStreamer targets map[string]*shardStreamer + + workflow string + targetKeyspace string } // tableDiffer performs a diff for one table in the workflow. @@ -153,6 +156,8 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC tabletTypesStr: tabletTypesStr, sources: make(map[string]*shardStreamer), targets: make(map[string]*shardStreamer), + workflow: workflow, + targetKeyspace: targetKeyspace, } for shard, source := range ts.sources { df.sources[shard] = &shardStreamer{ @@ -545,6 +550,9 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici defer cancel() return df.forAll(participants, func(shard string, participant *shardStreamer) error { // Iteration for each participant. 
+ if participant.position.IsZero() { + return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.master.Alias.String()) + } if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil { return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias)) } participant.result = make(chan *sqltypes.Result, 1) From f1298ccf4d38987a8ea69c3e37ff5a72b5c31562 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 1 Sep 2020 21:02:00 +0200 Subject: [PATCH 2/4] Better error message for GTID set mismatch between source and target Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 2 +- go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 21e4cec9ce..6ed3db5bf2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -320,7 +320,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error { return vterrors.Wrap(err, "could not decode position") } if !curPos.AtLeast(pos) { - return fmt.Errorf("requested position %v is ahead of current position %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) + return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos return nil diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 4e85193473..d10e757a7c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1513,7 +1513,7 @@ func TestNoFutureGTID(t *testing.T) { }() defer close(ch) err = vstream(ctx, t, future, nil, nil, 
ch) - want := "is ahead of current position" + want := "GTIDSet Mismatch" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("err: %v, must contain %s", err, want) } From 1bfeba96d122fdc745e72b7df72fd5bd90554246 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 1 Sep 2020 21:32:44 +0200 Subject: [PATCH 3/4] Better error message when VDiff times out Signed-off-by: Rohit Nayak --- go/vt/wrangler/vdiff.go | 3 +++ go/vt/wrangler/vdiff_test.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 2ff98ad0ae..ad12d1785f 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -554,6 +554,9 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.master.Alias.String()) } if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil { + if err.Error() == "context deadline exceeded" { + return fmt.Errorf("VDiff timed out for tablet %v: you may want to increase it with the flag -filtered_replication_wait_time=", topoproto.TabletAliasString(participant.tablet.Alias)) + } return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias)) } participant.result = make(chan *sqltypes.Result, 1) diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index b3a0b65c49..dcb8d5d3d1 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -891,7 +891,7 @@ func TestVDiffReplicationWait(t *testing.T) { env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target) _, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "") - require.EqualError(t, err, "startQueryStreams(sources): WaitForPosition for 
tablet cell-0000000101: context deadline exceeded") + require.EqualError(t, err, "startQueryStreams(sources): VDiff timed out for tablet cell-0000000101: you may want to increase it with the flag -filtered_replication_wait_time=") } func TestVDiffFindPKs(t *testing.T) { From 7c139e31bece0b492943ff3651eaa2c754308274 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 2 Sep 2020 23:06:31 +0200 Subject: [PATCH 4/4] Change vdiff log message level of missing target rows to Warning since it is confusing to users (and not useful) to see the message on the console when running vdiff during a workflow Signed-off-by: Rohit Nayak --- go/vt/wrangler/vdiff.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index ad12d1785f..800efef28e 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -806,7 +806,7 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler) (*DiffReport, err if targetRow == nil { // no more rows from the target // we know we have rows from source, drain, update count - wr.Logger().Errorf("Draining extra row(s) found on the source starting with: %v", sourceRow) + wr.Logger().Warningf("Draining extra row(s) found on the source starting with: %v", sourceRow) count, err := sourceExecutor.drain(ctx) if err != nil { return nil, err