vtctl: Add WaitForDrain command.

The command blocks until a QPS rate of 0.0 is observed on all selected tablets.

The command uses the healthcheck module to continuously check the current health of all tablets.

Added an integration test (wait_for_drain_test.go) and an end-to-end test (in tabletmanager.py).
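
For illustration, a drain check after a resharding cutover might be invoked like this (keyspace, shard, and cell names are hypothetical):

vtctl WaitForDrain -cells cell1,cell2 -timeout 300s -retry_delay 1s test_keyspace/-80 rdonly

The command returns once every healthy tablet of the given type in the selected cells reports a QPS rate of 0.0, and fails if -timeout elapses first.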
Michael Berlin 2016-02-01 00:37:57 -08:00
Parent: 203649e33d
Commit: 598c3bf0ed
5 changed files with 386 additions and 1 deletion

View file

@@ -110,6 +110,12 @@ var (
ErrUnknownCommand = errors.New("unknown command")
)
var (
healthCheckTopologyRefresh = flag.Duration("vtctl_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
healthcheckRetryDelay = flag.Duration("vtctl_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck")
healthCheckTimeout = flag.Duration("vtctl_healthcheck_timeout", time.Minute, "the health check timeout period")
)
type command struct {
name string
method func(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error
@@ -275,6 +281,11 @@ var commands = []commandGroup{
{"FindAllShardsInKeyspace", commandFindAllShardsInKeyspace,
"<keyspace>",
"Displays all of the shards in the specified keyspace."},
{"WaitForDrain", commandWaitForDrain,
"[-timeout <duration>] <keyspace/shard> <served tablet type>",
"Blocks until no new queries were observed on all tablets with the given tablet type in the specifed keyspace. " +
" This can be used as sanity check to ensure that the tablets were drained after running vtctl MigrateServedTypes " +
" and vtgate is no longer using them. If -timeout is set, it fails when the timeout is reached."},
},
},
{
@@ -886,6 +897,37 @@ func commandRunHealthCheck(ctx context.Context, wr *wrangler.Wrangler, subFlags
return wr.TabletManagerClient().RunHealthCheck(ctx, tabletInfo, servedType)
}
func commandWaitForDrain(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
var cells flagutil.StringListValue
subFlags.Var(&cells, "cells", "Specifies a comma-separated list of cells to look for tablets in")
timeout := subFlags.Duration("timeout", 0*time.Second, "Timeout after which the command fails")
retryDelay := subFlags.Duration("retry_delay", 1*time.Second, "Time to wait between two checks")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("The <keyspace/shard> and <tablet type> arguments are both required for the WaitForDrain command.")
}
if *timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *timeout)
defer cancel()
}
keyspace, shard, err := topoproto.ParseKeyspaceShard(subFlags.Arg(0))
if err != nil {
return err
}
servedType, err := parseServingTabletType3(subFlags.Arg(1))
if err != nil {
return err
}
return wr.WaitForDrain(ctx, cells, keyspace, shard, servedType,
*retryDelay, *healthCheckTopologyRefresh, *healthcheckRetryDelay, *healthCheckTimeout)
}
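
The -timeout handling above follows the standard Go context pattern: the command's context is wrapped with context.WithTimeout only when a non-zero timeout was given, so the default of 0 means "wait indefinitely". A minimal, self-contained sketch of the same pattern (all names here are illustrative, not part of this commit):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntilDrained stands in for the real drain check.
func waitUntilDrained(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second): // pretend draining takes 2 seconds
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded if the timeout fires first
	}
}

func main() {
	timeout := 1 * time.Second // corresponds to the -timeout flag; 0 means no limit
	ctx := context.Background()
	if timeout != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	fmt.Println(waitUntilDrained(ctx)) // prints "context deadline exceeded"
}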
func commandSleep(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err

View file

@@ -6,11 +6,13 @@ package wrangler
import (
"fmt"
"strings"
"sync"
"time"
"github.com/youtube/vitess/go/event"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
@@ -447,6 +449,127 @@ func (wr *Wrangler) migrateServedTypes(ctx context.Context, keyspace string, sou
return nil
}
// WaitForDrain blocks until the selected tablets (cells/keyspace/shard/tablet_type)
// have reported a QPS rate of 0.0.
// NOTE: This is only an observation at one point in time and not a guarantee
// that the tablets were actually drained. At a later time, a QPS rate > 0.0
// could still be observed.
func (wr *Wrangler) WaitForDrain(ctx context.Context, cells []string, keyspace, shard string, servedType topodatapb.TabletType,
retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout time.Duration) error {
if len(cells) == 0 {
// Retrieve list of cells for the shard from the topology.
shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
return fmt.Errorf("failed to retrieve list of all cells. GetShard() failed: %v", err)
}
cells = shardInfo.Cells
}
// Check all cells in parallel.
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, cell := range cells {
wg.Add(1)
go func(cell string) {
defer wg.Done()
rec.RecordError(wr.waitForDrainInCell(ctx, cell, keyspace, shard, servedType,
retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout))
}(cell)
}
wg.Wait()
return rec.Error()
}
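
WaitForDrain checks every cell in parallel, one goroutine per cell, and aggregates failures through concurrency.AllErrorRecorder so that errors from all cells are reported, not just the first one. A self-contained sketch of the same fan-out pattern using only the standard library (errRecorder and checkCell are illustrative stand-ins, not Vitess APIs):

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// errRecorder collects errors from concurrent goroutines, similar in
// spirit to concurrency.AllErrorRecorder.
type errRecorder struct {
	mu   sync.Mutex
	errs []string
}

func (r *errRecorder) record(err error) {
	if err == nil {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.errs = append(r.errs, err.Error())
}

func (r *errRecorder) error() error {
	if len(r.errs) == 0 {
		return nil
	}
	return errors.New(strings.Join(r.errs, "; "))
}

func main() {
	// checkCell is a placeholder for a per-cell check like waitForDrainInCell.
	checkCell := func(cell string) error {
		if cell == "cell2" {
			return fmt.Errorf("%v: tablets not drained", cell)
		}
		return nil
	}
	var wg sync.WaitGroup
	rec := &errRecorder{}
	for _, cell := range []string{"cell1", "cell2"} {
		wg.Add(1)
		go func(cell string) {
			defer wg.Done()
			rec.record(checkCell(cell))
		}(cell)
	}
	wg.Wait()
	fmt.Println(rec.error()) // prints "cell2: tablets not drained"
}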
func (wr *Wrangler) waitForDrainInCell(ctx context.Context, cell, keyspace, shard string, servedType topodatapb.TabletType,
retryDelay, healthCheckTopologyRefresh, healthcheckRetryDelay, healthCheckTimeout time.Duration) error {
hc := discovery.NewHealthCheck(healthCheckTimeout /* connectTimeout */, healthcheckRetryDelay, healthCheckTimeout, cell)
defer hc.Close()
watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, 5 /* topoReadConcurrency */)
defer watcher.Stop()
if err := discovery.WaitForEndPoints(ctx, hc, cell, keyspace, shard, []topodatapb.TabletType{servedType}); err != nil {
return fmt.Errorf("%v: error waiting for initial %v endpoints for %v/%v: %v", cell, servedType, keyspace, shard, err)
}
wr.Logger().Infof("%v: Waiting for %.1f seconds to make sure that the discovery module retrieves healthcheck information from all tablets.",
cell, healthCheckTimeout.Seconds())
// Wait at least for -vtctl_healthcheck_timeout to elapse to make sure that we
// see all healthy tablets. Otherwise, we might miss some tablets.
// It's safe not to wait any longer than that because we would only miss
// slow tablets, and vtgate would not serve from such tablets anyway.
time.Sleep(healthCheckTimeout)
// Now check the QPS rate of all tablets until the timeout expires.
startTime := time.Now()
for {
healthyTabletsCount := 0
// map key: tablet uid
drainedHealthyTablets := make(map[uint32]*discovery.EndPointStats)
notDrainedHealthyTablets := make(map[uint32]*discovery.EndPointStats)
addrs := hc.GetEndPointStatsFromTarget(keyspace, shard, servedType)
for _, addr := range addrs {
// TODO(mberlin): Move this health check logic into a common function
// because other code uses it as well e.g. go/vt/worker/topo_utils.go.
if addr.Stats == nil || addr.Stats.HealthError != "" || addr.Stats.SecondsBehindMaster > 30 {
// not healthy
continue
}
healthyTabletsCount++
if addr.Stats.Qps == 0.0 {
drainedHealthyTablets[addr.EndPoint.Uid] = addr
} else {
notDrainedHealthyTablets[addr.EndPoint.Uid] = addr
}
}
if len(drainedHealthyTablets) == healthyTabletsCount {
wr.Logger().Infof("%v: All %d healthy tablets were drained after %.1f seconds (not counting %.1f seconds for the initial wait).",
cell, healthyTabletsCount, time.Since(startTime).Seconds(), healthCheckTimeout.Seconds())
break
}
// Continue waiting, sleep in between.
deadlineString := ""
if d, ok := ctx.Deadline(); ok {
deadlineString = fmt.Sprintf(" up to %.1f more seconds", d.Sub(time.Now()).Seconds())
}
wr.Logger().Infof("%v: Waiting%v for all healthy tablets to be drained (%d/%d done).",
cell, deadlineString, len(drainedHealthyTablets), healthyTabletsCount)
timer := time.NewTimer(retryDelay)
select {
case <-ctx.Done():
timer.Stop()
var l []string
for _, eps := range notDrainedHealthyTablets {
l = append(l, formatEndpointStats(eps))
}
return fmt.Errorf("%v: WaitForDrain failed for %v tablets in %v/%v. Only %d/%d tablets were drained. err: %v List of tablets which were not drained:\n%v",
cell, servedType, keyspace, shard, len(drainedHealthyTablets), healthyTabletsCount, ctx.Err(), strings.Join(l, "\n"))
case <-timer.C:
}
}
return nil
}
func formatEndpointStats(eps *discovery.EndPointStats) string {
webURL := "unknown http port"
if webPort, ok := eps.EndPoint.PortMap["vt"]; ok {
webURL = fmt.Sprintf("http://%v:%d/", eps.EndPoint.Host, webPort)
}
alias := &topodatapb.TabletAlias{
Cell: eps.Cell,
Uid: eps.EndPoint.Uid,
}
return fmt.Sprintf("%v: %v stats: %v", topoproto.TabletAliasString(alias), webURL, eps.Stats)
}
// MigrateServedFrom is used during vertical splits to migrate a
// served type from a keyspace to another.
func (wr *Wrangler) MigrateServedFrom(ctx context.Context, keyspace, shard string, servedType topodatapb.TabletType, cells []string, reverse bool, filteredReplicationWaitTime time.Duration) error {

View file

@@ -13,6 +13,7 @@ import (
"google.golang.org/grpc"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtctl/grpcvtctlserver"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
@@ -83,3 +84,12 @@ func (vp *VtctlPipe) Run(args []string) error {
}
return errFunc()
}
// RunAndStreamOutput returns the output of the vtctl command as a channel.
// When the channel is closed, the command has finished.
func (vp *VtctlPipe) RunAndStreamOutput(args []string) (<-chan *logutilpb.Event, vtctlclient.ErrFunc, error) {
actionTimeout := 30 * time.Second
ctx := context.Background()
return vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout)
}
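
Callers consume RunAndStreamOutput by draining the event channel and then calling the returned error function for the command's final status. A shortened sketch (setup elided, command name arbitrary):

c, errFunc, err := vp.RunAndStreamOutput([]string{"Validate"})
if err != nil {
	t.Fatalf("failed to start command: %v", err)
}
for le := range c { // streams log events until the command finishes
	t.Log(le.String())
}
if err := errFunc(); err != nil { // final status of the command
	t.Fatalf("command failed: %v", err)
}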

View file

@@ -0,0 +1,192 @@
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testlib
import (
"flag"
"strings"
"testing"
"golang.org/x/net/context"
"github.com/gogo/protobuf/proto"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// fakeQueryService is a QueryService implementation which allows sending
// custom StreamHealthResponse messages by adding them to a channel.
// Note that it only works with one connected client because messages going
// into "healthResponses" are not duplicated to all clients.
type fakeQueryService struct {
queryservice.ErrorQueryService
healthResponses chan *querypb.StreamHealthResponse
target querypb.Target
}
func newFakeQueryService(target querypb.Target) *fakeQueryService {
return &fakeQueryService{
healthResponses: make(chan *querypb.StreamHealthResponse, 10),
target: target,
}
}
// StreamHealthRegister implements the QueryService interface.
// It sends all queued and future healthResponses to the connected client,
// e.g. the healthcheck module.
func (q *fakeQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
go func() {
for shr := range q.healthResponses {
c <- shr
}
}()
return 0, nil
}
// addHealthResponse adds a mocked health response to the buffer channel.
func (q *fakeQueryService) addHealthResponse(qps float64) {
q.healthResponses <- &querypb.StreamHealthResponse{
Target: proto.Clone(&q.target).(*querypb.Target),
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
Qps: qps,
},
}
}
type drainDirective int
const (
DrainNoCells drainDirective = 1 << iota
DrainCell1
DrainCell2
)
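
The directives are bit flags (DrainNoCells = 1, DrainCell1 = 2, DrainCell2 = 4), so the tests below combine them with | (e.g. DrainCell1|DrainCell2) and the helper checks each one with an expression like drain&DrainCell1 != 0.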
func TestWaitForDrain(t *testing.T) {
testWaitForDrain(t, "both cells selected and drained", "" /* cells */, DrainCell1|DrainCell2, nil /* expectedErrors */)
}
func TestWaitForDrain_SelectCell1(t *testing.T) {
testWaitForDrain(t, "cell1 selected and drained", "cell1", DrainCell1, nil /* expectedErrors */)
}
func TestWaitForDrain_NoCellDrained(t *testing.T) {
testWaitForDrain(t, "both cells selected and none drained", "" /* cells */, DrainNoCells, []string{"cell1-0000000000", "cell2-0000000001"})
}
func TestWaitForDrain_SelectCell1ButCell2Drained(t *testing.T) {
testWaitForDrain(t, "cell1 selected and cell2 drained", "cell1", DrainCell2, []string{"cell1-0000000000"})
}
func testWaitForDrain(t *testing.T, desc, cells string, drain drainDirective, expectedErrors []string) {
const keyspace = "ks"
const shard = "-80"
db := fakesqldb.Register()
ts := zktestserver.New(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
flag.Set("vtctl_healthcheck_timeout", "0.25s")
vp := NewVtctlPipe(t, ts)
defer vp.Close()
// Create keyspace.
if err := ts.CreateKeyspace(context.Background(), keyspace, &topodatapb.Keyspace{
ShardingColumnName: "keyspace_id",
ShardingColumnType: topodatapb.KeyspaceIdType_UINT64,
}); err != nil {
t.Fatalf("CreateKeyspace failed: %v", err)
}
t1 := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_REPLICA, db,
TabletKeyspaceShard(t, keyspace, shard))
t2 := NewFakeTablet(t, wr, "cell2", 1, topodatapb.TabletType_REPLICA, db,
TabletKeyspaceShard(t, keyspace, shard))
for _, ft := range []*FakeTablet{t1, t2} {
ft.StartActionLoop(t, wr)
defer ft.StopActionLoop(t)
}
target := querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_REPLICA,
}
fqs1 := newFakeQueryService(target)
fqs2 := newFakeQueryService(target)
grpcqueryservice.RegisterForTest(t1.RPCServer, fqs1)
grpcqueryservice.RegisterForTest(t2.RPCServer, fqs2)
// Run vtctl WaitForDrain and react depending on its output.
timeout := "0.5s"
if len(expectedErrors) == 0 {
// Tests with a positive outcome should have a more generous timeout to
// avoid flakiness.
timeout = "30s"
}
c, errFunc, err := vp.RunAndStreamOutput(
[]string{"WaitForDrain", "-cells", cells, "-retry_delay", "100ms", "-timeout", timeout,
keyspace + "/" + shard, topodatapb.TabletType_REPLICA.String()})
if err != nil {
t.Fatalf("VtctlPipe.RunAndStreamOutput() failed: %v", err)
}
// QPS = 1.0. Tablets are not drained yet.
fqs1.addHealthResponse(1.0)
fqs2.addHealthResponse(1.0)
for le := range c {
line := le.String()
t.Log(line)
if strings.Contains(line, "for all healthy tablets to be drained") {
t.Log("Successfully waited for WaitForDrain to be blocked because tablets have a QPS rate > 0.0")
break
} else {
t.Log("waiting for WaitForDrain to see a QPS rate > 0.0")
}
}
if drain&DrainCell1 != 0 {
fqs1.addHealthResponse(0.0)
} else {
fqs1.addHealthResponse(2.0)
}
if drain&DrainCell2 != 0 {
fqs2.addHealthResponse(0.0)
} else {
fqs2.addHealthResponse(2.0)
}
// If a cell was drained, its QPS rate should drop to 0.0 now.
// If not all selected cells were drained, this will only end after "-timeout".
for le := range c {
vp.t.Log(le.String())
}
err = errFunc()
if len(expectedErrors) == 0 {
if err != nil {
t.Fatalf("TestWaitForDrain: %v: no error expected but got: %v", desc, err)
}
// else: Success.
} else {
if err == nil {
t.Fatalf("TestWaitForDrain: %v: error expected but got none", desc)
}
for _, errString := range expectedErrors {
if !strings.Contains(err.Error(), errString) {
t.Fatalf("TestWaitForDrain: %v: error does not include expected string. got: %v want: %v", desc, err, errString)
}
}
// Success.
}
}
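
Assuming the standard Vitess repository layout, the new integration test can be run on its own with:

go test github.com/youtube/vitess/go/vt/wrangler/testlib -run WaitForDrain -v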

View file

@@ -412,7 +412,7 @@ class TestTabletManager(unittest.TestCase):
health['realtime_stats']['health_error'])
self.assertNotIn('serving', health)
# then restart replication, and write data, make sure we go back to healthy
# then restart replication, make sure we go back to healthy
utils.run_vtctl(['StartSlave', tablet_62044.tablet_alias])
utils.wait_for_tablet_type(tablet_62044.tablet_alias, 'replica')
@@ -442,6 +442,24 @@
self.assertEqual('0', data['target']['shard'])
self.assertEqual(topodata_pb2.REPLICA, data['target']['tablet_type'])
# Test that VtTabletStreamHealth reports a QPS >0.0.
# Therefore, issue several reads first.
# NOTE: This check may be flaky because we'll observe a QPS >0.0 only for
# the duration of one sampling interval (5s) and after that we'll see
# 0.0 QPS rates again. If it actually turns out to be flaky, we'll need to
# read continuously in a separate thread.
for _ in range(10):
tablet_62044.execute('select 1 from dual')
# This may take up to 5 seconds to become true because we sample the query
# counts for the rates only every 5 seconds (see query_service_stats.go).
timeout = 10
while True:
health = utils.run_vtctl_json(['VtTabletStreamHealth', '-count', '1',
tablet_62044.tablet_alias])
if health['realtime_stats'].get('qps', 0.0) > 0.0:
break
timeout = utils.wait_step('QPS >0.0 seen', timeout)
# kill the tablets
tablet.kill_tablets([tablet_62344, tablet_62044])
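
For intuition on the timing in the test above: if the rates are sampled over 5-second windows, the 10 reads should show up as roughly 10/5 = 2.0 QPS for a single window before dropping back to 0.0, which is why the loop polls for up to 10 seconds instead of checking just once.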