From 99dbfd598776f5a0697bc1587293bec9c7219e42 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Mon, 12 Oct 2015 08:55:36 -0700 Subject: [PATCH] Switching tabletmanager to the new tabletserver API. --- go/vt/tabletmanager/after_action.go | 112 +++++++----------- go/vt/tabletmanager/agent.go | 23 +++- go/vt/tabletmanager/agent_rpc_actions.go | 3 +- go/vt/tabletmanager/healthcheck.go | 26 +--- go/vt/tabletmanager/healthcheck_test.go | 4 +- go/vt/tabletmanager/reparent.go | 5 + .../tabletservermock/controller.go | 14 +-- 7 files changed, 86 insertions(+), 101 deletions(-) diff --git a/go/vt/tabletmanager/after_action.go b/go/vt/tabletmanager/after_action.go index c27095c593..2dbeeaa532 100644 --- a/go/vt/tabletmanager/after_action.go +++ b/go/vt/tabletmanager/after_action.go @@ -9,7 +9,6 @@ package tabletmanager import ( "encoding/hex" "fmt" - "reflect" "strings" "golang.org/x/net/context" @@ -48,36 +47,12 @@ const keyrangeQueryRules string = "KeyrangeQueryRules" const blacklistQueryRules string = "BlacklistQueryRules" func (agent *ActionAgent) allowQueries(tablet *pbt.Tablet, blacklistedTables []string) error { - // if the query service is already running, we're not starting it again - if agent.QueryServiceControl.IsServing() { - return nil - } - - // only for real instances - if agent.DBConfigs != nil { - // Update our DB config to match the info we have in the tablet - if agent.DBConfigs.App.DbName == "" { - agent.DBConfigs.App.DbName = topoproto.TabletDbName(tablet) - } - agent.DBConfigs.App.Keyspace = tablet.Keyspace - agent.DBConfigs.App.Shard = tablet.Shard - if tablet.Type != pbt.TabletType_MASTER { - agent.DBConfigs.App.EnableInvalidator = true - } else { - agent.DBConfigs.App.EnableInvalidator = false - } - } - err := agent.loadKeyspaceAndBlacklistRules(tablet, blacklistedTables) if err != nil { return err } - return agent.QueryServiceControl.StartService(&pb.Target{ - Keyspace: tablet.Keyspace, - Shard: tablet.Shard, - TabletType: tablet.Type, - }, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon) + return agent.QueryServiceControl.SetServingType(tablet.Type, true) } // loadKeyspaceAndBlacklistRules does what the name suggests: @@ -141,9 +116,38 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *pbt.Tablet, blac return nil } -func (agent *ActionAgent) stopQueryService(reason string) { - log.Infof("Agent is going to stop query service, reason: %v", reason) - agent.QueryServiceControl.StopService() +func (agent *ActionAgent) disallowQueries(tablet *pbt.Tablet, reason string) error { + log.Infof("Agent is going to disallow queries, reason: %v", reason) + + return agent.QueryServiceControl.SetServingType(tablet.Type, false) +} + +func (agent *ActionAgent) broadcastHealth() { + // get the replication delays + agent.mutex.Lock() + replicationDelay := agent._replicationDelay + healthError := agent._healthy + terTime := agent._tabletExternallyReparentedTime + agent.mutex.Unlock() + + // send it to our observers + // FIXME(alainjobart,liguo) add CpuUsage + stats := &pb.RealtimeStats{ + SecondsBehindMaster: uint32(replicationDelay.Seconds()), + } + if agent.BinlogPlayerMap != nil { + stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = agent.BinlogPlayerMap.StatusSummary() + } + if healthError != nil { + stats.HealthError = healthError.Error() + } + var ts int64 + if !terTime.IsZero() { + ts = terTime.Unix() + } + defer func() { + agent.QueryServiceControl.BroadcastHealth(ts, stats) + }() } // changeCallback is run after every action that might @@ -188,50 +192,12 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl disallowQueryReason = fmt.Sprintf("not a serving tablet type(%v)", newTablet.Type) } - // Read the keyspace on masters to get ShardingColumnType, - // for binlog replication, only if source shards are set. - var keyspaceInfo *topo.KeyspaceInfo - if newTablet.Type == pbt.TabletType_MASTER && shardInfo != nil && len(shardInfo.SourceShards) > 0 { - keyspaceInfo, err = agent.TopoServer.GetKeyspace(ctx, newTablet.Keyspace) - if err != nil { - log.Errorf("Cannot read keyspace for this tablet %v: %v", newTablet.Alias, err) - keyspaceInfo = nil - } - } - if allowQuery { - // There are a few transitions when we need to restart the query service: - switch { - // If either InitMaster or InitSlave was called, because those calls - // (or a prior call to ResetReplication) may have silently broken the - // rowcache invalidator by executing RESET MASTER. - // Note that we don't care about fixing it after ResetReplication itself - // since that call breaks everything on purpose, and we don't expect - // anything to start working until either InitMaster or InitSlave. - case agent.initReplication: - agent.initReplication = false - agent.stopQueryService("initialize replication") - - // Transitioning from replica to master, so clients that were already - // connected don't keep on using the master as replica or rdonly. - case newTablet.Type == pbt.TabletType_MASTER && oldTablet.Type != pbt.TabletType_MASTER: - agent.stopQueryService("tablet promoted to master") - - // Having different parameters for the query service. - // It needs to stop and restart with the new parameters. - // That includes: - // - changing KeyRange - // - changing the BlacklistedTables list - case (newTablet.KeyRange != oldTablet.KeyRange), - !reflect.DeepEqual(blacklistedTables, agent.BlacklistedTables()): - agent.stopQueryService("keyrange/blacklistedtables changed") - } - if err := agent.allowQueries(newTablet, blacklistedTables); err != nil { log.Errorf("Cannot start query service: %v", err) } } else { - agent.stopQueryService(disallowQueryReason) + agent.disallowQueries(newTablet, disallowQueryReason) } // save the tabletControl we've been using, so the background @@ -261,6 +227,16 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl // See if we need to start or stop any binlog player if agent.BinlogPlayerMap != nil { if newTablet.Type == pbt.TabletType_MASTER { + // Read the keyspace on masters to get + // ShardingColumnType, for binlog replication, + // only if source shards are set. + var keyspaceInfo *topo.KeyspaceInfo + if shardInfo != nil && len(shardInfo.SourceShards) > 0 { + keyspaceInfo, err = agent.TopoServer.GetKeyspace(ctx, newTablet.Keyspace) + if err != nil { + keyspaceInfo = nil + } + } agent.BinlogPlayerMap.RefreshMap(agent.batchCtx, newTablet, keyspaceInfo, shardInfo) } else { agent.BinlogPlayerMap.StopAllPlayersAndReset() diff --git a/go/vt/tabletmanager/agent.go b/go/vt/tabletmanager/agent.go index cd4bb15d7c..71b0771b3a 100644 --- a/go/vt/tabletmanager/agent.go +++ b/go/vt/tabletmanager/agent.go @@ -48,6 +48,7 @@ import ( "github.com/youtube/vitess/go/vt/topo/topoproto" "github.com/youtube/vitess/go/vt/topotools" + pbq "github.com/youtube/vitess/go/vt/proto/query" pb "github.com/youtube/vitess/go/vt/proto/topodata" ) @@ -486,7 +487,8 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort } // Reread to get the changes we just made - if _, err := agent.readTablet(ctx); err != nil { + tablet, err := agent.readTablet(ctx) + if err != nil { return err } @@ -498,6 +500,25 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlPort, vtPort, gRPCPort return err } + // initialize tablet server + if agent.DBConfigs != nil { + // Only for real instances + // Update our DB config to match the info we have in the tablet + if agent.DBConfigs.App.DbName == "" { + agent.DBConfigs.App.DbName = topoproto.TabletDbName(tablet.Tablet) + } + agent.DBConfigs.App.Keyspace = tablet.Keyspace + agent.DBConfigs.App.Shard = tablet.Shard + } + if err := agent.QueryServiceControl.InitDBConfig(&pbq.Target{ + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, agent.DBConfigs, agent.SchemaOverrides, agent.MysqlDaemon); err != nil { + return fmt.Errorf("failed to InitDBConfig: %v", err) + } + + // and update our state oldTablet := &pb.Tablet{} if err = agent.updateState(ctx, oldTablet, "Start"); err != nil { log.Warningf("Initial updateState failed, will need a state change before running properly: %v", err) diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index 036a18b2de..125514c208 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -472,7 +472,8 @@ func (agent *ActionAgent) DemoteMaster(ctx context.Context) (myproto.Replication // Now stop the query service, to make sure nobody is writing to the // database. This will in effect close the connection pools to the // database. - agent.stopQueryService("DemoteMaster marks server rdonly") + tablet := agent.Tablet() + agent.disallowQueries(tablet.Tablet, "DemoteMaster marks server rdonly") return agent.MysqlDaemon.DemoteMaster() // There is no serving graph update - the master tablet will diff --git a/go/vt/tabletmanager/healthcheck.go b/go/vt/tabletmanager/healthcheck.go index aa76794561..1aaa159917 100644 --- a/go/vt/tabletmanager/healthcheck.go +++ b/go/vt/tabletmanager/healthcheck.go @@ -22,7 +22,6 @@ import ( "github.com/youtube/vitess/go/vt/topo/topoproto" "github.com/youtube/vitess/go/vt/topotools" - pb "github.com/youtube/vitess/go/vt/proto/query" pbt "github.com/youtube/vitess/go/vt/proto/topodata" ) @@ -201,9 +200,11 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType pbt.TabletType) { } } else { if isQueryServiceRunning { - // we are not healthy or should not be running the + // We are not healthy or should not be running the // query service, shut it down. - agent.stopQueryService( + // Note this is possibly sending 'spare' as + // the tablet type, we will clean it up later. + agent.disallowQueries(tablet.Tablet, fmt.Sprintf("health-check failure(%v)", err), ) } @@ -253,27 +254,10 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType pbt.TabletType) { agent._healthy = err agent._healthyTime = time.Now() agent._replicationDelay = replicationDelay - terTime := agent._tabletExternallyReparentedTime agent.mutex.Unlock() // send it to our observers - // (the Target has already been updated when restarting the - // query service earlier) - // FIXME(alainjobart,liguo) add CpuUsage - stats := &pb.RealtimeStats{ - SecondsBehindMaster: uint32(replicationDelay.Seconds()), - } - stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = agent.BinlogPlayerMap.StatusSummary() - if err != nil { - stats.HealthError = err.Error() - } - defer func() { - var ts int64 - if !terTime.IsZero() { - ts = terTime.Unix() - } - agent.QueryServiceControl.BroadcastHealth(ts, stats) - }() + agent.broadcastHealth() // Update our topo.Server state, start with no change newTabletType := tablet.Type diff --git a/go/vt/tabletmanager/healthcheck_test.go b/go/vt/tabletmanager/healthcheck_test.go index 969a4a91bc..79b3f77d95 100644 --- a/go/vt/tabletmanager/healthcheck_test.go +++ b/go/vt/tabletmanager/healthcheck_test.go @@ -200,7 +200,7 @@ func TestQueryServiceNotStarting(t *testing.T) { ctx := context.Background() agent := createTestAgent(ctx, t) targetTabletType := pb.TabletType_REPLICA - agent.QueryServiceControl.(*tabletservermock.Controller).StartServiceError = fmt.Errorf("test cannot start query service") + agent.QueryServiceControl.(*tabletservermock.Controller).SetServingTypeError = fmt.Errorf("test cannot start query service") before := time.Now() agent.runHealthCheck(targetTabletType) @@ -245,7 +245,7 @@ func TestQueryServiceStopped(t *testing.T) { // shut down query service and prevent it from starting again agent.QueryServiceControl.StopService() - agent.QueryServiceControl.(*tabletservermock.Controller).StartServiceError = fmt.Errorf("test cannot start query service") + agent.QueryServiceControl.(*tabletservermock.Controller).SetServingTypeError = fmt.Errorf("test cannot start query service") // health check should now fail before = time.Now() diff --git a/go/vt/tabletmanager/reparent.go b/go/vt/tabletmanager/reparent.go index f4b77f02d6..d2c1f98859 100644 --- a/go/vt/tabletmanager/reparent.go +++ b/go/vt/tabletmanager/reparent.go @@ -99,8 +99,13 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern agent.mutex.Lock() agent._tabletExternallyReparentedTime = time.Now() + agent._replicationDelay = 0 agent.mutex.Unlock() + // update the listeners in the background + event.DispatchUpdate(ev, "broadcasting to listeners") + agent.broadcastHealth() + // Directly write the new master endpoint in the serving graph. // We will do a true rebuild in the background soon, but in the meantime, // this will be enough for clients to re-resolve the new master. diff --git a/go/vt/tabletserver/tabletservermock/controller.go b/go/vt/tabletserver/tabletservermock/controller.go index 39e203543d..a872818bec 100644 --- a/go/vt/tabletserver/tabletservermock/controller.go +++ b/go/vt/tabletserver/tabletservermock/controller.go @@ -25,9 +25,6 @@ type Controller struct { // SetServingTypeError is the return value for SetServingType SetServingTypeError error - // StartServiceError is the return value for StartService - StartServiceError error - // IsHealthy is the return value for IsHealthy IsHealthyError error @@ -40,7 +37,6 @@ func NewController() *Controller { return &Controller{ QueryServiceEnabled: false, InitDBConfigError: nil, - StartServiceError: nil, IsHealthyError: nil, ReloadSchemaCount: 0, } @@ -61,15 +57,17 @@ func (tqsc *Controller) InitDBConfig(*pb.Target, *dbconfigs.DBConfigs, []tablets } // SetServingType is part of the tabletserver.Controller interface -func (tqsc *Controller) SetServingType(topodata.TabletType, bool) error { - tqsc.QueryServiceEnabled = tqsc.SetServingTypeError == nil +func (tqsc *Controller) SetServingType(tabletType topodata.TabletType, serving bool) error { + if tqsc.SetServingTypeError == nil { + tqsc.QueryServiceEnabled = serving + } return tqsc.SetServingTypeError } // StartService is part of the tabletserver.Controller interface func (tqsc *Controller) StartService(*pb.Target, *dbconfigs.DBConfigs, []tabletserver.SchemaOverride, mysqlctl.MysqlDaemon) error { - tqsc.QueryServiceEnabled = tqsc.StartServiceError == nil - return tqsc.StartServiceError + tqsc.QueryServiceEnabled = true + return nil } // StopService is part of the tabletserver.Controller interface