From a919b2fe400aa7d4f299bc73af1ee193c780a067 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 10 Jul 2021 14:02:04 +0200 Subject: [PATCH] VStream API: Use tablet picker to select tablet to stream from. Subscribe to tablet health to handle reparenting and unhealthy transitions Signed-off-by: Rohit Nayak --- .../cluster_endtoend_vstream_failover.yml | 50 +++ go/test/endtoend/vreplication/vstream_test.go | 168 +++++++++ go/vt/discovery/tablet_picker.go | 1 + go/vt/topo/tablet.go | 2 + go/vt/vtgate/discoverygateway.go | 5 + go/vt/vtgate/vstream_manager.go | 77 ++++- go/vt/vtgate/vstream_manager_test.go | 322 +++++++++++------- test/ci_workflow_gen.go | 1 + test/config.json | 30 +- 9 files changed, 509 insertions(+), 147 deletions(-) create mode 100644 .github/workflows/cluster_endtoend_vstream_failover.yml create mode 100644 go/test/endtoend/vreplication/vstream_test.go diff --git a/.github/workflows/cluster_endtoend_vstream_failover.yml b/.github/workflows/cluster_endtoend_vstream_failover.yml new file mode 100644 index 0000000000..45c43fb1b8 --- /dev/null +++ b/.github/workflows/cluster_endtoend_vstream_failover.yml @@ -0,0 +1,50 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (vstream_failover) +on: [push, pull_request] +jobs: + + build: + name: Run endtoend tests on Cluster (vstream_failover) + runs-on: ubuntu-18.04 + + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.16 + + - name: Tune the OS + run: | + echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range + + # TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185 + - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file + run: | + echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts + # DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADRESSED! + + - name: Check out code + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get install -y gnupg2 + sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get update + sudo apt-get install percona-xtrabackup-24 + + - name: Run cluster endtoend test + timeout-minutes: 30 + run: | + source build.env + eatmydata -- go run test.go -docker=false -print-log -follow -shard vstream_failover diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go new file mode 100644 index 0000000000..a0d7c9323b --- /dev/null +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -0,0 +1,168 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "fmt" + "io" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + _ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient" + "vitess.io/vitess/go/vt/vtgate/evalengine" + _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" + + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +// Validates that a reparent while VStream API is streaming doesn't miss any events +// We stream only from the primary and while streaming we reparent to a replica and then back to the original primary +func TestVStreamFailover(t *testing.T) { + defaultCellName := "zone1" + cells := []string{"zone1"} + allCellNames = "zone1" + vc = NewVitessCluster(t, "TestVStreamFailover", cells, mainClusterConfig) + + require.NotNil(t, vc) + defaultReplicas = 2 + defaultRdonly = 0 + defer vc.TearDown(t) + + defaultCell = vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) + vtgate = defaultCell.Vtgates[0] + require.NotNil(t, vtgate) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 3) + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + + verifyClusterHealth(t, vc) + insertInitialData(t) + ctx := context.Background() + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + if err != nil { + log.Fatal(err) + } + defer vstreamConn.Close() + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "product", + Shard: "0", + Gtid: "", + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "customer", + Filter: "select * from customer", + }}, + } + flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600} + done := false + + // don't insert while PRS is going on + var insertMu sync.Mutex + stopInserting := false + id := 0 + + // first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS + go func() { + for { + if stopInserting { + return + } + insertMu.Lock() + id++ + execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) + insertMu.Unlock() + } + }() + + // stream events from the VStream API + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter, flags) + require.NoError(t, err) + var numRowEvents int64 + // second goroutine that continuously receives events via VStream API and should be resilient to the two PRS events + go func() { + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + if ev.Type == binlogdatapb.VEventType_ROW { + numRowEvents++ + } + } + case io.EOF: + log.Infof("Stream Ended") + default: + log.Infof("%s:: remote error: %v", time.Now(), err) + } + + if done { + return + } + } + }() + + // run two PRS after one second each, wait for events to be received and exit test + ticker := time.NewTicker(1 * time.Second) + tickCount := 0 + // this for loop implements a mini state machine that does the two PRSs, waits a bit after the second PRS, + // stops the insertions, waits for a bit again for the vstream to catchup and signals the test to stop + for { + <-ticker.C + tickCount++ + switch tickCount { + case 1: + insertMu.Lock() + output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_master=zone1-101") + insertMu.Unlock() + log.Infof("output of first PRS is %s", output) + require.NoError(t, err) + case 2: + insertMu.Lock() + output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_master=zone1-100") + insertMu.Unlock() + log.Infof("output of second PRS is %s", output) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + stopInserting = true + time.Sleep(2 * time.Second) + done = true + } + + if done { + break + } + } + qr := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer") + require.NotNil(t, qr) + // total number of row events found by the VStream API should match the rows inserted + insertedRows, err := evalengine.ToInt64(qr.Rows[0][0]) + require.NoError(t, err) + require.Equal(t, insertedRows, numRowEvents) +} diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index bb5bf3005d..438108bc6e 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -181,6 +181,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn defer cancel() si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { + log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) return nil } aliases = append(aliases, si.MasterAlias) diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index eb9ef29074..3912a59625 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -228,6 +228,7 @@ func NewTabletInfo(tablet *topodatapb.Tablet, version Version) *TabletInfo { func (ts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*TabletInfo, error) { conn, err := ts.ConnForCell(ctx, alias.Cell) if err != nil { + log.Errorf("Unable to get connection for cell %s", alias.Cell) return nil, err } @@ -238,6 +239,7 @@ func (ts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) tabletPath := path.Join(TabletsPath, topoproto.TabletAliasString(alias), TabletFile) data, version, err := conn.Get(ctx, tabletPath) if err != nil { + log.Errorf("unable to connect to tablet %s: %s", alias, err) return nil, err } tablet := &topodatapb.Tablet{} diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index c71620a992..bde9683015 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -243,6 +243,11 @@ func (dg *DiscoveryGateway) withRetry(ctx context.Context, target *querypb.Targe var err error invalidTablets := make(map[string]bool) + if target == nil { + err = fmt.Errorf("withRetry called with nil target") + log.Errorf(err.Error()) + return err + } if len(discovery.AllowedTabletTypes) > 0 { var match bool for _, allowed := range discovery.AllowedTabletTypes { diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 9c1115018f..c53cc70328 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -23,11 +23,14 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/topo" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -92,6 +95,7 @@ type vstream struct { eventCh chan []*binlogdatapb.VEvent heartbeatInterval uint32 + ts *topo.Server } type journalEvent struct { @@ -114,6 +118,14 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta if err != nil { return err } + ts, err := vsm.toposerv.GetTopoServer() + if err != nil { + return err + } + if ts == nil { + log.Errorf("unable to get topo server in VStream()") + return fmt.Errorf("unable to get topo server") + } vs := &vstream{ vgtid: vgtid, tabletType: tabletType, @@ -127,6 +139,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta vsm: vsm, eventCh: make(chan []*binlogdatapb.VEvent), heartbeatInterval: flags.GetHeartbeatInterval(), + ts: ts, } return vs.stream(ctx) } @@ -286,6 +299,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard // Set the error on exit. First one wins. if err != nil { + log.Errorf("Error in vstream for %+v: %s", sgtid, err) vs.once.Do(func() { vs.err = err vs.cancel() @@ -404,25 +418,66 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error - rss := vs.rss - if vs.resolver != nil { - rss, err = vs.resolver.ResolveDestination(ctx, sgtid.Keyspace, vs.tabletType, key.DestinationShard(sgtid.Shard)) - if err != nil { - return err - } + tp, err := discovery.NewTabletPicker(vs.ts, []string{vs.vsm.cell}, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + if err != nil { + log.Errorf(err.Error()) + return err } - if len(rss) != 1 { - // Unreachable. - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss) + tablet, err := tp.PickForStreaming(ctx) + if err != nil { + log.Errorf(err.Error()) + return err } + log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + target := &querypb.Target{ + Keyspace: sgtid.Keyspace, + Shard: sgtid.Shard, + TabletType: vs.tabletType, + Cell: vs.vsm.cell, + } + tabletConn, err := vs.vsm.resolver.GetGateway().QueryServiceByAlias(tablet.Alias, target) + if err != nil { + log.Errorf(err.Error()) + return err + } + + errCh := make(chan string, 1) + go func() { + tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { + var errString string + if ctx.Err() != nil { + return fmt.Errorf("context has ended") + } + if shr == nil || shr.RealtimeStats == nil || shr.Target == nil { + return fmt.Errorf("health check failed") + } + if vs.tabletType != shr.Target.TabletType { + errString = fmt.Sprintf("tablet %s is no longer healthy: %s, restarting vstream", + tablet.Alias, shr.RealtimeStats.HealthError) + } else if shr.RealtimeStats.HealthError != "" { + errString = fmt.Sprintf("tablet type has changed from %s to %s, restarting vstream", + vs.tabletType, shr.Target.TabletType) + } + if errString != "" { + errCh <- errString + return fmt.Errorf(errString) + } + return nil + }) + }() + + log.Infof("Starting to vstream from %s", tablet.Alias.String()) // Safe to access sgtid.Gtid here (because it can't change until streaming begins). - err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error { + err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 select { case <-ctx.Done(): return ctx.Err() + case streamErr := <-errCh: + log.Warningf("Tablet state changed: %s, attempting to restart", streamErr) + return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr) case <-journalDone: // Unreachable. // This can happen if a server misbehaves and does not end diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index d51ef388fd..ec79285a03 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/topo" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -45,29 +47,6 @@ import ( var mu sync.Mutex -func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent { - mu.Lock() - defer mu.Unlock() - var vevents []*binlogdatapb.VEvent - var i int64 - currentTime := time.Now().Unix() - for i = count; i > 0; i-- { - j := i + idx - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_GTID, Gtid: fmt.Sprintf("gtid-%s-%d", shard, j), - Timestamp: currentTime - j, - CurrentTime: currentTime * 1e9, - }) - - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_COMMIT, - Timestamp: currentTime - j, - CurrentTime: currentTime * 1e9, - }) - } - return vevents -} - func TestVStreamSkew(t *testing.T) { stream := func(conn *sandboxconn.SandboxConn, shard string, count, idx int64) { vevents := getVEvents(shard, count, idx) @@ -94,33 +73,36 @@ func TestVStreamSkew(t *testing.T) { previousDelays := int64(0) vstreamSkewDelayCount = stats.NewCounter("VStreamEventsDelayedBySkewAlignment", "Number of events that had to wait because the skew across shards was too high") + + cell := "aa" for idx, tcase := range tcases { t.Run("", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - name := fmt.Sprintf("TestVStreamSkew-%d", idx) - _ = createSandbox(name) + ks := fmt.Sprintf("TestVStreamSkew-%d", idx) + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - shard0 := "-20" - shard1 := "20-40" + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(hc, st, cell) vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} want := int64(0) var sbc0, sbc1 *sandboxconn.SandboxConn if tcase.shard0idx != 0 { - sbc0 = hc.AddTestTablet("aa", "1.1.1.1", 1001, name, shard0, topodatapb.TabletType_MASTER, true, 1, nil) + sbc0 = hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc0.VStreamCh = make(chan *binlogdatapb.VEvent) want += 2 * tcase.numEventsPerShard - vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: name, Gtid: "pos", Shard: "-20"}) - go stream(sbc0, shard0, tcase.numEventsPerShard, tcase.shard0idx) + vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "-20"}) + go stream(sbc0, "-20", tcase.numEventsPerShard, tcase.shard0idx) } if tcase.shard1idx != 0 { - sbc1 = hc.AddTestTablet("aa", "1.1.1.1", 1002, name, shard1, topodatapb.TabletType_MASTER, true, 1, nil) + sbc1 = hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet()) sbc1.VStreamCh = make(chan *binlogdatapb.VEvent) want += 2 * tcase.numEventsPerShard - vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: name, Gtid: "pos", Shard: "20-40"}) - go stream(sbc1, shard1, tcase.numEventsPerShard, tcase.shard1idx) + vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "20-40"}) + go stream(sbc1, "20-40", tcase.numEventsPerShard, tcase.shard1idx) } ch := startVStream(ctx, t, vsm, vgtid, &vtgatepb.VStreamFlags{MinimizeSkew: true}) var receivedEvents []*binlogdatapb.VEvent @@ -142,12 +124,15 @@ func TestVStreamSkew(t *testing.T) { func TestVStreamEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - name := "TestVStream" - _ = createSandbox(name) + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + + vsm := newTestVStreamManager(hc, st, cell) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) send1 := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"}, @@ -158,7 +143,7 @@ func TestVStreamEvents(t *testing.T) { want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid01", }}, @@ -176,7 +161,7 @@ func TestVStreamEvents(t *testing.T) { want2 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid02", }}, @@ -187,7 +172,7 @@ func TestVStreamEvents(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }}, @@ -217,12 +202,16 @@ func TestVStreamChunks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - name := "TestVStream" - _ = createSandbox(name) + ks := "TestVStream" + cell := "aa" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(hc, st, cell) + sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet()) for i := 0; i < 100; i++ { sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_DDL}}, nil) @@ -236,11 +225,11 @@ func TestVStreamChunks(t *testing.T) { ddlCount := 0 vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }, { - Keyspace: name, + Keyspace: ks, Shard: "20-40", Gtid: "pos", }}, @@ -286,13 +275,16 @@ func TestVStreamChunks(t *testing.T) { func TestVStreamMulti(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - name := "TestVStream" - _ = createSandbox(name) + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(hc, st, "aa") + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet()) send0 := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"}, @@ -308,11 +300,11 @@ func TestVStreamMulti(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }, { - Keyspace: name, + Keyspace: ks, Shard: "20-40", Gtid: "pos", }}, @@ -328,11 +320,11 @@ func TestVStreamMulti(t *testing.T) { } want := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid01", }, { - Keyspace: name, + Keyspace: ks, Shard: "20-40", Gtid: "gtid02", }}, @@ -346,11 +338,15 @@ func TestVStreamRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - name := "TestVStream" - _ = createSandbox(name) + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + vsm := newTestVStreamManager(hc, st, "aa") + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_COMMIT}, } @@ -363,7 +359,7 @@ func TestVStreamRetry(t *testing.T) { count := 0 vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }}, @@ -383,12 +379,14 @@ func TestVStreamRetry(t *testing.T) { func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - name := "TestVStream" - _ = createSandbox(name) + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + vsm := newTestVStreamManager(hc, st, cell) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) send0 := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_HEARTBEAT}, @@ -406,7 +404,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { want := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid01", }}, @@ -419,7 +417,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }}, @@ -431,14 +429,18 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { func TestVStreamJournalOneToMany(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - name := "TestVStream" - _ = createSandbox(name) + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "-10", topodatapb.TabletType_MASTER, true, 1, nil) - sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, name, "10-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) + vsm := newTestVStreamManager(hc, st, "aa") + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet()) + sbc2 := hc.AddTestTablet(cell, "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "10-20", sbc2.Tablet()) send1 := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"}, @@ -449,7 +451,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid01", }}, @@ -465,16 +467,16 @@ func TestVStreamJournalOneToMany(t *testing.T) { Id: 1, MigrationType: binlogdatapb.MigrationType_SHARDS, ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-10", Gtid: "pos10", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", Gtid: "pos1020", }}, Participants: []*binlogdatapb.KeyspaceShard{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", }}, }}, @@ -501,7 +503,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }}, @@ -517,11 +519,11 @@ func TestVStreamJournalOneToMany(t *testing.T) { Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-10", Gtid: "gtid03", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", Gtid: "gtid04", }}, @@ -537,13 +539,18 @@ func TestVStreamJournalManyToOne(t *testing.T) { defer cancel() // Variable names are maintained like in OneToMany, but order is different. - name := "TestVStream" - _ = createSandbox(name) + ks := "TestVStream" + cell := "aa" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "-10", topodatapb.TabletType_MASTER, true, 1, nil) - sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, name, "10-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) + vsm := newTestVStreamManager(hc, st, cell) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet()) + sbc2 := hc.AddTestTablet(cell, "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "10-20", sbc2.Tablet()) send3 := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid03"}, @@ -566,15 +573,15 @@ func TestVStreamJournalManyToOne(t *testing.T) { Id: 1, MigrationType: binlogdatapb.MigrationType_SHARDS, ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos20", }}, Participants: []*binlogdatapb.KeyspaceShard{{ - Keyspace: name, + Keyspace: ks, Shard: "-10", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", }}, }}, @@ -594,7 +601,7 @@ func TestVStreamJournalManyToOne(t *testing.T) { want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid01", }}, @@ -608,11 +615,11 @@ func TestVStreamJournalManyToOne(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-10", Gtid: "pos10", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", Gtid: "pos1020", }}, @@ -626,11 +633,11 @@ func TestVStreamJournalManyToOne(t *testing.T) { Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-10", Gtid: "gtid03", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", Gtid: "gtid04", }}, @@ -646,11 +653,14 @@ func TestVStreamJournalNoMatch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - name := "TestVStream" - _ = createSandbox(name) + ks := "TestVStream" + cell := "aa" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + vsm := newTestVStreamManager(hc, st, "aa") + sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) send1 := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"}, @@ -661,7 +671,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { want1 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid01", }}, @@ -683,7 +693,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { wantjn1 := &binlogdata.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "jn1", }}, @@ -699,7 +709,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { want2 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid02", }}, @@ -713,15 +723,15 @@ func TestVStreamJournalNoMatch(t *testing.T) { Id: 2, MigrationType: binlogdatapb.MigrationType_SHARDS, ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "c0-", Gtid: "posc0", }}, Participants: []*binlogdatapb.KeyspaceShard{{ - Keyspace: name, + Keyspace: ks, Shard: "c0-e0", }, { - Keyspace: name, + Keyspace: ks, Shard: "e0-", }}, }}, @@ -731,7 +741,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { wantjn2 := &binlogdata.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "jn2", }}, @@ -747,7 +757,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { want3 := &binlogdatapb.VStreamResponse{Events: []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "gtid03", }}, @@ -758,7 +768,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }}, @@ -772,27 +782,31 @@ func TestVStreamJournalPartialMatch(t *testing.T) { defer cancel() // Variable names are maintained like in OneToMany, but order is different.1 - name := "TestVStream" - _ = createSandbox(name) + ks := "TestVStream" + cell := "aa" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - _ = hc.AddTestTablet("aa", "1.1.1.1", 1002, name, "-10", topodatapb.TabletType_MASTER, true, 1, nil) - sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, name, "10-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) + vsm := newTestVStreamManager(hc, st, "aa") + sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet()) + sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "10-20", sbc2.Tablet()) send := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_JOURNAL, Journal: &binlogdatapb.Journal{ Id: 1, MigrationType: binlogdatapb.MigrationType_SHARDS, ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "10-30", Gtid: "pos1040", }}, Participants: []*binlogdatapb.KeyspaceShard{{ - Keyspace: name, + Keyspace: ks, Shard: "10-20", }, { - Keyspace: name, + Keyspace: ks, Shard: "20-30", }}, }}, @@ -801,11 +815,11 @@ func TestVStreamJournalPartialMatch(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-10", Gtid: "pos10", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", Gtid: "pos1020", }}, @@ -825,15 +839,15 @@ func TestVStreamJournalPartialMatch(t *testing.T) { Id: 1, MigrationType: binlogdatapb.MigrationType_SHARDS, ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "10-30", Gtid: "pos1040", }}, Participants: []*binlogdatapb.KeyspaceShard{{ - Keyspace: name, + Keyspace: ks, Shard: "20-30", }, { - Keyspace: name, + Keyspace: ks, Shard: "10-20", }}, }}, @@ -994,14 +1008,17 @@ func TestResolveVStreamParams(t *testing.T) { } func TestVStreamIdleHeartbeat(t *testing.T) { - name := "TestVStream" - _ = createSandbox(name) + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck() - vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa") - hc.AddTestTablet("aa", "1.1.1.1", 1001, name, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + vsm := newTestVStreamManager(hc, st, cell) + sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_MASTER, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: name, + Keyspace: ks, Shard: "-20", Gtid: "pos", }}, @@ -1069,6 +1086,7 @@ func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants . t.Helper() for i, want := range wants { got := <-ch + require.NotNil(t, got) for _, event := range got.Events { event.Timestamp = 0 } @@ -1077,3 +1095,47 @@ func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants . } } } + +func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent { + mu.Lock() + defer mu.Unlock() + var vevents []*binlogdatapb.VEvent + var i int64 + currentTime := time.Now().Unix() + for i = count; i > 0; i-- { + j := i + idx + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_GTID, Gtid: fmt.Sprintf("gtid-%s-%d", shard, j), + Timestamp: currentTime - j, + CurrentTime: currentTime * 1e9, + }) + + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COMMIT, + Timestamp: currentTime - j, + CurrentTime: currentTime * 1e9, + }) + } + return vevents +} + +func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards []string) *sandboxTopo { + st := newSandboxForCells([]string{cell}) + ts := st.topoServer + ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{}) + ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + for _, shard := range shards { + ts.CreateShard(ctx, keyspace, shard) + } + return st +} + +func addTabletToSandboxTopo(t *testing.T, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) { + _, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.MasterAlias = tablet.Alias + return nil + }) + require.NoError(t, err) + err = st.topoServer.CreateTablet(ctx, tablet) + require.NoError(t, err) +} diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index cf6d6bde6d..a5766a5d17 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -54,6 +54,7 @@ var ( "vreplication_basic", "vreplication_multicell", "vreplication_cellalias", + "vstream_failover", "vreplication_v2", "onlineddl_ghost", "onlineddl_vrepl", diff --git a/test/config.json b/test/config.json index 36f3e353b3..8908fae82b 100644 --- a/test/config.json +++ b/test/config.json @@ -875,7 +875,7 @@ "Command": [], "Manual": false, "Shard": "vreplication_multicell", - "RetryMax": 0, + "RetryMax": 1, "Tags": [] }, "vreplication_materialize": { @@ -884,7 +884,7 @@ "Command": [], "Manual": false, "Shard": "vreplication_multicell", - "RetryMax": 0, + "RetryMax": 1, "Tags": [] }, "vreplication_cellalias": { @@ -893,7 +893,7 @@ "Command": [], "Manual": false, "Shard": "vreplication_cellalias", - "RetryMax": 0, + "RetryMax": 1, "Tags": [] }, "vreplication_basic": { @@ -902,7 +902,16 @@ "Command": [], "Manual": false, "Shard": "vreplication_basic", - "RetryMax": 0, + "RetryMax": 1, + "Tags": [] + }, + "vstream_failover": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "VStreamFailover"], + "Command": [], + "Manual": false, + "Shard": "vstream_failover", + "RetryMax": 1, "Tags": [] }, "vtorc": { @@ -929,7 +938,7 @@ "Command": [], "Manual": false, "Shard": "vreplication_v2", - "RetryMax": 0, + "RetryMax": 1, "Tags": [] }, "vreplication_migrate": { @@ -938,7 +947,16 @@ "Command": [], "Manual": false, "Shard": "vreplication_migrate", - "RetryMax": 0, + "RetryMax": 1, + "Tags": [] + }, + "vstream_failover": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVStreamFailover"], + "Command": [], + "Manual": false, + "Shard": "vstream_failover", + "RetryMax": 1, "Tags": [] } }