From d37fd4fa068881fe802090e4e0959964d5bd8789 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 6 Oct 2019 21:04:07 -0700 Subject: [PATCH] vdiff: streamFromSources Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/vdiff.go | 86 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 44d8f965e9..d0a24e6b61 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -18,6 +18,7 @@ package wrangler import ( "fmt" + "io" "strings" "sync" "time" @@ -28,6 +29,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -37,6 +39,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) @@ -62,9 +65,12 @@ type tableDiffer struct { } type dfParams struct { - master *topo.TabletInfo - tablet *topodatapb.Tablet - position mysql.Position + master *topo.TabletInfo + tablet *topodatapb.Tablet + position mysql.Position + snapshotPosition string + result chan *sqltypes.Result + err error } // VDiff reports differences between the sources and targets of a vreplication workflow. @@ -299,7 +305,7 @@ func (df *vdiff) selectTablets(ctx context.Context) error { if err != nil { return err } - df.sources[shard].tablet = tablet + source.tablet = tablet return nil }) }() @@ -318,7 +324,7 @@ func (df *vdiff) selectTablets(ctx context.Context) error { if err != nil { return err } - df.targets[shard].tablet = tablet + target.tablet = tablet return nil }) }() @@ -330,6 +336,76 @@ func (df *vdiff) selectTablets(ctx context.Context) error { return err2 } +func (df *vdiff) streamFromSources(ctx context.Context, td *tableDiffer) error { + err := df.forAll(df.sources, func(shard string, source *dfParams) error { + // Iteration for each source. + if err := df.wr.tmc.WaitForPosition(ctx, source.tablet, mysql.EncodePosition(source.position)); err != nil { + return err + } + source.result = make(chan *sqltypes.Result, 1) + gtidch := make(chan string, 1) + + // Start the stream in a separate goroutine. + go df.streamOneSource(ctx, shard, source, td, gtidch) + + // Wait for the gtid to be sent. If it's not received, there was an error + // which would be stored in source.err. + gtid, ok := <-gtidch + if !ok { + return source.err + } + // Save the new position, as of when the query executed. + source.snapshotPosition = gtid + return nil + }) + return err +} + +// streamOneSource is called as a goroutine, and communicates its results through channels. +// It first sends the snapshot gtid to gtidch. +// Then it streams results to source.result. +// Before returning, it sets source.err, and closes all channels. +// If any channel is closed, then source.err can be checked if there was an error. +func (df *vdiff) streamOneSource(ctx context.Context, shard string, source *dfParams, td *tableDiffer, gtidch chan string) { + defer close(source.result) + defer close(gtidch) + + // Wrap the streaming in a separate function so we can capture the error. + // This shows that the error will be set before the channels are closed. + source.err = func() error { + conn, err := tabletconn.GetDialer()(source.tablet, grpcclient.FailFast(false)) + if err != nil { + return err + } + defer conn.Close(ctx) + + target := &querypb.Target{ + Keyspace: df.sourceKeyspace, + Shard: shard, + TabletType: source.tablet.Type, + } + var fields []*querypb.Field + err = conn.VStreamResults(ctx, target, td.sourceExpression, func(vrs *binlogdatapb.VStreamResultsResponse) error { + if vrs.Fields != nil { + fields = vrs.Fields + gtidch <- vrs.Gtid + } + p3qr := &querypb.QueryResult{ + Fields: fields, + Rows: vrs.Rows, + } + result := sqltypes.Proto3ToResult(p3qr) + select { + case source.result <- result: + case <-ctx.Done(): + return io.EOF + } + return nil + }) + return err + }() +} + func (df *vdiff) forAll(participants map[string]*dfParams, f func(string, *dfParams) error) error { var wg sync.WaitGroup allErrors := &concurrency.AllErrorRecorder{}