Mirror of https://github.com/github/vitess-gh.git
Merge pull request #1201 from alainjobart/resharding
Switching tabletmanager to the new tabletserver API.
Commit 8fd2543994

@@ -835,14 +835,18 @@ type StreamHealthResponse struct {
 	// target is the current server type. Only queries with that exact Target
 	// record will be accepted.
 	Target *Target `protobuf:"bytes,1,opt,name=target" json:"target,omitempty"`
+	// serving is true iff the tablet is serving. A tablet may not be serving
+	// if filtered replication is enabled on a master for instance,
+	// or if a replica should not be used because the keyspace is being resharded.
+	Serving bool `protobuf:"varint,2,opt,name=serving" json:"serving,omitempty"`
 	// tablet_externally_reparented_timestamp contains the last time
 	// tabletmanager.TabletExternallyReparented was called on this tablet,
 	// or 0 if it was never called. This is meant to differentiate two tablets
 	// that report a target.TabletType of MASTER, only the one with the latest
 	// timestamp should be trusted.
-	TabletExternallyReparentedTimestamp int64 `protobuf:"varint,2,opt,name=tablet_externally_reparented_timestamp" json:"tablet_externally_reparented_timestamp,omitempty"`
+	TabletExternallyReparentedTimestamp int64 `protobuf:"varint,3,opt,name=tablet_externally_reparented_timestamp" json:"tablet_externally_reparented_timestamp,omitempty"`
 	// realtime_stats contains information about the tablet status
-	RealtimeStats *RealtimeStats `protobuf:"bytes,3,opt,name=realtime_stats" json:"realtime_stats,omitempty"`
+	RealtimeStats *RealtimeStats `protobuf:"bytes,4,opt,name=realtime_stats" json:"realtime_stats,omitempty"`
 }
 
 func (m *StreamHealthResponse) Reset() { *m = StreamHealthResponse{} }
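
The struct above is generated code; the interesting part is how a client is meant to consume the two new fields. Here is a minimal, self-contained sketch of the disambiguation rule described in the struct comments, using stand-in types rather than the generated proto code:

package main

import "fmt"

// Stand-ins for the generated types above; only the fields relevant
// to master disambiguation are kept.
type StreamHealthResponse struct {
	Serving                             bool
	TabletExternallyReparentedTimestamp int64
}

// preferMaster picks between two tablets that both claim to be MASTER:
// per the comment in the struct, only the one with the latest
// TabletExternallyReparented timestamp should be trusted.
func preferMaster(a, b *StreamHealthResponse) *StreamHealthResponse {
	if b.TabletExternallyReparentedTimestamp > a.TabletExternallyReparentedTimestamp {
		return b
	}
	return a
}

func main() {
	oldMaster := &StreamHealthResponse{Serving: true, TabletExternallyReparentedTimestamp: 1445000000}
	newMaster := &StreamHealthResponse{Serving: true, TabletExternallyReparentedTimestamp: 1445000600}
	winner := preferMaster(oldMaster, newMaster)
	fmt.Println("trusted master reparented at:", winner.TabletExternallyReparentedTimestamp)
}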

@@ -9,7 +9,6 @@ package tabletmanager
 import (
 	"encoding/hex"
 	"fmt"
-	"reflect"
 	"strings"
 
 	"golang.org/x/net/context"

@@ -48,36 +47,12 @@ const keyrangeQueryRules string = "KeyrangeQueryRules"
 const blacklistQueryRules string = "BlacklistQueryRules"
 
 func (agent *ActionAgent) allowQueries(tablet *pbt.Tablet, blacklistedTables []string) error {
-	// if the query service is already running, we're not starting it again
-	if agent.QueryServiceControl.IsServing() {
-		return nil
-	}
-
-	// only for real instances
-	if agent.DBConfigs != nil {
-		// Update our DB config to match the info we have in the tablet
-		if agent.DBConfigs.App.DbName == "" {
-			agent.DBConfigs.App.DbName = topoproto.TabletDbName(tablet)
-		}
-		agent.DBConfigs.App.Keyspace = tablet.Keyspace
-		agent.DBConfigs.App.Shard = tablet.Shard
-		if tablet.Type != pbt.TabletType_MASTER {
-			agent.DBConfigs.App.EnableInvalidator = true
-		} else {
-			agent.DBConfigs.App.EnableInvalidator = false
-		}
-	}
-
 	err := agent.loadKeyspaceAndBlacklistRules(tablet, blacklistedTables)
 	if err != nil {
 		return err
 	}
 
-	return agent.QueryServiceControl.StartService(&pb.Target{
-		Keyspace:   tablet.Keyspace,
-		Shard:      tablet.Shard,
-		TabletType: tablet.Type,
-	}, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon)
+	return agent.QueryServiceControl.SetServingType(tablet.Type, true)
 }
 
 // loadKeyspaceAndBlacklistRules does what the name suggests:
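
This hunk is the heart of the PR: instead of starting and stopping the whole tablet server as the tablet changes state, the server is configured once and only its serving state is flipped afterwards. A minimal sketch of the new calling pattern (the interface here is a cut-down, illustrative version of the tabletserver Controller, not the real one):

package main

import "fmt"

type tabletType string

// queryServiceControl is a pared-down, illustrative version of the
// tabletserver Controller interface used in the diff above.
type queryServiceControl interface {
	SetServingType(t tabletType, serving bool) error
}

type fakeControl struct{}

func (fakeControl) SetServingType(t tabletType, serving bool) error {
	fmt.Printf("SetServingType(%s, %v)\n", t, serving)
	return nil
}

// allowQueries and disallowQueries mirror the new shape of the functions
// in this diff: no StartService, no StopService, just a serving-state flip.
func allowQueries(qsc queryServiceControl, t tabletType) error {
	return qsc.SetServingType(t, true)
}

func disallowQueries(qsc queryServiceControl, t tabletType, reason string) error {
	fmt.Println("disallowing queries, reason:", reason)
	return qsc.SetServingType(t, false)
}

func main() {
	qsc := fakeControl{}
	_ = allowQueries(qsc, "replica")
	_ = disallowQueries(qsc, "spare", "health-check failure")
}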

@@ -141,9 +116,38 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *pbt.Tablet, blac
 	return nil
 }
 
-func (agent *ActionAgent) stopQueryService(reason string) {
-	log.Infof("Agent is going to stop query service, reason: %v", reason)
-	agent.QueryServiceControl.StopService()
+func (agent *ActionAgent) disallowQueries(tablet *pbt.Tablet, reason string) error {
+	log.Infof("Agent is going to disallow queries, reason: %v", reason)
+
+	return agent.QueryServiceControl.SetServingType(tablet.Type, false)
+}
+
+func (agent *ActionAgent) broadcastHealth() {
+	// get the replication delays
+	agent.mutex.Lock()
+	replicationDelay := agent._replicationDelay
+	healthError := agent._healthy
+	terTime := agent._tabletExternallyReparentedTime
+	agent.mutex.Unlock()
+
+	// send it to our observers
+	// FIXME(alainjobart,liguo) add CpuUsage
+	stats := &pb.RealtimeStats{
+		SecondsBehindMaster: uint32(replicationDelay.Seconds()),
+	}
+	if agent.BinlogPlayerMap != nil {
+		stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = agent.BinlogPlayerMap.StatusSummary()
+	}
+	if healthError != nil {
+		stats.HealthError = healthError.Error()
+	}
+	var ts int64
+	if !terTime.IsZero() {
+		ts = terTime.Unix()
+	}
+	defer func() {
+		agent.QueryServiceControl.BroadcastHealth(ts, stats)
+	}()
 }
 
 // changeCallback is run after every action that might
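
The new broadcastHealth helper follows a common Go pattern worth noting: snapshot the mutex-guarded fields (the underscore-prefixed ones, per the agent's convention), release the lock, then build and publish the stats without holding it. A self-contained sketch of that pattern, with hypothetical stand-in fields rather than the real ActionAgent:

package main

import (
	"fmt"
	"sync"
	"time"
)

// agent is a stripped-down stand-in for ActionAgent: the underscore-prefixed
// fields are guarded by mutex, matching the convention in the Go code above.
type agent struct {
	mutex             sync.Mutex
	_replicationDelay time.Duration
	_healthy          error
	_terTime          time.Time
}

// broadcastHealth copies the guarded fields while holding the lock, then
// formats and publishes the snapshot without it, as the helper above does.
func (a *agent) broadcastHealth() {
	a.mutex.Lock()
	delay := a._replicationDelay
	healthErr := a._healthy
	terTime := a._terTime
	a.mutex.Unlock()

	secondsBehind := uint32(delay.Seconds())
	var ts int64
	if !terTime.IsZero() {
		ts = terTime.Unix()
	}
	msg := fmt.Sprintf("behind=%ds ter=%d", secondsBehind, ts)
	if healthErr != nil {
		msg += " err=" + healthErr.Error()
	}
	// The real code calls QueryServiceControl.BroadcastHealth(ts, stats) here.
	fmt.Println(msg)
}

func main() {
	a := &agent{_replicationDelay: 12 * time.Second, _terTime: time.Now()}
	a.broadcastHealth()
}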

@@ -188,50 +192,12 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl
 		disallowQueryReason = fmt.Sprintf("not a serving tablet type(%v)", newTablet.Type)
 	}
 
-	// Read the keyspace on masters to get ShardingColumnType,
-	// for binlog replication, only if source shards are set.
-	var keyspaceInfo *topo.KeyspaceInfo
-	if newTablet.Type == pbt.TabletType_MASTER && shardInfo != nil && len(shardInfo.SourceShards) > 0 {
-		keyspaceInfo, err = agent.TopoServer.GetKeyspace(ctx, newTablet.Keyspace)
-		if err != nil {
-			log.Errorf("Cannot read keyspace for this tablet %v: %v", newTablet.Alias, err)
-			keyspaceInfo = nil
-		}
-	}
-
 	if allowQuery {
-		// There are a few transitions when we need to restart the query service:
-		switch {
-		// If either InitMaster or InitSlave was called, because those calls
-		// (or a prior call to ResetReplication) may have silently broken the
-		// rowcache invalidator by executing RESET MASTER.
-		// Note that we don't care about fixing it after ResetReplication itself
-		// since that call breaks everything on purpose, and we don't expect
-		// anything to start working until either InitMaster or InitSlave.
-		case agent.initReplication:
-			agent.initReplication = false
-			agent.stopQueryService("initialize replication")
-
-		// Transitioning from replica to master, so clients that were already
-		// connected don't keep on using the master as replica or rdonly.
-		case newTablet.Type == pbt.TabletType_MASTER && oldTablet.Type != pbt.TabletType_MASTER:
-			agent.stopQueryService("tablet promoted to master")
-
-		// Having different parameters for the query service.
-		// It needs to stop and restart with the new parameters.
-		// That includes:
-		// - changing KeyRange
-		// - changing the BlacklistedTables list
-		case (newTablet.KeyRange != oldTablet.KeyRange),
-			!reflect.DeepEqual(blacklistedTables, agent.BlacklistedTables()):
-			agent.stopQueryService("keyrange/blacklistedtables changed")
-		}
-
 		if err := agent.allowQueries(newTablet, blacklistedTables); err != nil {
 			log.Errorf("Cannot start query service: %v", err)
 		}
 	} else {
-		agent.stopQueryService(disallowQueryReason)
+		agent.disallowQueries(newTablet, disallowQueryReason)
 	}
 
 	// save the tabletControl we've been using, so the background

@@ -261,6 +227,16 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl
 	// See if we need to start or stop any binlog player
 	if agent.BinlogPlayerMap != nil {
 		if newTablet.Type == pbt.TabletType_MASTER {
+			// Read the keyspace on masters to get
+			// ShardingColumnType, for binlog replication,
+			// only if source shards are set.
+			var keyspaceInfo *topo.KeyspaceInfo
+			if shardInfo != nil && len(shardInfo.SourceShards) > 0 {
+				keyspaceInfo, err = agent.TopoServer.GetKeyspace(ctx, newTablet.Keyspace)
+				if err != nil {
+					keyspaceInfo = nil
+				}
+			}
 			agent.BinlogPlayerMap.RefreshMap(agent.batchCtx, newTablet, keyspaceInfo, shardInfo)
 		} else {
 			agent.BinlogPlayerMap.StopAllPlayersAndReset()
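
Note that the keyspace read that used to sit in the middle of changeCallback now lives inside the master-only branch, so non-masters never touch the topology server for it, and a read failure degrades to a nil keyspaceInfo instead of aborting the callback. A sketch of that guard, with hypothetical stand-in types in place of topo.KeyspaceInfo and topo.ShardInfo:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the topo types used above.
type keyspaceInfo struct{ ShardingColumnType string }
type shardInfo struct{ SourceShards []string }

func getKeyspace(name string) (*keyspaceInfo, error) {
	if name == "" {
		return nil, errors.New("no such keyspace")
	}
	return &keyspaceInfo{ShardingColumnType: "uint64"}, nil
}

// keyspaceForBinlogPlayer mirrors the guard moved into the master branch:
// only read the keyspace when there are source shards, and treat a read
// error as "no keyspace info" rather than a fatal error.
func keyspaceForBinlogPlayer(isMaster bool, si *shardInfo, keyspace string) *keyspaceInfo {
	if !isMaster || si == nil || len(si.SourceShards) == 0 {
		return nil
	}
	ki, err := getKeyspace(keyspace)
	if err != nil {
		return nil
	}
	return ki
}

func main() {
	fmt.Println(keyspaceForBinlogPlayer(true, &shardInfo{SourceShards: []string{"-80"}}, "ks"))
	fmt.Println(keyspaceForBinlogPlayer(false, nil, "ks"))
}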

@@ -48,6 +48,7 @@ import (
 	"github.com/youtube/vitess/go/vt/topo/topoproto"
 	"github.com/youtube/vitess/go/vt/topotools"
 
+	pbq "github.com/youtube/vitess/go/vt/proto/query"
 	pb "github.com/youtube/vitess/go/vt/proto/topodata"
 )

@@ -486,7 +487,8 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort
 	}
 
 	// Reread to get the changes we just made
-	if _, err := agent.readTablet(ctx); err != nil {
+	tablet, err := agent.readTablet(ctx)
+	if err != nil {
 		return err
 	}
 

@@ -498,6 +500,25 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort
 		return err
 	}
 
+	// initialize tablet server
+	if agent.DBConfigs != nil {
+		// Only for real instances
+		// Update our DB config to match the info we have in the tablet
+		if agent.DBConfigs.App.DbName == "" {
+			agent.DBConfigs.App.DbName = topoproto.TabletDbName(tablet.Tablet)
+		}
+		agent.DBConfigs.App.Keyspace = tablet.Keyspace
+		agent.DBConfigs.App.Shard = tablet.Shard
+	}
+	if err := agent.QueryServiceControl.InitDBConfig(&pbq.Target{
+		Keyspace:   tablet.Keyspace,
+		Shard:      tablet.Shard,
+		TabletType: tablet.Type,
+	}, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon); err != nil {
+		return fmt.Errorf("failed to InitDBConfig: %v", err)
+	}
+
 	// and update our state
 	oldTablet := &pb.Tablet{}
 	if err = agent.updateState(ctx, oldTablet, "Start"); err != nil {
 		log.Warningf("Initial updateState failed, will need a state change before running properly: %v", err)
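
Start now wires the tablet server up exactly once: the DB config is patched from the tablet record and handed to InitDBConfig, and every later transition goes through SetServingType. A sketch of that once-then-flip lifecycle, with illustrative types rather than the real Controller signatures:

package main

import "fmt"

// target mirrors the proto Target passed to InitDBConfig above.
type target struct {
	Keyspace, Shard string
	TabletType      string
}

// controller is a cut-down illustration of the lifecycle: InitDBConfig is
// called once at agent startup, SetServingType on every transition after.
type controller struct {
	inited bool
	t      target
}

func (c *controller) InitDBConfig(t target) error {
	if c.inited {
		return fmt.Errorf("InitDBConfig called twice")
	}
	c.inited = true
	c.t = t
	return nil
}

func (c *controller) SetServingType(tabletType string, serving bool) error {
	if !c.inited {
		return fmt.Errorf("not initialized")
	}
	c.t.TabletType = tabletType
	fmt.Printf("target=%+v serving=%v\n", c.t, serving)
	return nil
}

func main() {
	c := &controller{}
	// Agent start: configure once with the tablet's keyspace/shard.
	if err := c.InitDBConfig(target{Keyspace: "test_keyspace", Shard: "0", TabletType: "replica"}); err != nil {
		fmt.Println("failed to InitDBConfig:", err)
		return
	}
	// Later transitions just flip the serving type.
	_ = c.SetServingType("master", true)
}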

@@ -472,7 +472,8 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (myproto.Replication
 	// Now stop the query service, to make sure nobody is writing to the
 	// database. This will in effect close the connection pools to the
 	// database.
-	agent.stopQueryService("DemoteMaster marks server rdonly")
+	tablet := agent.Tablet()
+	agent.disallowQueries(tablet.Tablet, "DemoteMaster marks server rdonly")
 
 	return agent.MysqlDaemon.DemoteMaster()
 	// There is no serving graph update - the master tablet will

@@ -22,7 +22,6 @@ import (
 	"github.com/youtube/vitess/go/vt/topo/topoproto"
 	"github.com/youtube/vitess/go/vt/topotools"
 
-	pb "github.com/youtube/vitess/go/vt/proto/query"
 	pbt "github.com/youtube/vitess/go/vt/proto/topodata"
 )

@@ -201,9 +200,11 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType pbt.TabletType) {
 		}
 	} else {
 		if isQueryServiceRunning {
-			// we are not healthy or should not be running the
+			// We are not healthy or should not be running the
 			// query service, shut it down.
-			agent.stopQueryService(
+			// Note this is possibly sending 'spare' as
+			// the tablet type, we will clean it up later.
+			agent.disallowQueries(tablet.Tablet,
 				fmt.Sprintf("health-check failure(%v)", err),
 			)
 		}

@@ -253,27 +254,10 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType pbt.TabletType) {
 	agent._healthy = err
 	agent._healthyTime = time.Now()
 	agent._replicationDelay = replicationDelay
-	terTime := agent._tabletExternallyReparentedTime
 	agent.mutex.Unlock()
 
 	// send it to our observers
-	// (the Target has already been updated when restarting the
-	// query service earlier)
-	// FIXME(alainjobart,liguo) add CpuUsage
-	stats := &pb.RealtimeStats{
-		SecondsBehindMaster: uint32(replicationDelay.Seconds()),
-	}
-	stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = agent.BinlogPlayerMap.StatusSummary()
-	if err != nil {
-		stats.HealthError = err.Error()
-	}
-	defer func() {
-		var ts int64
-		if !terTime.IsZero() {
-			ts = terTime.Unix()
-		}
-		agent.QueryServiceControl.BroadcastHealth(ts, stats)
-	}()
+	agent.broadcastHealth()
 
 	// Update our topo.Server state, start with no change
 	newTabletType := tablet.Type

@@ -157,6 +157,7 @@ func TestHealthCheckControlsQueryService(t *testing.T) {
 	// first health check, should change us to replica, and update the
 	// mysql port to 3306
 	before := time.Now()
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 12 * time.Second
 	agent.runHealthCheck(targetTabletType)
 	ti, err := agent.TopoServer.GetTablet(ctx, tabletAlias)
 	if err != nil {

@@ -174,8 +175,16 @@ func TestHealthCheckControlsQueryService(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd := <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 12 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_REPLICA {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 
 	// now make the tablet unhealthy
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 13 * time.Second
 	agent.HealthReporter.(*fakeHealthCheck).reportError = fmt.Errorf("tablet is unhealthy")
 	before = time.Now()
 	agent.runHealthCheck(targetTabletType)

@@ -192,6 +201,13 @@ func TestHealthCheckControlsQueryService(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd = <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 13 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_SPARE {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 }
 
 // TestQueryServiceNotStarting verifies that if a tablet cannot start the

@@ -200,7 +216,7 @@ func TestQueryServiceNotStarting(t *testing.T) {
 	ctx := context.Background()
 	agent := createTestAgent(ctx, t)
 	targetTabletType := pb.TabletType_REPLICA
-	agent.QueryServiceControl.(*tabletservermock.Controller).StartServiceError = fmt.Errorf("test cannot start query service")
+	agent.QueryServiceControl.(*tabletservermock.Controller).SetServingTypeError = fmt.Errorf("test cannot start query service")
 
 	before := time.Now()
 	agent.runHealthCheck(targetTabletType)

@@ -217,6 +233,13 @@ func TestQueryServiceNotStarting(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd := <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.HealthError != "test cannot start query service" {
+		t.Errorf("unexpected HealthError: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_SPARE {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 }
 
 // TestQueryServiceStopped verifies that if a healthy tablet's query

@@ -228,6 +251,7 @@ func TestQueryServiceStopped(t *testing.T) {
 
 	// first health check, should change us to replica
 	before := time.Now()
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 14 * time.Second
 	agent.runHealthCheck(targetTabletType)
 	ti, err := agent.TopoServer.GetTablet(ctx, tabletAlias)
 	if err != nil {

@@ -242,13 +266,23 @@ func TestQueryServiceStopped(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd := <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 14 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_REPLICA {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 
 	// shut down query service and prevent it from starting again
-	agent.QueryServiceControl.StopService()
-	agent.QueryServiceControl.(*tabletservermock.Controller).StartServiceError = fmt.Errorf("test cannot start query service")
+	// (this is to simulate mysql going away, tablet server detecting it
+	// and shutting itself down)
+	agent.QueryServiceControl.SetServingType(pb.TabletType_REPLICA, false)
+	agent.QueryServiceControl.(*tabletservermock.Controller).SetServingTypeError = fmt.Errorf("test cannot start query service")
 
 	// health check should now fail
 	before = time.Now()
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 15 * time.Second
 	agent.runHealthCheck(targetTabletType)
 	ti, err = agent.TopoServer.GetTablet(ctx, tabletAlias)
 	if err != nil {

@@ -263,6 +297,16 @@ func TestQueryServiceStopped(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd = <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 15 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if bd.RealtimeStats.HealthError != "test cannot start query service" {
+		t.Errorf("unexpected HealthError: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_REPLICA {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 }
 
 // TestTabletControl verifies the shard's TabletControl record can disable

@@ -274,6 +318,7 @@ func TestTabletControl(t *testing.T) {
 
 	// first health check, should change us to replica
 	before := time.Now()
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 16 * time.Second
 	agent.runHealthCheck(targetTabletType)
 	ti, err := agent.TopoServer.GetTablet(ctx, tabletAlias)
 	if err != nil {

@@ -288,6 +333,13 @@ func TestTabletControl(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd := <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 16 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_REPLICA {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 
 	// now update the shard
 	si, err := agent.TopoServer.GetShard(ctx, keyspace, shard)

@@ -317,6 +369,7 @@ func TestTabletControl(t *testing.T) {
 
 	// check running a health check will not start it again
 	before = time.Now()
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 17 * time.Second
 	agent.runHealthCheck(targetTabletType)
 	ti, err = agent.TopoServer.GetTablet(ctx, tabletAlias)
 	if err != nil {

@@ -331,9 +384,17 @@ func TestTabletControl(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd = <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 17 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_REPLICA {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 
 	// go unhealthy, check we go to spare and QS is not running
 	agent.HealthReporter.(*fakeHealthCheck).reportError = fmt.Errorf("tablet is unhealthy")
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 18 * time.Second
 	before = time.Now()
 	agent.runHealthCheck(targetTabletType)
 	ti, err = agent.TopoServer.GetTablet(ctx, tabletAlias)

@@ -349,9 +410,17 @@ func TestTabletControl(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd = <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 18 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_SPARE {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 
 	// go back healthy, check QS is still not running
 	agent.HealthReporter.(*fakeHealthCheck).reportError = nil
+	agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 19 * time.Second
 	before = time.Now()
 	agent.runHealthCheck(targetTabletType)
 	ti, err = agent.TopoServer.GetTablet(ctx, tabletAlias)

@@ -367,6 +436,13 @@ func TestTabletControl(t *testing.T) {
 	if agent._healthyTime.Sub(before) < 0 {
 		t.Errorf("runHealthCheck did not update agent._healthyTime")
 	}
+	bd = <-agent.QueryServiceControl.(*tabletservermock.Controller).BroadcastData
+	if bd.RealtimeStats.SecondsBehindMaster != 19 {
+		t.Errorf("unexpected replicaton delay: %v", *bd)
+	}
+	if agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType != pb.TabletType_REPLICA {
+		t.Errorf("invalid tabletserver target: %v", agent.QueryServiceControl.(*tabletservermock.Controller).CurrentTarget.TabletType)
+	}
 }
 
 // TestOldHealthCheck verifies that a healthcheck that is too old will

@@ -99,8 +99,13 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
 
 	agent.mutex.Lock()
 	agent._tabletExternallyReparentedTime = time.Now()
+	agent._replicationDelay = 0
 	agent.mutex.Unlock()
 
+	// update the listeners in the background
+	event.DispatchUpdate(ev, "broadcasting to listeners")
+	agent.broadcastHealth()
+
 	// Directly write the new master endpoint in the serving graph.
 	// We will do a true rebuild in the background soon, but in the meantime,
 	// this will be enough for clients to re-resolve the new master.

@@ -854,7 +854,8 @@ func (tsv *TabletServer) HandlePanic(err *error) {
 // BroadcastHealth will broadcast the current health to all listeners
 func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *pb.RealtimeStats) {
 	shr := &pb.StreamHealthResponse{
-		Target: tsv.target,
+		Target:  tsv.target,
+		Serving: tsv.IsServing(),
 		TabletExternallyReparentedTimestamp: terTimestamp,
 		RealtimeStats:                       stats,
 	}

@@ -14,8 +14,21 @@ import (
 	"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
 )
 
+// BroadcastData is used by the mock Controller to send data
+// so the tests can check what was sent.
+type BroadcastData struct {
+	// TERTimestamp stores the last broadcast timestamp
+	TERTimestamp int64
+
+	// RealtimeStats stores the last broadcast stats
+	RealtimeStats pb.RealtimeStats
+}
+
 // Controller is a mock tabletserver.Controller
 type Controller struct {
+	// CurrentTarget stores the last known target
+	CurrentTarget pb.Target
+
 	// QueryServiceEnabled is a state variable
 	QueryServiceEnabled bool
 

@@ -25,14 +38,14 @@ type Controller struct {
 	// SetServingTypeError is the return value for SetServingType
 	SetServingTypeError error
 
-	// StartServiceError is the return value for StartService
-	StartServiceError error
-
 	// IsHealthy is the return value for IsHealthy
 	IsHealthyError error
 
 	// ReloadSchemaCount counts how many times ReloadSchema was called
 	ReloadSchemaCount int
+
+	// BroadcastData is a channel where we send BroadcastHealth data
+	BroadcastData chan *BroadcastData
 }
 
 // NewController returns a mock of tabletserver.Controller

@@ -40,9 +53,9 @@ func NewController() *Controller {
 	return &Controller{
 		QueryServiceEnabled: false,
 		InitDBConfigError:   nil,
-		StartServiceError:   nil,
 		IsHealthyError:      nil,
 		ReloadSchemaCount:   0,
+		BroadcastData:       make(chan *BroadcastData, 10),
 	}
 }

@@ -55,21 +68,29 @@ func (tqsc *Controller) AddStatusPart() {
 }
 
 // InitDBConfig is part of the tabletserver.Controller interface
-func (tqsc *Controller) InitDBConfig(*pb.Target, *dbconfigs.DBConfigs, []tabletserver.SchemaOverride, mysqlctl.MysqlDaemon) error {
-	tqsc.QueryServiceEnabled = tqsc.InitDBConfigError == nil
+func (tqsc *Controller) InitDBConfig(target *pb.Target, dbConfigs *dbconfigs.DBConfigs, schemaOverrides []tabletserver.SchemaOverride, mysqld mysqlctl.MysqlDaemon) error {
+	if tqsc.InitDBConfigError == nil {
+		tqsc.CurrentTarget = *target
+		tqsc.QueryServiceEnabled = true
+	} else {
+		tqsc.QueryServiceEnabled = false
+	}
 	return tqsc.InitDBConfigError
 }
 
 // SetServingType is part of the tabletserver.Controller interface
-func (tqsc *Controller) SetServingType(topodata.TabletType, bool) error {
-	tqsc.QueryServiceEnabled = tqsc.SetServingTypeError == nil
+func (tqsc *Controller) SetServingType(tabletType topodata.TabletType, serving bool) error {
+	if tqsc.SetServingTypeError == nil {
+		tqsc.CurrentTarget.TabletType = tabletType
+		tqsc.QueryServiceEnabled = serving
+	}
 	return tqsc.SetServingTypeError
 }
 
 // StartService is part of the tabletserver.Controller interface
 func (tqsc *Controller) StartService(*pb.Target, *dbconfigs.DBConfigs, []tabletserver.SchemaOverride, mysqlctl.MysqlDaemon) error {
-	tqsc.QueryServiceEnabled = tqsc.StartServiceError == nil
-	return tqsc.StartServiceError
+	tqsc.QueryServiceEnabled = true
+	return nil
 }
 
 // StopService is part of the tabletserver.Controller interface

@@ -116,4 +137,8 @@ func (tqsc *Controller) QueryService() queryservice.QueryService {
 
 // BroadcastHealth is part of the tabletserver.Controller interface
 func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *pb.RealtimeStats) {
+	tqsc.BroadcastData <- &BroadcastData{
+		TERTimestamp:  terTimestamp,
+		RealtimeStats: *stats,
+	}
 }
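
The mock records every BroadcastHealth call on a buffered channel, which is what lets the tests above simply receive from Controller.BroadcastData and assert on the payload. The pattern in miniature (illustrative types, not the real mock):

package main

import "fmt"

// broadcastData mirrors the mock's BroadcastData above.
type broadcastData struct {
	TERTimestamp  int64
	SecondsBehind uint32
}

// mockController shows the tabletservermock pattern: record each broadcast
// on a buffered channel so tests can receive it and assert on the contents.
type mockController struct {
	BroadcastData chan *broadcastData
}

func (m *mockController) BroadcastHealth(ter int64, secondsBehind uint32) {
	m.BroadcastData <- &broadcastData{TERTimestamp: ter, SecondsBehind: secondsBehind}
}

func main() {
	m := &mockController{BroadcastData: make(chan *broadcastData, 10)}
	m.BroadcastHealth(0, 12)

	bd := <-m.BroadcastData // what the tests above do after runHealthCheck
	if bd.SecondsBehind != 12 {
		fmt.Println("unexpected replication delay:", *bd)
	}
	fmt.Println("got broadcast:", *bd)
}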

@@ -315,13 +315,18 @@ message StreamHealthResponse {
   // record will be accepted.
   Target target = 1;
 
+  // serving is true iff the tablet is serving. A tablet may not be serving
+  // if filtered replication is enabled on a master for instance,
+  // or if a replica should not be used because the keyspace is being resharded.
+  bool serving = 2;
+
   // tablet_externally_reparented_timestamp contains the last time
   // tabletmanager.TabletExternallyReparented was called on this tablet,
   // or 0 if it was never called. This is meant to differentiate two tablets
   // that report a target.TabletType of MASTER, only the one with the latest
   // timestamp should be trusted.
-  int64 tablet_externally_reparented_timestamp = 2;
+  int64 tablet_externally_reparented_timestamp = 3;
 
   // realtime_stats contains information about the tablet status
-  RealtimeStats realtime_stats = 3;
+  RealtimeStats realtime_stats = 4;
 }

Some file diffs are hidden because one or more lines are too long.

@@ -420,20 +420,19 @@ primary key (name)
     # Enforce health check because it's not running by default as
     # tablets are not started with it.
     utils.run_vtctl(['RunHealthCheck', tablet.tablet_alias, 'replica'])
-    stream_health, _ = utils.run_vtctl(['VtTabletStreamHealth',
-                                        '-count', '1',
-                                        tablet.tablet_alias],
-                                       trap_output=True, auto_log=True)
-    logging.debug('Got health: %s', stream_health)
-    data = json.loads(stream_health)
-    self.assertIn('realtime_stats', data)
-    self.assertNotIn('health_error', data['realtime_stats'])
+    stream_health = utils.run_vtctl_json(['VtTabletStreamHealth',
+                                          '-count', '1',
+                                          tablet.tablet_alias])
+    logging.debug('Got health: %s', str(stream_health))
+    self.assertNotIn('serving', stream_health)
+    self.assertIn('realtime_stats', stream_health)
+    self.assertNotIn('health_error', stream_health['realtime_stats'])
     # count is > 0 and therefore not omitted by the Go JSON marshaller.
-    self.assertIn('binlog_players_count', data['realtime_stats'])
+    self.assertIn('binlog_players_count', stream_health['realtime_stats'])
     self.assertEqual(blp_stats['BinlogPlayerMapSize'],
-                     data['realtime_stats']['binlog_players_count'])
+                     stream_health['realtime_stats']['binlog_players_count'])
     self.assertEqual(blp_stats['BinlogPlayerSecondsBehindMaster'],
-                     data['realtime_stats'].get(
+                     stream_health['realtime_stats'].get(
                          'seconds_behind_master_filtered_replication', 0))
 
   def _test_keyrange_constraints(self):

@@ -654,8 +653,14 @@ primary key (name)
     # master 3 should not interfere (we run it to be sure).
     utils.run_vtctl(['RunHealthCheck', shard_3_master.tablet_alias, 'replica'],
                     auto_log=True)
-    utils.check_tablet_query_service(self, shard_2_master, False, False)
-    utils.check_tablet_query_service(self, shard_3_master, False, False)
+    for master in [shard_2_master, shard_3_master]:
+      utils.check_tablet_query_service(self, master, False, False)
+      stream_health = utils.run_vtctl_json(['VtTabletStreamHealth',
+                                            '-count', '1',
+                                            master.tablet_alias])
+      logging.debug('Got health: %s', str(stream_health))
+      self.assertIn('realtime_stats', stream_health)
+      self.assertNotIn('serving', stream_health)
 
     # check the destination master 3 is healthy, even though its query
     # service is not running (if not healthy this would exception out)

@@ -273,40 +273,6 @@ class RowCacheInvalidator(unittest.TestCase):
     hits2 = self.replica_stats()['vt_insert_test']['Hits']
     self.assertEqual(hits2 - hits, 1, 'This should have hit the cache')
 
-  def test_service_disabled(self):
-    # perform some inserts, then change state to stop the invalidator
-    self.perform_insert(500)
-    inv_before = self.replica_stats()['Totals']['Invalidations']
-    invStats_before = self.replica_vars()
-    utils.run_vtctl(['ChangeSlaveType', replica_tablet.tablet_alias, 'spare'])
-
-    # wait until it's stopped
-    timeout = 30
-    while True:
-      invStats_after = self.replica_vars()
-      if invStats_after['RowcacheInvalidatorState'] == 'Stopped':
-        break
-      timeout = utils.wait_step(
-          'RowcacheInvalidatorState, got %s expecting Stopped' %
-          invStats_after['RowcacheInvalidatorState'], timeout, sleep_time=0.1)
-
-    # check all data is right
-    inv_after = self.replica_stats()['Totals']['Invalidations']
-    invStats_after = self.replica_vars()
-    logging.debug(
-        'Tablet Replica->Spare\n\tBefore: Invalidations: %d InvalidatorStats '
-        '%s\n\tAfter: Invalidations: %d InvalidatorStats %s',
-        inv_before, invStats_before['RowcacheInvalidatorPosition'],
-        inv_after, invStats_after['RowcacheInvalidatorPosition'])
-    self.assertEqual(inv_after, 0,
-                     'Row-cache invalid. should be disabled, no invalidations')
-    self.assertEqual(invStats_after['RowcacheInvalidatorState'], 'Stopped',
-                     'Row-cache invalidator should be disabled')
-
-    # and restore the type
-    utils.run_vtctl(
-        ['ChangeSlaveType', replica_tablet.tablet_alias, 'replica'])
-
   def _exec_vt_txn(self, query):
     master_tablet.execute(query, auto_log=False)

@@ -449,6 +449,7 @@ class TestTabletManager(unittest.TestCase):
     self.assertIn(
         'replication_reporter: Replication is not running',
         health['realtime_stats']['health_error'])
+    self.assertNotIn('serving', health)
 
     # then restart replication, and write data, make sure we go back to healthy
    utils.run_vtctl(['StartSlave', tablet_62044.tablet_alias])

@@ -472,6 +473,8 @@ class TestTabletManager(unittest.TestCase):
       logging.debug('Got health: %s', line)
       data = json.loads(line)
       self.assertIn('realtime_stats', data)
+      self.assertIn('serving', data)
+      self.assertTrue(data['serving'])
       self.assertNotIn('health_error', data['realtime_stats'])
       self.assertNotIn('tablet_externally_reparented_timestamp', data)
       self.assertEqual('test_keyspace', data['target']['keyspace'])
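
These Python assertions rely on the omitempty JSON tags visible in the generated Go struct at the top of the diff: a false serving flag (and a zero timestamp) is dropped from the JSON entirely. So assertNotIn('serving', ...) means "not serving", while a serving tablet must have the key present and true. A small Go demonstration of that marshalling behavior:

package main

import (
	"encoding/json"
	"fmt"
)

// streamHealthResponse mirrors the generated struct's json tags above:
// zero-valued fields are omitted from the marshalled output.
type streamHealthResponse struct {
	Serving                             bool  `json:"serving,omitempty"`
	TabletExternallyReparentedTimestamp int64 `json:"tablet_externally_reparented_timestamp,omitempty"`
}

func main() {
	notServing, _ := json.Marshal(streamHealthResponse{})
	serving, _ := json.Marshal(streamHealthResponse{Serving: true})
	fmt.Println(string(notServing)) // {}
	fmt.Println(string(serving))    // {"serving":true}
}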