Merge pull request #400 from youtube/replication

Replication
This commit is contained in:
Alain Jobart 2015-02-12 11:59:04 -08:00
Родитель 7da6665aec efa38cd984
Коммит ee9004ed46
15 изменённых файлов: 233 добавлений и 126 удалений

Просмотреть файл

@ -2,47 +2,18 @@ package main
import ( import (
"flag" "flag"
"fmt"
"html/template"
"time"
"github.com/youtube/vitess/go/vt/health" "github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/mysqlctl" "github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletserver" "github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/topo"
) )
var ( var (
enableReplicationLagCheck = flag.Bool("enable_replication_lag_check", false, "will register the mysql health check module that directly calls mysql") enableReplicationLagCheck = flag.Bool("enable_replication_lag_check", false, "will register the mysql health check module that directly calls mysql")
) )
// queryServiceRunning implements health.Reporter func registerHealthReporter(qsc tabletserver.QueryServiceControl) {
type queryServiceRunning struct {
qsc tabletserver.QueryServiceControl
}
// Report is part of the health.Reporter interface
func (qsr *queryServiceRunning) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
isQueryServiceRunning := qsr.qsc.IsServing()
if shouldQueryServiceBeRunning != isQueryServiceRunning {
return 0, fmt.Errorf("QueryService running=%v, expected=%v", isQueryServiceRunning, shouldQueryServiceBeRunning)
}
if isQueryServiceRunning {
if err := qsr.qsc.IsHealthy(); err != nil {
return 0, fmt.Errorf("QueryService is running, but not healthy: %v", err)
}
}
return 0, nil
}
// HTMLName is part of the health.Reporter interface
func (qsr *queryServiceRunning) HTMLName() template.HTML {
return template.HTML("QueryServiceRunning")
}
func registerHealthReporters(qsc tabletserver.QueryServiceControl) {
if *enableReplicationLagCheck { if *enableReplicationLagCheck {
health.Register("replication_reporter", mysqlctl.MySQLReplicationLag(agent.Mysqld)) health.DefaultAggregator.Register("replication_reporter", mysqlctl.MySQLReplicationLag(agent.Mysqld))
} }
health.Register("query_service_reporter", &queryServiceRunning{qsc})
} }

Просмотреть файл

@ -163,7 +163,7 @@ func (hs *healthStatus) CurrentHTML() template.HTML {
} }
func healthHTMLName() template.HTML { func healthHTMLName() template.HTML {
return health.HTMLName() return health.DefaultAggregator.HTMLName()
} }
// For use by plugins which wish to avoid racing when registering status page parts. // For use by plugins which wish to avoid racing when registering status page parts.

Просмотреть файл

@ -98,7 +98,7 @@ func main() {
tabletmanager.HttpHandleSnapshots(mycnf, tabletAlias.Uid) tabletmanager.HttpHandleSnapshots(mycnf, tabletAlias.Uid)
servenv.OnRun(func() { servenv.OnRun(func() {
addStatusParts(qsc) addStatusParts(qsc)
registerHealthReporters(qsc) registerHealthReporter(qsc)
}) })
servenv.OnTerm(func() { servenv.OnTerm(func() {
qsc.DisallowQueries() qsc.DisallowQueries()

Просмотреть файл

@ -13,11 +13,13 @@ import (
) )
var ( var (
defaultAggregator *Aggregator // DefaultAggregator is the global aggregator to use for real
// programs. Use a custom one for tests.
DefaultAggregator *Aggregator
) )
func init() { func init() {
defaultAggregator = NewAggregator() DefaultAggregator = NewAggregator()
} }
// Reporter reports the health status of a tablet. // Reporter reports the health status of a tablet.
@ -49,6 +51,7 @@ func (fc FunctionReporter) HTMLName() template.HTML {
} }
// Aggregator aggregates the results of many Reporters. // Aggregator aggregates the results of many Reporters.
// It also implements the Reporter interface.
type Aggregator struct { type Aggregator struct {
// mu protects all fields below its declaration. // mu protects all fields below its declaration.
mu sync.Mutex mu sync.Mutex
@ -62,13 +65,13 @@ func NewAggregator() *Aggregator {
} }
} }
// Run aggregates health statuses from all the reporters. If any // Report aggregates health statuses from all the reporters. If any
// errors occur during the reporting, they will be logged, but only // errors occur during the reporting, they will be logged, but only
// the first error will be returned. // the first error will be returned.
// The returned replication delay will be the highest of all the replication // The returned replication delay will be the highest of all the replication
// delays returned by the Reporter implementations (although typically // delays returned by the Reporter implementations (although typically
// only one implementation will actually return a meaningful one). // only one implementation will actually return a meaningful one).
func (ag *Aggregator) Run(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) { func (ag *Aggregator) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
rec concurrency.AllErrorRecorder rec concurrency.AllErrorRecorder
@ -128,21 +131,3 @@ func (ag *Aggregator) HTMLName() template.HTML {
sort.Strings(result) sort.Strings(result)
return template.HTML(strings.Join(result, "  +  ")) return template.HTML(strings.Join(result, "  +  "))
} }
// Run collects all the health statuses from the default health
// aggregator.
func Run(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
return defaultAggregator.Run(tabletType, shouldQueryServiceBeRunning)
}
// Register registers rep under name with the default health
// aggregator. Only keys specified in keys will be aggregated from
// this particular Reporter.
func Register(name string, rep Reporter) {
defaultAggregator.Register(name, rep)
}
// HTMLName returns an aggregate name for the default reporter
func HTMLName() template.HTML {
return defaultAggregator.HTMLName()
}

Просмотреть файл

@ -20,7 +20,7 @@ func TestReporters(t *testing.T) {
return 5 * time.Second, nil return 5 * time.Second, nil
})) }))
delay, err := ag.Run(topo.TYPE_REPLICA, true) delay, err := ag.Report(topo.TYPE_REPLICA, true)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -32,7 +32,7 @@ func TestReporters(t *testing.T) {
ag.Register("c", FunctionReporter(func(topo.TabletType, bool) (time.Duration, error) { ag.Register("c", FunctionReporter(func(topo.TabletType, bool) (time.Duration, error) {
return 0, errors.New("e error") return 0, errors.New("e error")
})) }))
if _, err := ag.Run(topo.TYPE_REPLICA, false); err == nil { if _, err := ag.Report(topo.TYPE_REPLICA, false); err == nil {
t.Errorf("ag.Run: expected error") t.Errorf("ag.Run: expected error")
} }

Просмотреть файл

@ -131,10 +131,6 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *topo.Tablet, bla
} }
func (agent *ActionAgent) disallowQueries() { func (agent *ActionAgent) disallowQueries() {
if agent.DBConfigs == nil {
// test instance, do nothing
return
}
agent.QueryServiceControl.DisallowQueries() agent.QueryServiceControl.DisallowQueries()
} }

Просмотреть файл

@ -38,6 +38,7 @@ import (
"github.com/youtube/vitess/go/stats" "github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/trace" "github.com/youtube/vitess/go/trace"
"github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/mysqlctl" "github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode" "github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletserver" "github.com/youtube/vitess/go/vt/tabletserver"
@ -54,6 +55,7 @@ var (
type ActionAgent struct { type ActionAgent struct {
// The following fields are set during creation // The following fields are set during creation
QueryServiceControl tabletserver.QueryServiceControl QueryServiceControl tabletserver.QueryServiceControl
HealthReporter health.Reporter
TopoServer topo.Server TopoServer topo.Server
TabletAlias topo.TabletAlias TabletAlias topo.TabletAlias
Mysqld *mysqlctl.Mysqld Mysqld *mysqlctl.Mysqld
@ -134,6 +136,7 @@ func NewActionAgent(
agent = &ActionAgent{ agent = &ActionAgent{
QueryServiceControl: queryServiceControl, QueryServiceControl: queryServiceControl,
HealthReporter: health.DefaultAggregator,
batchCtx: batchCtx, batchCtx: batchCtx,
TopoServer: topoServer, TopoServer: topoServer,
TabletAlias: tabletAlias, TabletAlias: tabletAlias,
@ -183,9 +186,10 @@ func NewActionAgent(
// NewTestActionAgent creates an agent for test purposes. Only a // NewTestActionAgent creates an agent for test purposes. Only a
// subset of features are supported now, but we'll add more over time. // subset of features are supported now, but we'll add more over time.
func NewTestActionAgent(batchCtx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, port int, mysqlDaemon mysqlctl.MysqlDaemon) (agent *ActionAgent) { func NewTestActionAgent(batchCtx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, port int, mysqlDaemon mysqlctl.MysqlDaemon) *ActionAgent {
agent = &ActionAgent{ agent := &ActionAgent{
QueryServiceControl: tabletserver.NewTestQueryServiceControl(), QueryServiceControl: tabletserver.NewTestQueryServiceControl(),
HealthReporter: health.DefaultAggregator,
batchCtx: batchCtx, batchCtx: batchCtx,
TopoServer: ts, TopoServer: ts,
TabletAlias: tabletAlias, TabletAlias: tabletAlias,

Просмотреть файл

@ -177,7 +177,7 @@ func (agent *ActionAgent) SetReadOnly(ctx context.Context, rdonly bool) error {
// ChangeType changes the tablet type // ChangeType changes the tablet type
// Should be called under RPCWrapLockAction. // Should be called under RPCWrapLockAction.
func (agent *ActionAgent) ChangeType(ctx context.Context, tabletType topo.TabletType) error { func (agent *ActionAgent) ChangeType(ctx context.Context, tabletType topo.TabletType) error {
return topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType, nil, true /*runHooks*/) return topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType, nil)
} }
// Scrap scraps the live running tablet // Scrap scraps the live running tablet
@ -580,7 +580,7 @@ func (agent *ActionAgent) Snapshot(ctx context.Context, args *actionnode.Snapsho
tablet.Tablet.Type = topo.TYPE_BACKUP tablet.Tablet.Type = topo.TYPE_BACKUP
err = topo.UpdateTablet(ctx, agent.TopoServer, tablet) err = topo.UpdateTablet(ctx, agent.TopoServer, tablet)
} else { } else {
err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topo.TYPE_BACKUP, make(map[string]string), true /*runHooks*/) err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topo.TYPE_BACKUP, make(map[string]string))
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -614,7 +614,7 @@ func (agent *ActionAgent) Snapshot(ctx context.Context, args *actionnode.Snapsho
tablet.Tablet.Type = topo.TYPE_MASTER tablet.Tablet.Type = topo.TYPE_MASTER
err = topo.UpdateTablet(ctx, agent.TopoServer, tablet) err = topo.UpdateTablet(ctx, agent.TopoServer, tablet)
} else { } else {
err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, newType, nil, true /*runHooks*/) err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, newType, nil)
} }
if err != nil { if err != nil {
// failure in changing the topology type is probably worse, // failure in changing the topology type is probably worse,
@ -670,7 +670,7 @@ func (agent *ActionAgent) SnapshotSourceEnd(ctx context.Context, args *actionnod
tablet.Tablet.Type = topo.TYPE_MASTER tablet.Tablet.Type = topo.TYPE_MASTER
err = topo.UpdateTablet(ctx, agent.TopoServer, tablet) err = topo.UpdateTablet(ctx, agent.TopoServer, tablet)
} else { } else {
err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, args.OriginalType, make(map[string]string), true /*runHooks*/) err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, args.OriginalType, make(map[string]string))
} }
return err return err
@ -822,5 +822,5 @@ func (agent *ActionAgent) Restore(ctx context.Context, args *actionnode.RestoreA
agent.ReloadSchema(ctx) agent.ReloadSchema(ctx)
// change to TYPE_SPARE, we're done! // change to TYPE_SPARE, we're done!
return topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topo.TYPE_SPARE, nil, true) return topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topo.TYPE_SPARE, nil)
} }

Просмотреть файл

@ -17,7 +17,6 @@ import (
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/youtube/vitess/go/timer" "github.com/youtube/vitess/go/timer"
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/logutil" "github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/servenv" "github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode" "github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
@ -150,7 +149,7 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
if tablet.Type == topo.TYPE_MASTER { if tablet.Type == topo.TYPE_MASTER {
typeForHealthCheck = topo.TYPE_MASTER typeForHealthCheck = topo.TYPE_MASTER
} }
replicationDelay, err := health.Run(typeForHealthCheck, shouldQueryServiceBeRunning) replicationDelay, err := agent.HealthReporter.Report(typeForHealthCheck, shouldQueryServiceBeRunning)
health := make(map[string]string) health := make(map[string]string)
if err == nil { if err == nil {
if replicationDelay > *unhealthyThreshold { if replicationDelay > *unhealthyThreshold {
@ -160,16 +159,23 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
} }
} }
// Figure out if we should be running QueryService. If we should, // Figure out if we should be running QueryService, see if we are,
// and we aren't, try to start it (even if we're not healthy, // and reconcile.
// the reason we might not be healthy is the query service not running!) if err != nil {
// we are not healthy, we should not be running QueryService
shouldQueryServiceBeRunning = false
}
isQueryServiceRunning := agent.QueryServiceControl.IsServing()
if shouldQueryServiceBeRunning { if shouldQueryServiceBeRunning {
if err == nil { if !isQueryServiceRunning {
// we remember this new possible error // we remember this new possible error
err = agent.allowQueries(tablet.Tablet, blacklistedTables) err = agent.allowQueries(tablet.Tablet, blacklistedTables)
} else { }
// we ignore the error } else {
agent.allowQueries(tablet.Tablet, blacklistedTables) if isQueryServiceRunning {
// we are not healthy or should not be running the
// query service, shut it down.
agent.disallowQueries()
} }
} }
@ -271,7 +277,7 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
// Change the Type, update the health. Note we pass in a map // Change the Type, update the health. Note we pass in a map
// that's not nil, meaning if it's empty, we will clear it. // that's not nil, meaning if it's empty, we will clear it.
if err := topotools.ChangeType(agent.batchCtx, agent.TopoServer, tablet.Alias, newTabletType, health, false /*runHooks*/); err != nil { if err := topotools.ChangeType(agent.batchCtx, agent.TopoServer, tablet.Alias, newTabletType, health); err != nil {
log.Infof("Error updating tablet record: %v", err) log.Infof("Error updating tablet record: %v", err)
return return
} }
@ -308,7 +314,7 @@ func (agent *ActionAgent) terminateHealthChecks(targetTabletType topo.TabletType
// Change the Type to spare, update the health. Note we pass in a map // Change the Type to spare, update the health. Note we pass in a map
// that's not nil, meaning we will clear it. // that's not nil, meaning we will clear it.
if err := topotools.ChangeType(agent.batchCtx, agent.TopoServer, tablet.Alias, topo.TYPE_SPARE, make(map[string]string), true /*runHooks*/); err != nil { if err := topotools.ChangeType(agent.batchCtx, agent.TopoServer, tablet.Alias, topo.TYPE_SPARE, make(map[string]string)); err != nil {
log.Infof("Error updating tablet record: %v", err) log.Infof("Error updating tablet record: %v", err)
return return
} }

Просмотреть файл

@ -1,15 +1,15 @@
package tabletmanager package tabletmanager
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"html/template" "html/template"
"testing" "testing"
"time" "time"
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/mysqlctl" "github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/zktopo" "github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -86,6 +86,16 @@ func TestHealthRecordClass(t *testing.T) {
} }
} }
// constants used for tests
const (
keyspace = "test_keyspace"
shard = "0"
cell = "cell1"
uid uint32 = 42
)
var tabletAlias = topo.TabletAlias{Cell: cell, Uid: uid}
// fakeHealthCheck implements health.Reporter interface // fakeHealthCheck implements health.Reporter interface
type fakeHealthCheck struct { type fakeHealthCheck struct {
reportReplicationDelay time.Duration reportReplicationDelay time.Duration
@ -100,15 +110,7 @@ func (fhc *fakeHealthCheck) HTMLName() template.HTML {
return template.HTML("fakeHealthCheck") return template.HTML("fakeHealthCheck")
} }
func TestHealthCheck(t *testing.T) { func createTestAgent(t *testing.T) *ActionAgent {
// register a fake reporter for our tests
fhc := &fakeHealthCheck{}
health.Register("fakeHealthCheck", fhc)
keyspace := "test_keyspace"
shard := "0"
cell := "cell1"
var uid uint32 = 42
ts := zktopo.NewTestServer(t, []string{cell}) ts := zktopo.NewTestServer(t, []string{cell})
if err := ts.CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil { if err := ts.CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil {
@ -119,7 +121,6 @@ func TestHealthCheck(t *testing.T) {
t.Fatalf("CreateShard failed: %v", err) t.Fatalf("CreateShard failed: %v", err)
} }
tabletAlias := topo.TabletAlias{Cell: cell, Uid: uid}
port := 1234 port := 1234
tablet := &topo.Tablet{ tablet := &topo.Tablet{
Alias: tabletAlias, Alias: tabletAlias,
@ -140,16 +141,25 @@ func TestHealthCheck(t *testing.T) {
mysqlDaemon := &mysqlctl.FakeMysqlDaemon{MysqlPort: 3306} mysqlDaemon := &mysqlctl.FakeMysqlDaemon{MysqlPort: 3306}
agent := NewTestActionAgent(context.Background(), ts, tabletAlias, port, mysqlDaemon) agent := NewTestActionAgent(context.Background(), ts, tabletAlias, port, mysqlDaemon)
agent.BinlogPlayerMap = NewBinlogPlayerMap(ts, nil, nil) agent.BinlogPlayerMap = NewBinlogPlayerMap(ts, nil, nil)
agent.HealthReporter = &fakeHealthCheck{}
return agent
}
// TestHealthCheckControlsQueryService verifies that a tablet going healthy
// starts the query service, and going unhealthy stops it.
func TestHealthCheckControlsQueryService(t *testing.T) {
agent := createTestAgent(t)
targetTabletType := topo.TYPE_REPLICA targetTabletType := topo.TYPE_REPLICA
// first health check, should change us to replica, and update the // first health check, should change us to replica, and update the
// mysql port to 3306 // mysql port to 3306
agent.runHealthCheck(targetTabletType) agent.runHealthCheck(targetTabletType)
ti, err := ts.GetTablet(tabletAlias) ti, err := agent.TopoServer.GetTablet(tabletAlias)
if err != nil { if err != nil {
t.Fatalf("GetTablet failed: %v", err) t.Fatalf("GetTablet failed: %v", err)
} }
if ti.Type != topo.TYPE_REPLICA { if ti.Type != targetTabletType {
t.Errorf("First health check failed to go to replica: %v", ti.Type) t.Errorf("First health check failed to go to replica: %v", ti.Type)
} }
if ti.Portmap["mysql"] != 3306 { if ti.Portmap["mysql"] != 3306 {
@ -160,21 +170,160 @@ func TestHealthCheck(t *testing.T) {
} }
// now make the tablet unhealthy // now make the tablet unhealthy
fhc.reportError = fmt.Errorf("tablet is unhealthy") agent.HealthReporter.(*fakeHealthCheck).reportError = fmt.Errorf("tablet is unhealthy")
agent.runHealthCheck(targetTabletType) agent.runHealthCheck(targetTabletType)
ti, err = ts.GetTablet(tabletAlias) ti, err = agent.TopoServer.GetTablet(tabletAlias)
if err != nil { if err != nil {
t.Fatalf("GetTablet failed: %v", err) t.Fatalf("GetTablet failed: %v", err)
} }
data, err := json.MarshalIndent(ti, "", " ")
println(string(data))
if ti.Type != topo.TYPE_SPARE { if ti.Type != topo.TYPE_SPARE {
t.Errorf("Unhappy health check failed to go to spare: %v", ti.Type) t.Errorf("Unhappy health check failed to go to spare: %v", ti.Type)
} }
if agent.QueryServiceControl.IsServing() { if agent.QueryServiceControl.IsServing() {
// FIXME(alainjobart) the query service should be stopped there, but it's not. t.Errorf("Query service should not be running")
// See b/19309685 for the tracking bug. }
// t.Errorf("Query service should not be running") }
// TestQueryServiceNotStarting verifies that if a tablet cannot start the
// query service, it should not go healthy
func TestQueryServiceNotStarting(t *testing.T) {
agent := createTestAgent(t)
targetTabletType := topo.TYPE_REPLICA
agent.QueryServiceControl.(*tabletserver.TestQueryServiceControl).AllowQueriesError = fmt.Errorf("test cannot start query service")
agent.runHealthCheck(targetTabletType)
ti, err := agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != topo.TYPE_SPARE {
t.Errorf("Happy health check which cannot start query service should stay spare: %v", ti.Type)
}
if agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should not be running")
}
}
// TestQueryServiceStopped verifies that if a healthy tablet's query
// service is shut down, the tablet does unhealthy
func TestQueryServiceStopped(t *testing.T) {
agent := createTestAgent(t)
targetTabletType := topo.TYPE_REPLICA
// first health check, should change us to replica
agent.runHealthCheck(targetTabletType)
ti, err := agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != targetTabletType {
t.Errorf("First health check failed to go to replica: %v", ti.Type)
}
if !agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should be running")
} }
// shut down query service and prevent it from starting again
agent.QueryServiceControl.DisallowQueries()
agent.QueryServiceControl.(*tabletserver.TestQueryServiceControl).AllowQueriesError = fmt.Errorf("test cannot start query service")
// health check should now fail
agent.runHealthCheck(targetTabletType)
ti, err = agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != topo.TYPE_SPARE {
t.Errorf("Happy health check which cannot start query service should stay spare: %v", ti.Type)
}
if agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should not be running")
}
}
// TestTabletControl verifies the shard's TabletControl record can disable
// query service in a tablet.
func TestTabletControl(t *testing.T) {
agent := createTestAgent(t)
targetTabletType := topo.TYPE_REPLICA
// first health check, should change us to replica
agent.runHealthCheck(targetTabletType)
ti, err := agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != targetTabletType {
t.Errorf("First health check failed to go to replica: %v", ti.Type)
}
if !agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should be running")
}
// now update the shard
si, err := agent.TopoServer.GetShard(keyspace, shard)
if err != nil {
t.Fatalf("GetShard failed: %v", err)
}
si.TabletControlMap = map[topo.TabletType]*topo.TabletControl{
targetTabletType: &topo.TabletControl{
DisableQueryService: true,
},
}
if err := topo.UpdateShard(context.Background(), agent.TopoServer, si); err != nil {
t.Fatalf("UpdateShard failed: %v", err)
}
// now refresh the tablet state, as the resharding process would do
ctx := context.Background()
agent.RPCWrapLockAction(ctx, actionnode.TABLET_ACTION_REFRESH_STATE, "", "", true, func() error {
agent.RefreshState(ctx)
return nil
})
// check we shutdown query service
if agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should not be running")
}
// check running a health check will not start it again
agent.runHealthCheck(targetTabletType)
ti, err = agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != targetTabletType {
t.Errorf("Health check failed to go to replica: %v", ti.Type)
}
if agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should not be running")
}
// go unhealthy, check we go to spare and QS is not running
agent.HealthReporter.(*fakeHealthCheck).reportError = fmt.Errorf("tablet is unhealthy")
agent.runHealthCheck(targetTabletType)
ti, err = agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != topo.TYPE_SPARE {
t.Errorf("Unhealthy health check should go to spare: %v", ti.Type)
}
if agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should not be running")
}
// go back healthy, check QS is still not running
agent.HealthReporter.(*fakeHealthCheck).reportError = nil
agent.runHealthCheck(targetTabletType)
ti, err = agent.TopoServer.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != targetTabletType {
t.Errorf("Healthy health check should go to replica: %v", ti.Type)
}
if agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should not be running")
}
} }

Просмотреть файл

@ -99,7 +99,7 @@ func Scrap(ctx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, fo
// - if health is nil, we don't touch the Tablet's Health record. // - if health is nil, we don't touch the Tablet's Health record.
// - if health is an empty map, we clear the Tablet's Health record. // - if health is an empty map, we clear the Tablet's Health record.
// - if health has values, we overwrite the Tablet's Health record. // - if health has values, we overwrite the Tablet's Health record.
func ChangeType(ctx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, newType topo.TabletType, health map[string]string, runHooks bool) error { func ChangeType(ctx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, newType topo.TabletType, health map[string]string) error {
tablet, err := ts.GetTablet(tabletAlias) tablet, err := ts.GetTablet(tabletAlias)
if err != nil { if err != nil {
return err return err
@ -109,16 +109,6 @@ func ChangeType(ctx context.Context, ts topo.Server, tabletAlias topo.TabletAlia
return fmt.Errorf("cannot change tablet type %v -> %v %v", tablet.Type, newType, tabletAlias) return fmt.Errorf("cannot change tablet type %v -> %v %v", tablet.Type, newType, tabletAlias)
} }
if runHooks {
// Only run the preflight_serving_type hook when
// transitioning from non-serving to serving.
if !topo.IsInServingGraph(tablet.Type) && topo.IsInServingGraph(newType) {
if err := hook.NewSimpleHook("preflight_serving_type").ExecuteOptional(); err != nil {
return err
}
}
}
tablet.Type = newType tablet.Type = newType
if newType == topo.TYPE_IDLE { if newType == topo.TYPE_IDLE {
tablet.Keyspace = "" tablet.Keyspace = ""

Просмотреть файл

@ -7,7 +7,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/youtube/vitess/go/vt/hook"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto" myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode" "github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo"
@ -313,12 +312,12 @@ func (wr *Wrangler) restartSlave(ctx context.Context, ti *topo.TabletInfo, rsd *
} }
func (wr *Wrangler) checkMasterElect(ctx context.Context, ti *topo.TabletInfo) error { func (wr *Wrangler) checkMasterElect(ctx context.Context, ti *topo.TabletInfo) error {
// Check the master-elect is fit for duty - call out for hardware checks. // Check the master-elect is fit for duty - try to ping it.
// if the server was already serving live traffic, it's probably good // if the server was already serving live traffic, it's probably good
if ti.IsInServingGraph() { if ti.IsInServingGraph() {
return nil return nil
} }
return wr.ExecuteOptionalTabletInfoHook(ctx, ti, hook.NewSimpleHook("preflight_serving_type")) return wr.tmc.Ping(ctx, ti)
} }
func (wr *Wrangler) finishReparent(ctx context.Context, si *topo.ShardInfo, masterElect *topo.TabletInfo, majorityRestart, leaveMasterReadOnly bool) error { func (wr *Wrangler) finishReparent(ctx context.Context, si *topo.ShardInfo, masterElect *topo.TabletInfo, majorityRestart, leaveMasterReadOnly bool) error {

Просмотреть файл

@ -207,7 +207,7 @@ func (wr *Wrangler) ChangeTypeNoRebuild(ctx context.Context, tabletAlias topo.Ta
} }
if force { if force {
if err := topotools.ChangeType(ctx, wr.ts, tabletAlias, tabletType, nil, false); err != nil { if err := topotools.ChangeType(ctx, wr.ts, tabletAlias, tabletType, nil); err != nil {
return false, "", "", "", err return false, "", "", "", err
} }
} else { } else {

Просмотреть файл

@ -105,7 +105,7 @@ class GoogleMysql(MysqlFlavor):
"SET binlog_group_id = %s, master_server_id = %s" % "SET binlog_group_id = %s, master_server_id = %s" %
(group_id, server_id), (group_id, server_id),
"CHANGE MASTER TO " "CHANGE MASTER TO "
"MASTER_HOST='%s', MASTER_PORT=%u, CONNECT_USING_GROUP_ID" % "MASTER_HOST='%s', MASTER_PORT=%u, MASTER_USER='vt_repl', CONNECT_USING_GROUP_ID" %
(host, port)] (host, port)]
@ -148,7 +148,7 @@ class MariaDB(MysqlFlavor):
return [ return [
"SET GLOBAL gtid_slave_pos = '%s'" % pos["MariaDB"], "SET GLOBAL gtid_slave_pos = '%s'" % pos["MariaDB"],
"CHANGE MASTER TO " "CHANGE MASTER TO "
"MASTER_HOST='%s', MASTER_PORT=%u, MASTER_USE_GTID = slave_pos" % "MASTER_HOST='%s', MASTER_PORT=%u, MASTER_USER='vt_repl', MASTER_USE_GTID = slave_pos" %
(host, port)] (host, port)]
def enable_binlog_checksum(self, tablet): def enable_binlog_checksum(self, tablet):

Просмотреть файл

@ -511,18 +511,25 @@ class TestTabletManager(unittest.TestCase):
] ]
utils.wait_procs(start_procs) utils.wait_procs(start_procs)
# wait for the tablets to become healthy and fix their mysql port # the master should still be healthy
for t in tablet_62344, tablet_62044: utils.run_vtctl(['RunHealthCheck', tablet_62344.tablet_alias, 'replica'],
t.wait_for_vttablet_state('SERVING')
# we need to do one more health check here, so it sees the query service
# is now running, and turns green.
utils.run_vtctl(['RunHealthCheck', t.tablet_alias, 'replica'],
auto_log=True) auto_log=True)
# master will be healthy, slave's replication won't be running
self.check_healthz(tablet_62344, True) self.check_healthz(tablet_62344, True)
# the slave won't be healthy at first, as replication is not running
utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias, 'replica'],
auto_log=True)
self.check_healthz(tablet_62044, False) self.check_healthz(tablet_62044, False)
tablet_62044.wait_for_vttablet_state('NOT_SERVING')
# restart replication
tablet_62044.mquery('', ['START SLAVE'])
# wait for the tablet to become healthy and fix its mysql port
utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias, 'replica'],
auto_log=True)
tablet_62044.wait_for_vttablet_state('SERVING')
self.check_healthz(tablet_62044, True)
for t in tablet_62344, tablet_62044: for t in tablet_62344, tablet_62044:
# wait for mysql port to show up # wait for mysql port to show up