Switching tabletmanager to the new tabletserver API.

This commit is contained in:
Alain Jobart 2015-10-12 08:55:36 -07:00
Родитель a1d029faa1
Коммит 99dbfd5987
7 изменённых файлов: 86 добавлений и 101 удалений

Просмотреть файл

@ -9,7 +9,6 @@ package tabletmanager
import (
"encoding/hex"
"fmt"
"reflect"
"strings"
"golang.org/x/net/context"
@ -48,36 +47,12 @@ const keyrangeQueryRules string = "KeyrangeQueryRules"
const blacklistQueryRules string = "BlacklistQueryRules"
// allowQueries loads the keyspace and blacklist query rules for the tablet
// and then asks the query service controller to serve queries for it.
// It returns the first error reported by the rule loading or by the
// controller.
//
// NOTE(review): this span appears to be a diff hunk rendered without +/-
// markers, so lines from both sides of the change sit next to each other.
// Read literally, the StartService return below makes the final
// SetServingType return unreachable -- confirm which side is current.
func (agent *ActionAgent) allowQueries(tablet *pbt.Tablet, blacklistedTables []string) error {
// if the query service is already running, we're not starting it again
if agent.QueryServiceControl.IsServing() {
return nil
}
// only for real instances
if agent.DBConfigs != nil {
// Update our DB config to match the info we have in the tablet
// (an explicitly configured DbName is kept as is).
if agent.DBConfigs.App.DbName == "" {
agent.DBConfigs.App.DbName = topoproto.TabletDbName(tablet)
}
agent.DBConfigs.App.Keyspace = tablet.Keyspace
agent.DBConfigs.App.Shard = tablet.Shard
// The invalidator is enabled for every tablet type except MASTER.
if tablet.Type != pbt.TabletType_MASTER {
agent.DBConfigs.App.EnableInvalidator = true
} else {
agent.DBConfigs.App.EnableInvalidator = false
}
}
// The query rules are loaded before the query service is asked to serve.
err := agent.loadKeyspaceAndBlacklistRules(tablet, blacklistedTables)
if err != nil {
return err
}
// The target handed to the query service mirrors the tablet record.
return agent.QueryServiceControl.StartService(&pb.Target{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
}, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon)
return agent.QueryServiceControl.SetServingType(tablet.Type, true)
}
// loadKeyspaceAndBlacklistRules does what the name suggests:
@ -141,9 +116,38 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *pbt.Tablet, blac
return nil
}
func (agent *ActionAgent) stopQueryService(reason string) {
log.Infof("Agent is going to stop query service, reason: %v", reason)
agent.QueryServiceControl.StopService()
// disallowQueries logs the given reason and then asks the query service
// controller to set the tablet's type as not serving. The controller's
// error, if any, is returned to the caller unchanged.
func (agent *ActionAgent) disallowQueries(tablet *pbt.Tablet, reason string) error {
	log.Infof("Agent is going to disallow queries, reason: %v", reason)

	// The tablet type is passed through as is; only the serving flag is off.
	const serving = false
	return agent.QueryServiceControl.SetServingType(tablet.Type, serving)
}
// broadcastHealth takes a snapshot of the agent's health state and sends it
// to the observers through the query service controller.
//
// The snapshot covers the replication delay, the stored health error and the
// time of the last externally-reparented event. That time is broadcast as a
// Unix timestamp, or as 0 when it has never been set.
func (agent *ActionAgent) broadcastHealth() {
	// Copy the fields we need while holding the mutex, and release it before
	// calling out to anything else.
	agent.mutex.Lock()
	replicationDelay := agent._replicationDelay
	healthErr := agent._healthy
	terTime := agent._tabletExternallyReparentedTime
	agent.mutex.Unlock()

	// Build the stats sent to our observers.
	// FIXME(alainjobart,liguo) add CpuUsage
	stats := &pb.RealtimeStats{
		SecondsBehindMaster: uint32(replicationDelay.Seconds()),
	}
	// Without a BinlogPlayerMap the filtered replication fields keep their
	// zero values.
	if agent.BinlogPlayerMap != nil {
		stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = agent.BinlogPlayerMap.StatusSummary()
	}
	if healthErr != nil {
		stats.HealthError = healthErr.Error()
	}

	var ts int64
	if !terTime.IsZero() {
		ts = terTime.Unix()
	}

	// Nothing runs after the broadcast, so it is called directly rather than
	// through a deferred closure.
	agent.QueryServiceControl.BroadcastHealth(ts, stats)
}
// changeCallback is run after every action that might
@ -188,50 +192,12 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl
disallowQueryReason = fmt.Sprintf("not a serving tablet type(%v)", newTablet.Type)
}
// Read the keyspace on masters to get ShardingColumnType,
// for binlog replication, only if source shards are set.
var keyspaceInfo *topo.KeyspaceInfo
if newTablet.Type == pbt.TabletType_MASTER && shardInfo != nil && len(shardInfo.SourceShards) > 0 {
keyspaceInfo, err = agent.TopoServer.GetKeyspace(ctx, newTablet.Keyspace)
if err != nil {
log.Errorf("Cannot read keyspace for this tablet %v: %v", newTablet.Alias, err)
keyspaceInfo = nil
}
}
if allowQuery {
// There are a few transitions when we need to restart the query service:
switch {
// If either InitMaster or InitSlave was called, because those calls
// (or a prior call to ResetReplication) may have silently broken the
// rowcache invalidator by executing RESET MASTER.
// Note that we don't care about fixing it after ResetReplication itself
// since that call breaks everything on purpose, and we don't expect
// anything to start working until either InitMaster or InitSlave.
case agent.initReplication:
agent.initReplication = false
agent.stopQueryService("initialize replication")
// Transitioning from replica to master, so clients that were already
// connected don't keep on using the master as replica or rdonly.
case newTablet.Type == pbt.TabletType_MASTER && oldTablet.Type != pbt.TabletType_MASTER:
agent.stopQueryService("tablet promoted to master")
// Having different parameters for the query service.
// It needs to stop and restart with the new parameters.
// That includes:
// - changing KeyRange
// - changing the BlacklistedTables list
case (newTablet.KeyRange != oldTablet.KeyRange),
!reflect.DeepEqual(blacklistedTables, agent.BlacklistedTables()):
agent.stopQueryService("keyrange/blacklistedtables changed")
}
if err := agent.allowQueries(newTablet, blacklistedTables); err != nil {
log.Errorf("Cannot start query service: %v", err)
}
} else {
agent.stopQueryService(disallowQueryReason)
agent.disallowQueries(newTablet, disallowQueryReason)
}
// save the tabletControl we've been using, so the background
@ -261,6 +227,16 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl
// See if we need to start or stop any binlog player
if agent.BinlogPlayerMap != nil {
if newTablet.Type == pbt.TabletType_MASTER {
// Read the keyspace on masters to get
// ShardingColumnType, for binlog replication,
// only if source shards are set.
var keyspaceInfo *topo.KeyspaceInfo
if shardInfo != nil && len(shardInfo.SourceShards) > 0 {
keyspaceInfo, err = agent.TopoServer.GetKeyspace(ctx, newTablet.Keyspace)
if err != nil {
keyspaceInfo = nil
}
}
agent.BinlogPlayerMap.RefreshMap(agent.batchCtx, newTablet, keyspaceInfo, shardInfo)
} else {
agent.BinlogPlayerMap.StopAllPlayersAndReset()

Просмотреть файл

@ -48,6 +48,7 @@ import (
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/topotools"
pbq "github.com/youtube/vitess/go/vt/proto/query"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
@ -486,7 +487,8 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort
}
// Reread to get the changes we just made
if _, err := agent.readTablet(ctx); err != nil {
tablet, err := agent.readTablet(ctx)
if err != nil {
return err
}
@ -498,6 +500,25 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort
return err
}
// initialize tablet server
if agent.DBConfigs != nil {
// Only for real instances
// Update our DB config to match the info we have in the tablet
if agent.DBConfigs.App.DbName == "" {
agent.DBConfigs.App.DbName = topoproto.TabletDbName(tablet.Tablet)
}
agent.DBConfigs.App.Keyspace = tablet.Keyspace
agent.DBConfigs.App.Shard = tablet.Shard
}
if err := agent.QueryServiceControl.InitDBConfig(&pbq.Target{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
}, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon); err != nil {
return fmt.Errorf("failed to InitDBConfig: %v", err)
}
// and update our state
oldTablet := &pb.Tablet{}
if err = agent.updateState(ctx, oldTablet, "Start"); err != nil {
log.Warningf("Initial updateState failed, will need a state change before running properly: %v", err)

Просмотреть файл

@ -472,7 +472,8 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (myproto.Replication
// Now stop the query service, to make sure nobody is writing to the
// database. This will in effect close the connection pools to the
// database.
agent.stopQueryService("DemoteMaster marks server rdonly")
tablet := agent.Tablet()
agent.disallowQueries(tablet.Tablet, "DemoteMaster marks server rdonly")
return agent.MysqlDaemon.DemoteMaster()
// There is no serving graph update - the master tablet will

Просмотреть файл

@ -22,7 +22,6 @@ import (
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/topotools"
pb "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
)
@ -201,9 +200,11 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType pbt.TabletType) {
}
} else {
if isQueryServiceRunning {
// we are not healthy or should not be running the
// We are not healthy or should not be running the
// query service, shut it down.
agent.stopQueryService(
// Note this is possibly sending 'spare' as
// the tablet type, we will clean it up later.
agent.disallowQueries(tablet.Tablet,
fmt.Sprintf("health-check failure(%v)", err),
)
}
@ -253,27 +254,10 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType pbt.TabletType) {
agent._healthy = err
agent._healthyTime = time.Now()
agent._replicationDelay = replicationDelay
terTime := agent._tabletExternallyReparentedTime
agent.mutex.Unlock()
// send it to our observers
// (the Target has already been updated when restarting the
// query service earlier)
// FIXME(alainjobart,liguo) add CpuUsage
stats := &pb.RealtimeStats{
SecondsBehindMaster: uint32(replicationDelay.Seconds()),
}
stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = agent.BinlogPlayerMap.StatusSummary()
if err != nil {
stats.HealthError = err.Error()
}
defer func() {
var ts int64
if !terTime.IsZero() {
ts = terTime.Unix()
}
agent.QueryServiceControl.BroadcastHealth(ts, stats)
}()
agent.broadcastHealth()
// Update our topo.Server state, start with no change
newTabletType := tablet.Type

Просмотреть файл

@ -200,7 +200,7 @@ func TestQueryServiceNotStarting(t *testing.T) {
ctx := context.Background()
agent := createTestAgent(ctx, t)
targetTabletType := pb.TabletType_REPLICA
agent.QueryServiceControl.(*tabletservermock.Controller).StartServiceError = fmt.Errorf("test cannot start query service")
agent.QueryServiceControl.(*tabletservermock.Controller).SetServingTypeError = fmt.Errorf("test cannot start query service")
before := time.Now()
agent.runHealthCheck(targetTabletType)
@ -245,7 +245,7 @@ func TestQueryServiceStopped(t *testing.T) {
// shut down query service and prevent it from starting again
agent.QueryServiceControl.StopService()
agent.QueryServiceControl.(*tabletservermock.Controller).StartServiceError = fmt.Errorf("test cannot start query service")
agent.QueryServiceControl.(*tabletservermock.Controller).SetServingTypeError = fmt.Errorf("test cannot start query service")
// health check should now fail
before = time.Now()

Просмотреть файл

@ -99,8 +99,13 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
agent.mutex.Lock()
agent._tabletExternallyReparentedTime = time.Now()
agent._replicationDelay = 0
agent.mutex.Unlock()
// update the listeners in the background
event.DispatchUpdate(ev, "broadcasting to listeners")
agent.broadcastHealth()
// Directly write the new master endpoint in the serving graph.
// We will do a true rebuild in the background soon, but in the meantime,
// this will be enough for clients to re-resolve the new master.

Просмотреть файл

@ -25,9 +25,6 @@ type Controller struct {
// SetServingTypeError is the return value for SetServingType
SetServingTypeError error
// StartServiceError is the return value for StartService
StartServiceError error
// IsHealthyError is the return value for IsHealthy
IsHealthyError error
@ -40,7 +37,6 @@ func NewController() *Controller {
return &Controller{
QueryServiceEnabled: false,
InitDBConfigError: nil,
StartServiceError: nil,
IsHealthyError: nil,
ReloadSchemaCount: 0,
}
@ -61,15 +57,17 @@ func (tqsc *Controller) InitDBConfig(*pb.Target, *dbconfigs.DBConfigs, []tablets
}
// SetServingType is part of the tabletserver.Controller interface
//
// NOTE(review): two signature lines follow. This looks like a diff hunk
// rendered without +/- markers, the first signature and the assignment right
// after it presumably being the replaced lines -- confirm before relying on
// either variant.
func (tqsc *Controller) SetServingType(topodata.TabletType, bool) error {
// This variant ignores both arguments: the service is marked enabled exactly
// when no error has been configured.
tqsc.QueryServiceEnabled = tqsc.SetServingTypeError == nil
func (tqsc *Controller) SetServingType(tabletType topodata.TabletType, serving bool) error {
// This variant records the requested serving flag, but only when no error has
// been configured; on error QueryServiceEnabled is left untouched.
if tqsc.SetServingTypeError == nil {
tqsc.QueryServiceEnabled = serving
}
return tqsc.SetServingTypeError
}
// StartService is part of the tabletserver.Controller interface
//
// NOTE(review): two body variants follow, which looks like a diff hunk
// rendered without +/- markers. Read literally, the first return makes the
// last two statements unreachable -- confirm which variant is current.
func (tqsc *Controller) StartService(*pb.Target, *dbconfigs.DBConfigs, []tabletserver.SchemaOverride, mysqlctl.MysqlDaemon) error {
// Variant driven by the configurable StartServiceError field.
tqsc.QueryServiceEnabled = tqsc.StartServiceError == nil
return tqsc.StartServiceError
// Variant that always marks the service enabled and reports success.
tqsc.QueryServiceEnabled = true
return nil
}
// StopService is part of the tabletserver.Controller interface