Merge pull request #6669 from planetscale/rn-vrepl-misc1

VRep: improve few error/log messages
This commit is contained in:
Sugu Sougoumarane 2020-09-03 22:45:39 -07:00 коммит произвёл GitHub
Родитель af8377d287 7c139e31be
Коммит c8e044f3a5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 15 добавлений и 4 удалений

Просмотреть файл

@ -320,7 +320,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
return vterrors.Wrap(err, "could not decode position")
}
if !curPos.AtLeast(pos) {
return fmt.Errorf("requested position %v is ahead of current position %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
return nil

Просмотреть файл

@ -1513,7 +1513,7 @@ func TestNoFutureGTID(t *testing.T) {
}()
defer close(ch)
err = vstream(ctx, t, future, nil, nil, ch)
want := "is ahead of current position"
want := "GTIDSet Mismatch"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}

Просмотреть файл

@ -71,6 +71,9 @@ type vdiff struct {
// The source and target keyspaces are pulled from ts.
sources map[string]*shardStreamer
targets map[string]*shardStreamer
workflow string
targetKeyspace string
}
// tableDiffer performs a diff for one table in the workflow.
@ -153,6 +156,8 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC
tabletTypesStr: tabletTypesStr,
sources: make(map[string]*shardStreamer),
targets: make(map[string]*shardStreamer),
workflow: workflow,
targetKeyspace: targetKeyspace,
}
for shard, source := range ts.sources {
df.sources[shard] = &shardStreamer{
@ -545,7 +550,13 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici
defer cancel()
return df.forAll(participants, func(shard string, participant *shardStreamer) error {
// Iteration for each participant.
if participant.position.IsZero() {
return fmt.Errorf("workflow %s.%s: stream has not started on tablet %s", df.targetKeyspace, df.workflow, participant.master.Alias.String())
}
if err := df.ts.wr.tmc.WaitForPosition(waitCtx, participant.tablet, mysql.EncodePosition(participant.position)); err != nil {
if err.Error() == "context deadline exceeded" {
return fmt.Errorf("VDiff timed out for tablet %v: you may want to increase it with the flag -filtered_replication_wait_time=<timeoutSeconds>", topoproto.TabletAliasString(participant.tablet.Alias))
}
return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias))
}
participant.result = make(chan *sqltypes.Result, 1)
@ -795,7 +806,7 @@ func (td *tableDiffer) diff(ctx context.Context, wr *Wrangler) (*DiffReport, err
if targetRow == nil {
// no more rows from the target
// we know we have rows from source, drain, update count
wr.Logger().Errorf("Draining extra row(s) found on the source starting with: %v", sourceRow)
wr.Logger().Warningf("Draining extra row(s) found on the source starting with: %v", sourceRow)
count, err := sourceExecutor.drain(ctx)
if err != nil {
return nil, err

Просмотреть файл

@ -891,7 +891,7 @@ func TestVDiffReplicationWait(t *testing.T) {
env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target)
_, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "")
require.EqualError(t, err, "startQueryStreams(sources): WaitForPosition for tablet cell-0000000101: context deadline exceeded")
require.EqualError(t, err, "startQueryStreams(sources): VDiff timed out for tablet cell-0000000101: you may want to increase it with the flag -filtered_replication_wait_time=<timeoutSeconds>")
}
func TestVDiffFindPKs(t *testing.T) {