зеркало из https://github.com/github/vitess-gh.git
VStream API: Use tablet picker to select tablet to stream from. Subscribe to tablet health to handle reparenting and unhealthy transitions
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Родитель
0e4974a192
Коммит
a919b2fe40
|
@ -0,0 +1,50 @@
|
|||
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"
|
||||
|
||||
name: Cluster (vstream_failover)
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
|
||||
build:
|
||||
name: Run endtoend tests on Cluster (vstream_failover)
|
||||
runs-on: ubuntu-18.04
|
||||
|
||||
steps:
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v1
|
||||
with:
|
||||
go-version: 1.16
|
||||
|
||||
- name: Tune the OS
|
||||
run: |
|
||||
echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range
|
||||
|
||||
# TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185
|
||||
- name: Add the current IP address, long hostname and short hostname record to /etc/hosts file
|
||||
run: |
|
||||
echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
|
||||
# DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADRESSED!
|
||||
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Get dependencies
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
|
||||
sudo service mysql stop
|
||||
sudo service etcd stop
|
||||
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
|
||||
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
|
||||
go mod download
|
||||
|
||||
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
|
||||
sudo apt-get install -y gnupg2
|
||||
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
|
||||
sudo apt-get update
|
||||
sudo apt-get install percona-xtrabackup-24
|
||||
|
||||
- name: Run cluster endtoend test
|
||||
timeout-minutes: 30
|
||||
run: |
|
||||
source build.env
|
||||
eatmydata -- go run test.go -docker=false -print-log -follow -shard vstream_failover
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
Copyright 2021 The Vitess Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package vreplication
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"vitess.io/vitess/go/vt/log"
|
||||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
|
||||
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
|
||||
"vitess.io/vitess/go/vt/vtgate/evalengine"
|
||||
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
|
||||
|
||||
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
|
||||
)
|
||||
|
||||
// Validates that a reparent while VStream API is streaming doesn't miss any events
|
||||
// We stream only from the primary and while streaming we reparent to a replica and then back to the original primary
|
||||
func TestVStreamFailover(t *testing.T) {
|
||||
defaultCellName := "zone1"
|
||||
cells := []string{"zone1"}
|
||||
allCellNames = "zone1"
|
||||
vc = NewVitessCluster(t, "TestVStreamFailover", cells, mainClusterConfig)
|
||||
|
||||
require.NotNil(t, vc)
|
||||
defaultReplicas = 2
|
||||
defaultRdonly = 0
|
||||
defer vc.TearDown(t)
|
||||
|
||||
defaultCell = vc.Cells[defaultCellName]
|
||||
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100)
|
||||
vtgate = defaultCell.Vtgates[0]
|
||||
require.NotNil(t, vtgate)
|
||||
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 3)
|
||||
vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
|
||||
defer vtgateConn.Close()
|
||||
|
||||
verifyClusterHealth(t, vc)
|
||||
insertInitialData(t)
|
||||
ctx := context.Background()
|
||||
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer vstreamConn.Close()
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: "product",
|
||||
Shard: "0",
|
||||
Gtid: "",
|
||||
}}}
|
||||
|
||||
filter := &binlogdatapb.Filter{
|
||||
Rules: []*binlogdatapb.Rule{{
|
||||
Match: "customer",
|
||||
Filter: "select * from customer",
|
||||
}},
|
||||
}
|
||||
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
|
||||
done := false
|
||||
|
||||
// don't insert while PRS is going on
|
||||
var insertMu sync.Mutex
|
||||
stopInserting := false
|
||||
id := 0
|
||||
|
||||
// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
|
||||
go func() {
|
||||
for {
|
||||
if stopInserting {
|
||||
return
|
||||
}
|
||||
insertMu.Lock()
|
||||
id++
|
||||
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
|
||||
insertMu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
// stream events from the VStream API
|
||||
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags)
|
||||
require.NoError(t, err)
|
||||
var numRowEvents int64
|
||||
// second goroutine that continuously receives events via VStream API and should be resilient to the two PRS events
|
||||
go func() {
|
||||
for {
|
||||
evs, err := reader.Recv()
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
for _, ev := range evs {
|
||||
if ev.Type == binlogdatapb.VEventType_ROW {
|
||||
numRowEvents++
|
||||
}
|
||||
}
|
||||
case io.EOF:
|
||||
log.Infof("Stream Ended")
|
||||
default:
|
||||
log.Infof("%s:: remote error: %v", time.Now(), err)
|
||||
}
|
||||
|
||||
if done {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// run two PRS after one second each, wait for events to be received and exit test
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
tickCount := 0
|
||||
// this for loop implements a mini state machine that does the two PRSs, waits a bit after the second PRS,
|
||||
// stops the insertions, waits for a bit again for the vstream to catchup and signals the test to stop
|
||||
for {
|
||||
<-ticker.C
|
||||
tickCount++
|
||||
switch tickCount {
|
||||
case 1:
|
||||
insertMu.Lock()
|
||||
output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_master=zone1-101")
|
||||
insertMu.Unlock()
|
||||
log.Infof("output of first PRS is %s", output)
|
||||
require.NoError(t, err)
|
||||
case 2:
|
||||
insertMu.Lock()
|
||||
output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_master=zone1-100")
|
||||
insertMu.Unlock()
|
||||
log.Infof("output of second PRS is %s", output)
|
||||
require.NoError(t, err)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
stopInserting = true
|
||||
time.Sleep(2 * time.Second)
|
||||
done = true
|
||||
}
|
||||
|
||||
if done {
|
||||
break
|
||||
}
|
||||
}
|
||||
qr := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer")
|
||||
require.NotNil(t, qr)
|
||||
// total number of row events found by the VStream API should match the rows inserted
|
||||
insertedRows, err := evalengine.ToInt64(qr.Rows[0][0])
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, insertedRows, numRowEvents)
|
||||
}
|
|
@ -181,6 +181,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
|
|||
defer cancel()
|
||||
si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard)
|
||||
if err != nil {
|
||||
log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error())
|
||||
return nil
|
||||
}
|
||||
aliases = append(aliases, si.MasterAlias)
|
||||
|
|
|
@ -228,6 +228,7 @@ func NewTabletInfo(tablet *topodatapb.Tablet, version Version) *TabletInfo {
|
|||
func (ts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*TabletInfo, error) {
|
||||
conn, err := ts.ConnForCell(ctx, alias.Cell)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to get connection for cell %s", alias.Cell)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -238,6 +239,7 @@ func (ts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias)
|
|||
tabletPath := path.Join(TabletsPath, topoproto.TabletAliasString(alias), TabletFile)
|
||||
data, version, err := conn.Get(ctx, tabletPath)
|
||||
if err != nil {
|
||||
log.Errorf("unable to connect to tablet %s: %s", alias, err)
|
||||
return nil, err
|
||||
}
|
||||
tablet := &topodatapb.Tablet{}
|
||||
|
|
|
@ -243,6 +243,11 @@ func (dg *DiscoveryGateway) withRetry(ctx context.Context, target *querypb.Targe
|
|||
var err error
|
||||
invalidTablets := make(map[string]bool)
|
||||
|
||||
if target == nil {
|
||||
err = fmt.Errorf("withRetry called with nil target")
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
if len(discovery.AllowedTabletTypes) > 0 {
|
||||
var match bool
|
||||
for _, allowed := range discovery.AllowedTabletTypes {
|
||||
|
|
|
@ -23,11 +23,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"vitess.io/vitess/go/vt/discovery"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
"vitess.io/vitess/go/vt/topo"
|
||||
|
||||
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"vitess.io/vitess/go/vt/key"
|
||||
"vitess.io/vitess/go/vt/log"
|
||||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
|
@ -92,6 +95,7 @@ type vstream struct {
|
|||
|
||||
eventCh chan []*binlogdatapb.VEvent
|
||||
heartbeatInterval uint32
|
||||
ts *topo.Server
|
||||
}
|
||||
|
||||
type journalEvent struct {
|
||||
|
@ -114,6 +118,14 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ts, err := vsm.toposerv.GetTopoServer()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ts == nil {
|
||||
log.Errorf("unable to get topo server in VStream()")
|
||||
return fmt.Errorf("unable to get topo server")
|
||||
}
|
||||
vs := &vstream{
|
||||
vgtid: vgtid,
|
||||
tabletType: tabletType,
|
||||
|
@ -127,6 +139,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
|
|||
vsm: vsm,
|
||||
eventCh: make(chan []*binlogdatapb.VEvent),
|
||||
heartbeatInterval: flags.GetHeartbeatInterval(),
|
||||
ts: ts,
|
||||
}
|
||||
return vs.stream(ctx)
|
||||
}
|
||||
|
@ -286,6 +299,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
|
|||
|
||||
// Set the error on exit. First one wins.
|
||||
if err != nil {
|
||||
log.Errorf("Error in vstream for %+v: %s", sgtid, err)
|
||||
vs.once.Do(func() {
|
||||
vs.err = err
|
||||
vs.cancel()
|
||||
|
@ -404,25 +418,66 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
|
|||
|
||||
var eventss [][]*binlogdatapb.VEvent
|
||||
var err error
|
||||
rss := vs.rss
|
||||
if vs.resolver != nil {
|
||||
rss, err = vs.resolver.ResolveDestination(ctx, sgtid.Keyspace, vs.tabletType, key.DestinationShard(sgtid.Shard))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tp, err := discovery.NewTabletPicker(vs.ts, []string{vs.vsm.cell}, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String())
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
if len(rss) != 1 {
|
||||
// Unreachable.
|
||||
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss)
|
||||
tablet, err := tp.PickForStreaming(ctx)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String())
|
||||
target := &querypb.Target{
|
||||
Keyspace: sgtid.Keyspace,
|
||||
Shard: sgtid.Shard,
|
||||
TabletType: vs.tabletType,
|
||||
Cell: vs.vsm.cell,
|
||||
}
|
||||
tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(tablet.Alias, target)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
errCh := make(chan string, 1)
|
||||
go func() {
|
||||
tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
|
||||
var errString string
|
||||
if ctx.Err() != nil {
|
||||
return fmt.Errorf("context has ended")
|
||||
}
|
||||
if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
|
||||
return fmt.Errorf("health check failed")
|
||||
}
|
||||
if vs.tabletType != shr.Target.TabletType {
|
||||
errString = fmt.Sprintf("tablet %s is no longer healthy: %s, restarting vstream",
|
||||
tablet.Alias, shr.RealtimeStats.HealthError)
|
||||
} else if shr.RealtimeStats.HealthError != "" {
|
||||
errString = fmt.Sprintf("tablet type has changed from %s to %s, restarting vstream",
|
||||
vs.tabletType, shr.Target.TabletType)
|
||||
}
|
||||
if errString != "" {
|
||||
errCh <- errString
|
||||
return fmt.Errorf(errString)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
log.Infof("Starting to vstream from %s", tablet.Alias.String())
|
||||
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
|
||||
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
|
||||
err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
|
||||
// We received a valid event. Reset error count.
|
||||
errCount = 0
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case streamErr := <-errCh:
|
||||
log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
|
||||
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr)
|
||||
case <-journalDone:
|
||||
// Unreachable.
|
||||
// This can happen if a server misbehaves and does not end
|
||||
|
|
|
@ -23,6 +23,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"vitess.io/vitess/go/vt/topo"
|
||||
|
||||
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
|
||||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
|
||||
"vitess.io/vitess/go/vt/vterrors"
|
||||
|
@ -45,29 +47,6 @@ import (
|
|||
|
||||
var mu sync.Mutex
|
||||
|
||||
func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
var vevents []*binlogdatapb.VEvent
|
||||
var i int64
|
||||
currentTime := time.Now().Unix()
|
||||
for i = count; i > 0; i-- {
|
||||
j := i + idx
|
||||
vevents = append(vevents, &binlogdatapb.VEvent{
|
||||
Type: binlogdatapb.VEventType_GTID, Gtid: fmt.Sprintf("gtid-%s-%d", shard, j),
|
||||
Timestamp: currentTime - j,
|
||||
CurrentTime: currentTime * 1e9,
|
||||
})
|
||||
|
||||
vevents = append(vevents, &binlogdatapb.VEvent{
|
||||
Type: binlogdatapb.VEventType_COMMIT,
|
||||
Timestamp: currentTime - j,
|
||||
CurrentTime: currentTime * 1e9,
|
||||
})
|
||||
}
|
||||
return vevents
|
||||
}
|
||||
|
||||
func TestVStreamSkew(t *testing.T) {
|
||||
stream := func(conn *sandboxconn.SandboxConn, shard string, count, idx int64) {
|
||||
vevents := getVEvents(shard, count, idx)
|
||||
|
@ -94,33 +73,36 @@ func TestVStreamSkew(t *testing.T) {
|
|||
previousDelays := int64(0)
|
||||
vstreamSkewDelayCount = stats.NewCounter("VStreamEventsDelayedBySkewAlignment",
|
||||
"Number of events that had to wait because the skew across shards was too high")
|
||||
|
||||
cell := "aa"
|
||||
for idx, tcase := range tcases {
|
||||
t.Run("", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := fmt.Sprintf("TestVStreamSkew-%d", idx)
|
||||
_ = createSandbox(name)
|
||||
ks := fmt.Sprintf("TestVStreamSkew-%d", idx)
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
shard0 := "-20"
|
||||
shard1 := "20-40"
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
|
||||
vsm := newTestVStreamManager(hc, st, cell)
|
||||
vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}}
|
||||
want := int64(0)
|
||||
var sbc0, sbc1 *sandboxconn.SandboxConn
|
||||
if tcase.shard0idx != 0 {
|
||||
sbc0 = hc.AddTestTablet("aa", "1.1.1.1", 1001, name, shard0, topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc0 = hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
sbc0.VStreamCh = make(chan *binlogdatapb.VEvent)
|
||||
want += 2 * tcase.numEventsPerShard
|
||||
vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: name, Gtid: "pos", Shard: "-20"})
|
||||
go stream(sbc0, shard0, tcase.numEventsPerShard, tcase.shard0idx)
|
||||
vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "-20"})
|
||||
go stream(sbc0, "-20", tcase.numEventsPerShard, tcase.shard0idx)
|
||||
}
|
||||
if tcase.shard1idx != 0 {
|
||||
sbc1 = hc.AddTestTablet("aa", "1.1.1.1", 1002, name, shard1, topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc1 = hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())
|
||||
sbc1.VStreamCh = make(chan *binlogdatapb.VEvent)
|
||||
want += 2 * tcase.numEventsPerShard
|
||||
vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: name, Gtid: "pos", Shard: "20-40"})
|
||||
go stream(sbc1, shard1, tcase.numEventsPerShard, tcase.shard1idx)
|
||||
vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "20-40"})
|
||||
go stream(sbc1, "20-40", tcase.numEventsPerShard, tcase.shard1idx)
|
||||
}
|
||||
ch := startVStream(ctx, t, vsm, vgtid, &vtgatepb.VStreamFlags{MinimizeSkew: true})
|
||||
var receivedEvents []*binlogdatapb.VEvent
|
||||
|
@ -142,12 +124,15 @@ func TestVStreamSkew(t *testing.T) {
|
|||
func TestVStreamEvents(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
cell := "aa"
|
||||
ks := "TestVStream"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
|
||||
|
||||
vsm := newTestVStreamManager(hc, st, cell)
|
||||
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
|
||||
send1 := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
|
||||
|
@ -158,7 +143,7 @@ func TestVStreamEvents(t *testing.T) {
|
|||
want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid01",
|
||||
}},
|
||||
|
@ -176,7 +161,7 @@ func TestVStreamEvents(t *testing.T) {
|
|||
want2 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid02",
|
||||
}},
|
||||
|
@ -187,7 +172,7 @@ func TestVStreamEvents(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -217,12 +202,16 @@ func TestVStreamChunks(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
ks := "TestVStream"
|
||||
cell := "aa"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
|
||||
vsm := newTestVStreamManager(hc, st, cell)
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_DDL}}, nil)
|
||||
|
@ -236,11 +225,11 @@ func TestVStreamChunks(t *testing.T) {
|
|||
ddlCount := 0
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "20-40",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -286,13 +275,16 @@ func TestVStreamChunks(t *testing.T) {
|
|||
func TestVStreamMulti(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
cell := "aa"
|
||||
ks := "TestVStream"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
|
||||
vsm := newTestVStreamManager(hc, st, "aa")
|
||||
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())
|
||||
|
||||
send0 := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
|
||||
|
@ -308,11 +300,11 @@ func TestVStreamMulti(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "20-40",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -328,11 +320,11 @@ func TestVStreamMulti(t *testing.T) {
|
|||
}
|
||||
want := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid01",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "20-40",
|
||||
Gtid: "gtid02",
|
||||
}},
|
||||
|
@ -346,11 +338,15 @@ func TestVStreamRetry(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
cell := "aa"
|
||||
ks := "TestVStream"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
|
||||
vsm := newTestVStreamManager(hc, st, "aa")
|
||||
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
commit := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_COMMIT},
|
||||
}
|
||||
|
@ -363,7 +359,7 @@ func TestVStreamRetry(t *testing.T) {
|
|||
count := 0
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -383,12 +379,14 @@ func TestVStreamRetry(t *testing.T) {
|
|||
func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
cell := "aa"
|
||||
ks := "TestVStream"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
|
||||
vsm := newTestVStreamManager(hc, st, cell)
|
||||
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
|
||||
send0 := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_HEARTBEAT},
|
||||
|
@ -406,7 +404,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
|
|||
want := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid01",
|
||||
}},
|
||||
|
@ -419,7 +417,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -431,14 +429,18 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
|
|||
func TestVStreamJournalOneToMany(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
cell := "aa"
|
||||
ks := "TestVStream"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "-10", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, name, "10-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
|
||||
vsm := newTestVStreamManager(hc, st, "aa")
|
||||
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet())
|
||||
sbc2 := hc.AddTestTablet(cell, "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "10-20", sbc2.Tablet())
|
||||
|
||||
send1 := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
|
||||
|
@ -449,7 +451,7 @@ func TestVStreamJournalOneToMany(t *testing.T) {
|
|||
want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid01",
|
||||
}},
|
||||
|
@ -465,16 +467,16 @@ func TestVStreamJournalOneToMany(t *testing.T) {
|
|||
Id: 1,
|
||||
MigrationType: binlogdatapb.MigrationType_SHARDS,
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-10",
|
||||
Gtid: "pos10",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
Gtid: "pos1020",
|
||||
}},
|
||||
Participants: []*binlogdatapb.KeyspaceShard{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
}},
|
||||
}},
|
||||
|
@ -501,7 +503,7 @@ func TestVStreamJournalOneToMany(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -517,11 +519,11 @@ func TestVStreamJournalOneToMany(t *testing.T) {
|
|||
Type: binlogdatapb.VEventType_VGTID,
|
||||
Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-10",
|
||||
Gtid: "gtid03",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
Gtid: "gtid04",
|
||||
}},
|
||||
|
@ -537,13 +539,18 @@ func TestVStreamJournalManyToOne(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// Variable names are maintained like in OneToMany, but order is different.
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
ks := "TestVStream"
|
||||
cell := "aa"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "-10", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, name, "10-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
|
||||
vsm := newTestVStreamManager(hc, st, cell)
|
||||
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet())
|
||||
sbc2 := hc.AddTestTablet(cell, "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "10-20", sbc2.Tablet())
|
||||
|
||||
send3 := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid03"},
|
||||
|
@ -566,15 +573,15 @@ func TestVStreamJournalManyToOne(t *testing.T) {
|
|||
Id: 1,
|
||||
MigrationType: binlogdatapb.MigrationType_SHARDS,
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos20",
|
||||
}},
|
||||
Participants: []*binlogdatapb.KeyspaceShard{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-10",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
}},
|
||||
}},
|
||||
|
@ -594,7 +601,7 @@ func TestVStreamJournalManyToOne(t *testing.T) {
|
|||
want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid01",
|
||||
}},
|
||||
|
@ -608,11 +615,11 @@ func TestVStreamJournalManyToOne(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-10",
|
||||
Gtid: "pos10",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
Gtid: "pos1020",
|
||||
}},
|
||||
|
@ -626,11 +633,11 @@ func TestVStreamJournalManyToOne(t *testing.T) {
|
|||
Type: binlogdatapb.VEventType_VGTID,
|
||||
Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-10",
|
||||
Gtid: "gtid03",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
Gtid: "gtid04",
|
||||
}},
|
||||
|
@ -646,11 +653,14 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
ks := "TestVStream"
|
||||
cell := "aa"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
|
||||
vsm := newTestVStreamManager(hc, st, "aa")
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
|
||||
send1 := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
|
||||
|
@ -661,7 +671,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid01",
|
||||
}},
|
||||
|
@ -683,7 +693,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
wantjn1 := &binlogdata.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "jn1",
|
||||
}},
|
||||
|
@ -699,7 +709,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
want2 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid02",
|
||||
}},
|
||||
|
@ -713,15 +723,15 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
Id: 2,
|
||||
MigrationType: binlogdatapb.MigrationType_SHARDS,
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "c0-",
|
||||
Gtid: "posc0",
|
||||
}},
|
||||
Participants: []*binlogdatapb.KeyspaceShard{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "c0-e0",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "e0-",
|
||||
}},
|
||||
}},
|
||||
|
@ -731,7 +741,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
wantjn2 := &binlogdata.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "jn2",
|
||||
}},
|
||||
|
@ -747,7 +757,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
want3 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "gtid03",
|
||||
}},
|
||||
|
@ -758,7 +768,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -772,27 +782,31 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
// Variable names are maintained like in OneToMany, but order is different.1
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
ks := "TestVStream"
|
||||
cell := "aa"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
_ = hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "-10", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, name, "10-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
|
||||
vsm := newTestVStreamManager(hc, st, "aa")
|
||||
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet())
|
||||
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "10-20", sbc2.Tablet())
|
||||
|
||||
send := []*binlogdatapb.VEvent{
|
||||
{Type: binlogdatapb.VEventType_JOURNAL, Journal: &binlogdatapb.Journal{
|
||||
Id: 1,
|
||||
MigrationType: binlogdatapb.MigrationType_SHARDS,
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-30",
|
||||
Gtid: "pos1040",
|
||||
}},
|
||||
Participants: []*binlogdatapb.KeyspaceShard{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "20-30",
|
||||
}},
|
||||
}},
|
||||
|
@ -801,11 +815,11 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
|
|||
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-10",
|
||||
Gtid: "pos10",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
Gtid: "pos1020",
|
||||
}},
|
||||
|
@ -825,15 +839,15 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
|
|||
Id: 1,
|
||||
MigrationType: binlogdatapb.MigrationType_SHARDS,
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-30",
|
||||
Gtid: "pos1040",
|
||||
}},
|
||||
Participants: []*binlogdatapb.KeyspaceShard{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "20-30",
|
||||
}, {
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "10-20",
|
||||
}},
|
||||
}},
|
||||
|
@ -994,14 +1008,17 @@ func TestResolveVStreamParams(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestVStreamIdleHeartbeat(t *testing.T) {
|
||||
name := "TestVStream"
|
||||
_ = createSandbox(name)
|
||||
cell := "aa"
|
||||
ks := "TestVStream"
|
||||
_ = createSandbox(ks)
|
||||
hc := discovery.NewFakeHealthCheck()
|
||||
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
|
||||
hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
|
||||
vsm := newTestVStreamManager(hc, st, cell)
|
||||
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil)
|
||||
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
|
||||
vgtid := &binlogdatapb.VGtid{
|
||||
ShardGtids: []*binlogdatapb.ShardGtid{{
|
||||
Keyspace: name,
|
||||
Keyspace: ks,
|
||||
Shard: "-20",
|
||||
Gtid: "pos",
|
||||
}},
|
||||
|
@ -1069,6 +1086,7 @@ func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants .
|
|||
t.Helper()
|
||||
for i, want := range wants {
|
||||
got := <-ch
|
||||
require.NotNil(t, got)
|
||||
for _, event := range got.Events {
|
||||
event.Timestamp = 0
|
||||
}
|
||||
|
@ -1077,3 +1095,47 @@ func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants .
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
var vevents []*binlogdatapb.VEvent
|
||||
var i int64
|
||||
currentTime := time.Now().Unix()
|
||||
for i = count; i > 0; i-- {
|
||||
j := i + idx
|
||||
vevents = append(vevents, &binlogdatapb.VEvent{
|
||||
Type: binlogdatapb.VEventType_GTID, Gtid: fmt.Sprintf("gtid-%s-%d", shard, j),
|
||||
Timestamp: currentTime - j,
|
||||
CurrentTime: currentTime * 1e9,
|
||||
})
|
||||
|
||||
vevents = append(vevents, &binlogdatapb.VEvent{
|
||||
Type: binlogdatapb.VEventType_COMMIT,
|
||||
Timestamp: currentTime - j,
|
||||
CurrentTime: currentTime * 1e9,
|
||||
})
|
||||
}
|
||||
return vevents
|
||||
}
|
||||
|
||||
func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards []string) *sandboxTopo {
|
||||
st := newSandboxForCells([]string{cell})
|
||||
ts := st.topoServer
|
||||
ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{})
|
||||
ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
|
||||
for _, shard := range shards {
|
||||
ts.CreateShard(ctx, keyspace, shard)
|
||||
}
|
||||
return st
|
||||
}
|
||||
|
||||
func addTabletToSandboxTopo(t *testing.T, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) {
|
||||
_, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
|
||||
si.MasterAlias = tablet.Alias
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = st.topoServer.CreateTablet(ctx, tablet)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ var (
|
|||
"vreplication_basic",
|
||||
"vreplication_multicell",
|
||||
"vreplication_cellalias",
|
||||
"vstream_failover",
|
||||
"vreplication_v2",
|
||||
"onlineddl_ghost",
|
||||
"onlineddl_vrepl",
|
||||
|
|
|
@ -875,7 +875,7 @@
|
|||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vreplication_multicell",
|
||||
"RetryMax": 0,
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vreplication_materialize": {
|
||||
|
@ -884,7 +884,7 @@
|
|||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vreplication_multicell",
|
||||
"RetryMax": 0,
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vreplication_cellalias": {
|
||||
|
@ -893,7 +893,7 @@
|
|||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vreplication_cellalias",
|
||||
"RetryMax": 0,
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vreplication_basic": {
|
||||
|
@ -902,7 +902,16 @@
|
|||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vreplication_basic",
|
||||
"RetryMax": 0,
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vstream_failover": {
|
||||
"File": "unused.go",
|
||||
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "VStreamFailover"],
|
||||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vstream_failover",
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vtorc": {
|
||||
|
@ -929,7 +938,7 @@
|
|||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vreplication_v2",
|
||||
"RetryMax": 0,
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vreplication_migrate": {
|
||||
|
@ -938,7 +947,16 @@
|
|||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vreplication_migrate",
|
||||
"RetryMax": 0,
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
},
|
||||
"vstream_failover": {
|
||||
"File": "unused.go",
|
||||
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVStreamFailover"],
|
||||
"Command": [],
|
||||
"Manual": false,
|
||||
"Shard": "vstream_failover",
|
||||
"RetryMax": 1,
|
||||
"Tags": []
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче