diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go
index b4d9326206..dcc7166698 100644
--- a/go/vt/vtctl/vtctl.go
+++ b/go/vt/vtctl/vtctl.go
@@ -110,6 +110,12 @@ var (
 	ErrUnknownCommand = errors.New("unknown command")
 )
 
+var (
+	healthCheckTopologyRefresh = flag.Duration("vtctl_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
+	healthcheckRetryDelay      = flag.Duration("vtctl_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck")
+	healthCheckTimeout         = flag.Duration("vtctl_healthcheck_timeout", time.Minute, "the health check timeout period")
+)
+
 type command struct {
 	name   string
 	method func(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error
@@ -275,6 +281,11 @@ var commands = []commandGroup{
 		{"FindAllShardsInKeyspace", commandFindAllShardsInKeyspace,
 			"<keyspace>",
 			"Displays all of the shards in the specified keyspace."},
+		{"WaitForDrain", commandWaitForDrain,
+			"[-timeout <duration>] <keyspace/shard> <served tablet type>",
+			"Blocks until no new queries are observed on all tablets with the given tablet type in the specified keyspace. " +
+				"This can be used as a sanity check to ensure that the tablets were drained after running vtctl MigrateServedTypes " +
+				"and that vtgate is no longer using them. If -timeout is set, the command fails when the timeout is reached."},
 	},
 },
 {
@@ -886,6 +897,38 @@ func commandRunHealthCheck(ctx context.Context, wr *wrangler.Wrangler, subFlags
 	return wr.TabletManagerClient().RunHealthCheck(ctx, tabletInfo, servedType)
 }
 
+func commandWaitForDrain(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
+	var cells flagutil.StringListValue
+	subFlags.Var(&cells, "cells", "Specifies a comma-separated list of cells to look for tablets")
+	timeout := subFlags.Duration("timeout", 0*time.Second, "Timeout after which the command fails")
+	retryDelay := subFlags.Duration("retry_delay", 1*time.Second, "Time to wait between two checks")
+
+	if err := subFlags.Parse(args); err != nil {
+		return err
+	}
+	if subFlags.NArg() != 2 {
+		return fmt.Errorf("The <keyspace/shard> and <served tablet type> arguments are both required for the WaitForDrain command.")
+	}
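+	// A -timeout value of 0 means no deadline: wait until drained or until the parent context is canceled.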
+	if *timeout != 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, *timeout)
+		defer cancel()
+	}
+
+	keyspace, shard, err := topoproto.ParseKeyspaceShard(subFlags.Arg(0))
+	if err != nil {
+		return err
+	}
+	servedType, err := parseServingTabletType3(subFlags.Arg(1))
+	if err != nil {
+		return err
+	}
+
+	return wr.WaitForDrain(ctx, cells, keyspace, shard, servedType,
+		*retryDelay, *healthCheckTopologyRefresh, *healthcheckRetryDelay, *healthCheckTimeout)
+}
+
 func commandSleep(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
 	if err := subFlags.Parse(args); err != nil {
 		return err
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go
index bd232a8d93..5882639ca0 100644
--- a/go/vt/wrangler/keyspace.go
+++ b/go/vt/wrangler/keyspace.go
@@ -6,11 +6,13 @@ package wrangler
 import (
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/youtube/vitess/go/event"
 	"github.com/youtube/vitess/go/vt/concurrency"
+	"github.com/youtube/vitess/go/vt/discovery"
 	"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
 	"github.com/youtube/vitess/go/vt/topo"
 	"github.com/youtube/vitess/go/vt/topo/topoproto"
@@ -447,6 +449,128 @@ func (wr *Wrangler) migrateServedTypes(ctx context.Context, keyspace string, sou
 	return nil
 }
 
+// WaitForDrain blocks until the selected tablets (cells/keyspace/shard/tablet_type)
+// have reported a QPS rate of 0.0.
+// NOTE: This is only an observation of one point in time and not a guarantee
+// that the tablet was actually drained. At later times, a QPS rate > 0.0
+// could still be observed.
+func (wr *Wrangler) WaitForDrain(ctx context.Context, cells []string, keyspace, shard string, servedType topodatapb.TabletType,
+	retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout time.Duration) error {
+	if len(cells) == 0 {
+		// Retrieve the list of cells for the shard from the topology.
+		shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard)
+		if err != nil {
+			return fmt.Errorf("failed to retrieve list of all cells. GetShard() failed: %v", err)
+		}
+		cells = shardInfo.Cells
+	}
+
+	// Check all cells in parallel.
+	wg := sync.WaitGroup{}
+	rec := concurrency.AllErrorRecorder{}
+	for _, cell := range cells {
+		wg.Add(1)
+		go func(cell string) {
+			defer wg.Done()
+			rec.RecordError(wr.waitForDrainInCell(ctx, cell, keyspace, shard, servedType,
+				retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout))
+		}(cell)
+	}
+	wg.Wait()
+
+	return rec.Error()
+}
+
+func (wr *Wrangler) waitForDrainInCell(ctx context.Context, cell, keyspace, shard string, servedType topodatapb.TabletType,
+	retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout time.Duration) error {
+	hc := discovery.NewHealthCheck(healthCheckTimeout /* connectTimeout */, healthcheckRetryDelay, healthCheckTimeout, cell)
+	defer hc.Close()
+	watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, 5 /* topoReadConcurrency */)
+	defer watcher.Stop()
+
+	if err := discovery.WaitForEndPoints(ctx, hc, cell, keyspace, shard, []topodatapb.TabletType{servedType}); err != nil {
+		return fmt.Errorf("%v: error waiting for initial %v endpoints for %v/%v: %v", cell, servedType, keyspace, shard, err)
+	}
+
+	wr.Logger().Infof("%v: Waiting for %.1f seconds to make sure that the discovery module retrieves healthcheck information from all tablets.",
+		cell, healthCheckTimeout.Seconds())
+	// Wait at least for -vtctl_healthcheck_timeout to elapse to make sure that
+	// we see all healthy tablets. Otherwise, we might miss some tablets.
+	// It's safe not to wait longer than this because we would only miss slow
+	// tablets, and vtgate would not serve from such tablets anyway.
+	time.Sleep(healthCheckTimeout)
+
+	// Now check the QPS rate of all tablets until the timeout expires.
+	startTime := time.Now()
+	for {
+		healthyTabletsCount := 0
+		// map key: tablet uid
+		drainedHealthyTablets := make(map[uint32]*discovery.EndPointStats)
+		notDrainedHealthyTablets := make(map[uint32]*discovery.EndPointStats)
+
+		addrs := hc.GetEndPointStatsFromTarget(keyspace, shard, servedType)
+		healthyTabletsCount = 0
+		for _, addr := range addrs {
+			// TODO(mberlin): Move this health check logic into a common function
+			// because other code uses it as well, e.g. go/vt/worker/topo_utils.go.
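+			// Ignore tablets without stats, with a health error, or lagging more than 30 seconds behind the master.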
+			if addr.Stats == nil || addr.Stats.HealthError != "" || addr.Stats.SecondsBehindMaster > 30 {
+				// not healthy
+				continue
+			}
+
+			healthyTabletsCount++
+			if addr.Stats.Qps == 0.0 {
+				drainedHealthyTablets[addr.EndPoint.Uid] = addr
+			} else {
+				notDrainedHealthyTablets[addr.EndPoint.Uid] = addr
+			}
+		}
+
+		if len(drainedHealthyTablets) == healthyTabletsCount {
+			wr.Logger().Infof("%v: All %d healthy tablets were drained after %.1f seconds (not counting %.1f seconds for the initial wait).",
+				cell, healthyTabletsCount, time.Now().Sub(startTime).Seconds(), healthCheckTimeout.Seconds())
+			break
+		}
+
+		// Continue waiting, sleep in between.
+		deadlineString := ""
+		if d, ok := ctx.Deadline(); ok {
+			deadlineString = fmt.Sprintf(" up to %.1f more seconds", d.Sub(time.Now()).Seconds())
+		}
+		wr.Logger().Infof("%v: Waiting%v for all healthy tablets to be drained (%d/%d done).",
+			cell, deadlineString, len(drainedHealthyTablets), healthyTabletsCount)
+
+		timer := time.NewTimer(retryDelay)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+
+			var l []string
+			for _, eps := range notDrainedHealthyTablets {
+				l = append(l, formatEndpointStats(eps))
+			}
+			return fmt.Errorf("%v: WaitForDrain failed for %v tablets in %v/%v. Only %d/%d tablets were drained. err: %v. List of tablets which were not drained:\n%v",
+				cell, servedType, keyspace, shard, len(drainedHealthyTablets), healthyTabletsCount, ctx.Err(), strings.Join(l, "\n"))
+		case <-timer.C:
+		}
+	}
+
+	return nil
+}
+
+func formatEndpointStats(eps *discovery.EndPointStats) string {
+	webURL := "unknown http port"
+	if webPort, ok := eps.EndPoint.PortMap["vt"]; ok {
+		webURL = fmt.Sprintf("http://%v:%d/", eps.EndPoint.Host, webPort)
+	}
+	alias := &topodatapb.TabletAlias{
+		Cell: eps.Cell,
+		Uid:  eps.EndPoint.Uid,
+	}
+	return fmt.Sprintf("%v: %v stats: %v", topoproto.TabletAliasString(alias), webURL, eps.Stats)
+}
+
 // MigrateServedFrom is used during vertical splits to migrate a
 // served type from a keyspace to another.
 func (wr *Wrangler) MigrateServedFrom(ctx context.Context, keyspace, shard string, servedType topodatapb.TabletType, cells []string, reverse bool, filteredReplicationWaitTime time.Duration) error {
diff --git a/go/vt/wrangler/testlib/vtctl_pipe.go b/go/vt/wrangler/testlib/vtctl_pipe.go
index 0b77445fea..64281ee4ab 100644
--- a/go/vt/wrangler/testlib/vtctl_pipe.go
+++ b/go/vt/wrangler/testlib/vtctl_pipe.go
@@ -13,6 +13,7 @@ import (
 
 	"google.golang.org/grpc"
 
+	logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
 	"github.com/youtube/vitess/go/vt/topo"
 	"github.com/youtube/vitess/go/vt/vtctl/grpcvtctlserver"
 	"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
@@ -83,3 +84,13 @@ func (vp *VtctlPipe) Run(args []string) error {
 	}
 	return errFunc()
 }
+
+// RunAndStreamOutput returns the output of the vtctl command as a channel.
+// When the channel is closed, the command has finished.
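+// Call the returned ErrFunc after the channel is closed to get the final error of the command, if any.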
+func (vp *VtctlPipe) RunAndStreamOutput(args []string) (<-chan *logutilpb.Event, vtctlclient.ErrFunc, error) {
+	actionTimeout := 30 * time.Second
+	ctx := context.Background()
+
+	return vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout)
+}
diff --git a/go/vt/wrangler/testlib/wait_for_drain_test.go b/go/vt/wrangler/testlib/wait_for_drain_test.go
new file mode 100644
index 0000000000..9c27b12806
--- /dev/null
+++ b/go/vt/wrangler/testlib/wait_for_drain_test.go
@@ -0,0 +1,193 @@
+// Copyright 2016, Google Inc. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package testlib
+
+import (
+	"flag"
+	"strings"
+	"testing"
+
+	"golang.org/x/net/context"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/youtube/vitess/go/vt/logutil"
+	"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
+	"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
+	"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
+	"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
+	"github.com/youtube/vitess/go/vt/wrangler"
+	"github.com/youtube/vitess/go/vt/zktopo/zktestserver"
+
+	querypb "github.com/youtube/vitess/go/vt/proto/query"
+	topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
+)
+
+// fakeQueryService is a QueryService implementation which makes it possible to
+// send custom StreamHealthResponse messages by adding them to a channel.
+// Note that it only works with one connected client because messages going
+// into "healthResponses" are not duplicated to all clients.
+type fakeQueryService struct {
+	queryservice.ErrorQueryService
+	healthResponses chan *querypb.StreamHealthResponse
+	target          querypb.Target
+}
+
+func newFakeQueryService(target querypb.Target) *fakeQueryService {
+	return &fakeQueryService{
+		healthResponses: make(chan *querypb.StreamHealthResponse, 10),
+		target:          target,
+	}
+}
+
+// StreamHealthRegister implements the QueryService interface.
+// It sends all queued and future healthResponses to the connected client e.g.
+// the healthcheck module.
+func (q *fakeQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
+	go func() {
+		for shr := range q.healthResponses {
+			c <- shr
+		}
+	}()
+	return 0, nil
+}
+
+// addHealthResponse adds a mocked health response to the buffer channel.
+func (q *fakeQueryService) addHealthResponse(qps float64) {
+	q.healthResponses <- &querypb.StreamHealthResponse{
+		Target:  proto.Clone(&q.target).(*querypb.Target),
+		Serving: true,
+		RealtimeStats: &querypb.RealtimeStats{
+			Qps: qps,
+		},
+	}
+}
+
+type drainDirective int
+
+const (
+	DrainNoCells drainDirective = 1 << iota
+	DrainCell1
+	DrainCell2
+)
+
+func TestWaitForDrain(t *testing.T) {
+	testWaitForDrain(t, "both cells selected and drained", "" /* cells */, DrainCell1|DrainCell2, nil /* expectedErrors */)
+}
+
+func TestWaitForDrain_SelectCell1(t *testing.T) {
+	testWaitForDrain(t, "cell1 selected and drained", "cell1", DrainCell1, nil /* expectedErrors */)
+}
+
+func TestWaitForDrain_NoCellDrained(t *testing.T) {
+	testWaitForDrain(t, "both cells selected and none drained", "" /* cells */, DrainNoCells, []string{"cell1-0000000000", "cell2-0000000001"})
+}
+
+func TestWaitForDrain_SelectCell1ButCell2Drained(t *testing.T) {
+	testWaitForDrain(t, "cell1 selected and cell2 drained", "cell1", DrainCell2, []string{"cell1-0000000000"})
+}
+
+func testWaitForDrain(t *testing.T, desc, cells string, drain drainDirective, expectedErrors []string) {
+	const keyspace = "ks"
+	const shard = "-80"
+
+	db := fakesqldb.Register()
+	ts := zktestserver.New(t, []string{"cell1", "cell2"})
+	wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
+	flag.Set("vtctl_healthcheck_timeout", "0.25s")
+	vp := NewVtctlPipe(t, ts)
+	defer vp.Close()
+
+	// Create keyspace.
+	if err := ts.CreateKeyspace(context.Background(), keyspace, &topodatapb.Keyspace{
+		ShardingColumnName: "keyspace_id",
+		ShardingColumnType: topodatapb.KeyspaceIdType_UINT64,
+	}); err != nil {
+		t.Fatalf("CreateKeyspace failed: %v", err)
+	}
+
+	t1 := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_REPLICA, db,
+		TabletKeyspaceShard(t, keyspace, shard))
+	t2 := NewFakeTablet(t, wr, "cell2", 1, topodatapb.TabletType_REPLICA, db,
+		TabletKeyspaceShard(t, keyspace, shard))
+	for _, ft := range []*FakeTablet{t1, t2} {
+		ft.StartActionLoop(t, wr)
+		defer ft.StopActionLoop(t)
+	}
+
+	target := querypb.Target{
+		Keyspace:   keyspace,
+		Shard:      shard,
+		TabletType: topodatapb.TabletType_REPLICA,
+	}
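+	// Fake query services let the test control the QPS reported by each tablet's health stream.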
+	fqs1 := newFakeQueryService(target)
+	fqs2 := newFakeQueryService(target)
+	grpcqueryservice.RegisterForTest(t1.RPCServer, fqs1)
+	grpcqueryservice.RegisterForTest(t2.RPCServer, fqs2)
+
+	// Run vtctl WaitForDrain and react depending on its output.
+	timeout := "0.5s"
+	if len(expectedErrors) == 0 {
+		// Tests with a positive outcome should have a more generous timeout to
+		// avoid flakiness.
+		timeout = "30s"
+	}
+	c, errFunc, err := vp.RunAndStreamOutput(
+		[]string{"WaitForDrain", "-cells", cells, "-retry_delay", "100ms", "-timeout", timeout,
+			keyspace + "/" + shard, topodatapb.TabletType_REPLICA.String()})
+	if err != nil {
+		t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
+	}
+
+	// QPS = 1.0. Tablets are not drained yet.
+	fqs1.addHealthResponse(1.0)
+	fqs2.addHealthResponse(1.0)
+
+	for le := range c {
+		line := le.String()
+		t.Logf(line)
+		if strings.Contains(line, "for all healthy tablets to be drained") {
+			t.Log("Successfully waited for WaitForDrain to be blocked because tablets have a QPS rate > 0.0")
+			break
+		} else {
+			t.Log("waiting for WaitForDrain to see a QPS rate > 0.0")
+		}
+	}
+
+	if drain&DrainCell1 != 0 {
+		fqs1.addHealthResponse(0.0)
+	} else {
+		fqs1.addHealthResponse(2.0)
+	}
+	if drain&DrainCell2 != 0 {
+		fqs2.addHealthResponse(0.0)
+	} else {
+		fqs2.addHealthResponse(2.0)
+	}
+
+	// If a cell was drained, its QPS rate should drop to 0.0 now.
+	// If not all selected cells were drained, this will end after "-timeout".
+	for le := range c {
+		vp.t.Logf(le.String())
+	}
+
+	err = errFunc()
+	if len(expectedErrors) == 0 {
+		if err != nil {
+			t.Fatalf("TestWaitForDrain: %v: no error expected but got: %v", desc, err)
+		}
+		// else: Success.
+	} else {
+		if err == nil {
+			t.Fatalf("TestWaitForDrain: %v: error expected but got none", desc)
+		}
+		for _, errString := range expectedErrors {
+			if !strings.Contains(err.Error(), errString) {
+				t.Fatalf("TestWaitForDrain: %v: error does not include expected string. got: %v want: %v", desc, err, errString)
+			}
+		}
+		// Success.
+	}
+}
diff --git a/test/tabletmanager.py b/test/tabletmanager.py
index 96832e57fa..5d86ede5b6 100755
--- a/test/tabletmanager.py
+++ b/test/tabletmanager.py
@@ -412,7 +412,7 @@ class TestTabletManager(unittest.TestCase):
                   health['realtime_stats']['health_error'])
     self.assertNotIn('serving', health)
 
-    # then restart replication, and write data, make sure we go back to healthy
+    # then restart replication, make sure we go back to healthy
     utils.run_vtctl(['StartSlave', tablet_62044.tablet_alias])
     utils.wait_for_tablet_type(tablet_62044.tablet_alias, 'replica')
 
@@ -442,6 +442,24 @@ class TestTabletManager(unittest.TestCase):
     self.assertEqual('0', data['target']['shard'])
     self.assertEqual(topodata_pb2.REPLICA, data['target']['tablet_type'])
 
+    # Test that VtTabletStreamHealth reports a QPS >0.0.
+    # Therefore, issue several reads first.
+    # NOTE: This may be flaky because we'll observe a QPS >0.0
+    # exactly "once" for the duration of one sampling interval (5s) and
+    # after that we'll see 0.0 QPS rates again. If this actually becomes
+    # flaky, we need to read continuously in a separate thread.
+    for _ in range(10):
+      tablet_62044.execute('select 1 from dual')
+    # This may take up to 5 seconds to become true because we sample the query
+    # counts for the rates only every 5 seconds (see query_service_stats.go).
+    timeout = 10
+    while True:
+      health = utils.run_vtctl_json(['VtTabletStreamHealth', '-count', '1',
+                                     tablet_62044.tablet_alias])
+      if health['realtime_stats'].get('qps', 0.0) > 0.0:
+        break
+      timeout = utils.wait_step('QPS >0.0 seen', timeout)
+
     # kill the tablets
     tablet.kill_tablets([tablet_62344, tablet_62044])