зеркало из https://github.com/github/vitess-gh.git
vdiff: streamFromSources
Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
ea2a5c0800
Коммит
d37fd4fa06
|
@ -18,6 +18,7 @@ package wrangler
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
"vitess.io/vitess/go/sqltypes"
|
||||
"vitess.io/vitess/go/vt/concurrency"
|
||||
"vitess.io/vitess/go/vt/discovery"
|
||||
"vitess.io/vitess/go/vt/grpcclient"
|
||||
"vitess.io/vitess/go/vt/key"
|
||||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
|
@ -37,6 +39,7 @@ import (
|
|||
"vitess.io/vitess/go/vt/topo"
|
||||
"vitess.io/vitess/go/vt/vterrors"
|
||||
"vitess.io/vitess/go/vt/vtgate/engine"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletconn"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
|
||||
)
|
||||
|
||||
|
@ -62,9 +65,12 @@ type tableDiffer struct {
|
|||
}
|
||||
|
||||
type dfParams struct {
|
||||
master *topo.TabletInfo
|
||||
tablet *topodatapb.Tablet
|
||||
position mysql.Position
|
||||
master *topo.TabletInfo
|
||||
tablet *topodatapb.Tablet
|
||||
position mysql.Position
|
||||
snapshotPosition string
|
||||
result chan *sqltypes.Result
|
||||
err error
|
||||
}
|
||||
|
||||
// VDiff reports differences between the sources and targets of a vreplication workflow.
|
||||
|
@ -299,7 +305,7 @@ func (df *vdiff) selectTablets(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
df.sources[shard].tablet = tablet
|
||||
source.tablet = tablet
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
@ -318,7 +324,7 @@ func (df *vdiff) selectTablets(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
df.targets[shard].tablet = tablet
|
||||
target.tablet = tablet
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
@ -330,6 +336,76 @@ func (df *vdiff) selectTablets(ctx context.Context) error {
|
|||
return err2
|
||||
}
|
||||
|
||||
func (df *vdiff) streamFromSources(ctx context.Context, td *tableDiffer) error {
|
||||
err := df.forAll(df.sources, func(shard string, source *dfParams) error {
|
||||
// Iteration for each source.
|
||||
if err := df.wr.tmc.WaitForPosition(ctx, source.tablet, mysql.EncodePosition(source.position)); err != nil {
|
||||
return err
|
||||
}
|
||||
source.result = make(chan *sqltypes.Result, 1)
|
||||
gtidch := make(chan string, 1)
|
||||
|
||||
// Start the stream in a separate goroutine.
|
||||
go df.streamOneSource(ctx, shard, source, td, gtidch)
|
||||
|
||||
// Wait for the gtid to be sent. If it's not received, there was an error
|
||||
// which would be stored in source.err.
|
||||
gtid, ok := <-gtidch
|
||||
if !ok {
|
||||
return source.err
|
||||
}
|
||||
// Save the new position, as of when the query executed.
|
||||
source.snapshotPosition = gtid
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// streamOneSource is called as a goroutine, and communicates its results through channels.
|
||||
// It first sends the snapshot gtid to gtidch.
|
||||
// Then it streams results to source.result.
|
||||
// Before returning, it sets source.err, and closes all channels.
|
||||
// If any channel is closed, then source.err can be checked if there was an error.
|
||||
func (df *vdiff) streamOneSource(ctx context.Context, shard string, source *dfParams, td *tableDiffer, gtidch chan string) {
|
||||
defer close(source.result)
|
||||
defer close(gtidch)
|
||||
|
||||
// Wrap the streaming in a separate function so we can capture the error.
|
||||
// This shows that the error will be set before the channels are closed.
|
||||
source.err = func() error {
|
||||
conn, err := tabletconn.GetDialer()(source.tablet, grpcclient.FailFast(false))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
||||
target := &querypb.Target{
|
||||
Keyspace: df.sourceKeyspace,
|
||||
Shard: shard,
|
||||
TabletType: source.tablet.Type,
|
||||
}
|
||||
var fields []*querypb.Field
|
||||
err = conn.VStreamResults(ctx, target, td.sourceExpression, func(vrs *binlogdatapb.VStreamResultsResponse) error {
|
||||
if vrs.Fields != nil {
|
||||
fields = vrs.Fields
|
||||
gtidch <- vrs.Gtid
|
||||
}
|
||||
p3qr := &querypb.QueryResult{
|
||||
Fields: fields,
|
||||
Rows: vrs.Rows,
|
||||
}
|
||||
result := sqltypes.Proto3ToResult(p3qr)
|
||||
select {
|
||||
case source.result <- result:
|
||||
case <-ctx.Done():
|
||||
return io.EOF
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}()
|
||||
}
|
||||
|
||||
func (df *vdiff) forAll(participants map[string]*dfParams, f func(string, *dfParams) error) error {
|
||||
var wg sync.WaitGroup
|
||||
allErrors := &concurrency.AllErrorRecorder{}
|
||||
|
|
Загрузка…
Ссылка в новой задаче