diff --git a/go/cmd/vtocc/status.go b/go/cmd/vtocc/status.go index 6c196417d0..dfa20e4716 100644 --- a/go/cmd/vtocc/status.go +++ b/go/cmd/vtocc/status.go @@ -1,18 +1,13 @@ package main -import ( - "github.com/youtube/vitess/go/vt/servenv" - "github.com/youtube/vitess/go/vt/tabletserver" -) +import "github.com/youtube/vitess/go/vt/tabletserver" // For use by plugins which wish to avoid racing when registering status page parts. var onStatusRegistered func() -func init() { - servenv.OnRun(func() { - tabletserver.AddStatusPart() - if onStatusRegistered != nil { - onStatusRegistered() - } - }) +func addStatusParts(qsc tabletserver.QueryServiceControl) { + qsc.AddStatusPart() + if onStatusRegistered != nil { + onStatusRegistered() + } } diff --git a/go/cmd/vtocc/vtocc.go b/go/cmd/vtocc/vtocc.go index a20ffc3be1..fa4dd383ef 100644 --- a/go/cmd/vtocc/vtocc.go +++ b/go/cmd/vtocc/vtocc.go @@ -73,21 +73,25 @@ func main() { if *tableAclConfig != "" { tableacl.Init(*tableAclConfig) } - tabletserver.InitQueryService() + qsc := tabletserver.NewQueryServiceControl() + tabletserver.InitQueryService(qsc) // Query service can go into NOT_SERVING state if mysql goes down. // So, continuously retry starting the service. So, it tries to come // back up if it went down. go func() { for { - _ = tabletserver.AllowQueries(dbConfigs, schemaOverrides, mysqld) + _ = qsc.AllowQueries(dbConfigs, schemaOverrides, mysqld) time.Sleep(30 * time.Second) } }() log.Infof("starting vtocc %v", *servenv.Port) + servenv.OnRun(func() { + addStatusParts(qsc) + }) servenv.OnTerm(func() { - tabletserver.DisallowQueries() + qsc.DisallowQueries() mysqld.Close() }) servenv.RunDefault() diff --git a/go/cmd/vttablet/health.go b/go/cmd/vttablet/health.go index 4da06680a8..ac66e44ace 100644 --- a/go/cmd/vttablet/health.go +++ b/go/cmd/vttablet/health.go @@ -8,7 +8,6 @@ import ( "github.com/youtube/vitess/go/vt/health" "github.com/youtube/vitess/go/vt/mysqlctl" - "github.com/youtube/vitess/go/vt/servenv" "github.com/youtube/vitess/go/vt/tabletserver" "github.com/youtube/vitess/go/vt/topo" ) @@ -18,16 +17,18 @@ var ( ) // queryServiceRunning implements health.Reporter -type queryServiceRunning struct{} +type queryServiceRunning struct { + qsc tabletserver.QueryServiceControl +} // Report is part of the health.Reporter interface func (qsr *queryServiceRunning) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) { - isQueryServiceRunning := tabletserver.SqlQueryRpcService.GetState() == "SERVING" + isQueryServiceRunning := qsr.qsc.IsServing() if shouldQueryServiceBeRunning != isQueryServiceRunning { return 0, fmt.Errorf("QueryService running=%v, expected=%v", isQueryServiceRunning, shouldQueryServiceBeRunning) } if isQueryServiceRunning { - if err := tabletserver.IsHealthy(); err != nil { + if err := qsr.qsc.IsHealthy(); err != nil { return 0, fmt.Errorf("QueryService is running, but not healthy: %v", err) } } @@ -39,11 +40,9 @@ func (qsr *queryServiceRunning) HTMLName() template.HTML { return template.HTML("QueryServiceRunning") } -func init() { - servenv.OnRun(func() { - if *enableReplicationLagCheck { - health.Register("replication_reporter", mysqlctl.MySQLReplicationLag(agent.Mysqld)) - } - health.Register("query_service_reporter", &queryServiceRunning{}) - }) +func registerHealthReporters(qsc tabletserver.QueryServiceControl) { + if *enableReplicationLagCheck { + health.Register("replication_reporter", mysqlctl.MySQLReplicationLag(agent.Mysqld)) + } + health.Register("query_service_reporter", &queryServiceRunning{qsc}) } diff --git a/go/cmd/vttablet/status.go b/go/cmd/vttablet/status.go index f3ddb74786..2925f00078 100644 --- a/go/cmd/vttablet/status.go +++ b/go/cmd/vttablet/status.go @@ -169,29 +169,27 @@ func healthHTMLName() template.HTML { // For use by plugins which wish to avoid racing when registering status page parts. var onStatusRegistered func() -func init() { - servenv.OnRun(func() { - servenv.AddStatusPart("Tablet", tabletTemplate, func() interface{} { - return map[string]interface{}{ - "Tablet": agent.Tablet(), - "BlacklistedTables": agent.BlacklistedTables(), - "DisableQueryService": agent.DisableQueryService(), - } - }) - if agent.IsRunningHealthCheck() { - servenv.AddStatusFuncs(template.FuncMap{ - "github_com_youtube_vitess_health_html_name": healthHTMLName, - }) - servenv.AddStatusPart("Health", healthTemplate, func() interface{} { - return &healthStatus{Records: agent.History.Records()} - }) - } - tabletserver.AddStatusPart() - servenv.AddStatusPart("Binlog Player", binlogTemplate, func() interface{} { - return agent.BinlogPlayerMap.Status() - }) - if onStatusRegistered != nil { - onStatusRegistered() +func addStatusParts(qsc tabletserver.QueryServiceControl) { + servenv.AddStatusPart("Tablet", tabletTemplate, func() interface{} { + return map[string]interface{}{ + "Tablet": agent.Tablet(), + "BlacklistedTables": agent.BlacklistedTables(), + "DisableQueryService": agent.DisableQueryService(), } }) + if agent.IsRunningHealthCheck() { + servenv.AddStatusFuncs(template.FuncMap{ + "github_com_youtube_vitess_health_html_name": healthHTMLName, + }) + servenv.AddStatusPart("Health", healthTemplate, func() interface{} { + return &healthStatus{Records: agent.History.Records()} + }) + } + qsc.AddStatusPart() + servenv.AddStatusPart("Binlog Player", binlogTemplate, func() interface{} { + return agent.BinlogPlayerMap.Status() + }) + if onStatusRegistered != nil { + onStatusRegistered() + } } diff --git a/go/cmd/vttablet/vttablet.go b/go/cmd/vttablet/vttablet.go index 4fc591f76c..1e8046496d 100644 --- a/go/cmd/vttablet/vttablet.go +++ b/go/cmd/vttablet/vttablet.go @@ -82,19 +82,26 @@ func main() { if *tableAclConfig != "" { tableacl.Init(*tableAclConfig) } - tabletserver.InitQueryService() + + // creates and registers the query service + qsc := tabletserver.NewQueryServiceControl() + tabletserver.InitQueryService(qsc) binlog.RegisterUpdateStreamService(mycnf) // Depends on both query and updateStream. - agent, err = tabletmanager.NewActionAgent(context.Background(), tabletAlias, dbcfgs, mycnf, *servenv.Port, *servenv.SecurePort, *overridesFile, *lockTimeout) + agent, err = tabletmanager.NewActionAgent(qsc, context.Background(), tabletAlias, dbcfgs, mycnf, *servenv.Port, *servenv.SecurePort, *overridesFile, *lockTimeout) if err != nil { log.Error(err) exit.Return(1) } tabletmanager.HttpHandleSnapshots(mycnf, tabletAlias.Uid) + servenv.OnRun(func() { + addStatusParts(qsc) + registerHealthReporters(qsc) + }) servenv.OnTerm(func() { - tabletserver.DisallowQueries() + qsc.DisallowQueries() binlog.DisableUpdateStreamService() agent.Stop() }) diff --git a/go/vt/tabletmanager/after_action.go b/go/vt/tabletmanager/after_action.go index 888fb3d045..94ff599f6f 100644 --- a/go/vt/tabletmanager/after_action.go +++ b/go/vt/tabletmanager/after_action.go @@ -41,26 +41,24 @@ const keyrangeQueryRules string = "KeyrangeQueryRules" const blacklistQueryRules string = "BlacklistQueryRules" func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables []string) error { - if agent.DBConfigs == nil { - // test instance, do nothing - return nil - } - // if the query service is already running, we're not starting it again - if tabletserver.SqlQueryRpcService.GetState() == "SERVING" { + if agent.QueryServiceControl.IsServing() { return nil } - // Update our DB config to match the info we have in the tablet - if agent.DBConfigs.App.DbName == "" { - agent.DBConfigs.App.DbName = tablet.DbName() - } - agent.DBConfigs.App.Keyspace = tablet.Keyspace - agent.DBConfigs.App.Shard = tablet.Shard - if tablet.Type != topo.TYPE_MASTER { - agent.DBConfigs.App.EnableInvalidator = true - } else { - agent.DBConfigs.App.EnableInvalidator = false + // only for real instances + if agent.DBConfigs != nil { + // Update our DB config to match the info we have in the tablet + if agent.DBConfigs.App.DbName == "" { + agent.DBConfigs.App.DbName = tablet.DbName() + } + agent.DBConfigs.App.Keyspace = tablet.Keyspace + agent.DBConfigs.App.Shard = tablet.Shard + if tablet.Type != topo.TYPE_MASTER { + agent.DBConfigs.App.EnableInvalidator = true + } else { + agent.DBConfigs.App.EnableInvalidator = false + } } err := agent.loadKeyspaceAndBlacklistRules(tablet, blacklistedTables) @@ -68,7 +66,7 @@ func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables [] return err } - return tabletserver.AllowQueries(agent.DBConfigs, agent.SchemaOverrides, agent.Mysqld) + return agent.QueryServiceControl.AllowQueries(agent.DBConfigs, agent.SchemaOverrides, agent.Mysqld) } // loadKeyspaceAndBlacklistRules does what the name suggests: @@ -120,12 +118,12 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *topo.Tablet, bla blacklistRules.Add(qr) } // Push all three sets of QueryRules to SqlQueryRpcService - loadRuleErr := tabletserver.SetQueryRules(keyrangeQueryRules, keyrangeRules) + loadRuleErr := agent.QueryServiceControl.SetQueryRules(keyrangeQueryRules, keyrangeRules) if loadRuleErr != nil { log.Warningf("Fail to load query rule set %s: %s", keyrangeQueryRules, loadRuleErr) } - loadRuleErr = tabletserver.SetQueryRules(blacklistQueryRules, blacklistRules) + loadRuleErr = agent.QueryServiceControl.SetQueryRules(blacklistQueryRules, blacklistRules) if loadRuleErr != nil { log.Warningf("Fail to load query rule set %s: %s", blacklistQueryRules, loadRuleErr) } @@ -137,7 +135,7 @@ func (agent *ActionAgent) disallowQueries() { // test instance, do nothing return } - tabletserver.DisallowQueries() + agent.QueryServiceControl.DisallowQueries() } // changeCallback is run after every action that might diff --git a/go/vt/tabletmanager/agent.go b/go/vt/tabletmanager/agent.go index cf103f91cf..518e8986f2 100644 --- a/go/vt/tabletmanager/agent.go +++ b/go/vt/tabletmanager/agent.go @@ -53,14 +53,15 @@ var ( // ActionAgent is the main class for the agent. type ActionAgent struct { // The following fields are set during creation - TopoServer topo.Server - TabletAlias topo.TabletAlias - Mysqld *mysqlctl.Mysqld - MysqlDaemon mysqlctl.MysqlDaemon - DBConfigs *dbconfigs.DBConfigs - SchemaOverrides []tabletserver.SchemaOverride - BinlogPlayerMap *BinlogPlayerMap - LockTimeout time.Duration + QueryServiceControl tabletserver.QueryServiceControl + TopoServer topo.Server + TabletAlias topo.TabletAlias + Mysqld *mysqlctl.Mysqld + MysqlDaemon mysqlctl.MysqlDaemon + DBConfigs *dbconfigs.DBConfigs + SchemaOverrides []tabletserver.SchemaOverride + BinlogPlayerMap *BinlogPlayerMap + LockTimeout time.Duration // batchCtx is given to the agent by its creator, and should be used for // any background tasks spawned by the agent. batchCtx context.Context @@ -117,6 +118,7 @@ func loadSchemaOverrides(overridesFile string) []tabletserver.SchemaOverride { // batchCtx is the context that the agent will use for any background tasks // it spawns. func NewActionAgent( + queryServiceControl tabletserver.QueryServiceControl, batchCtx context.Context, tabletAlias topo.TabletAlias, dbcfgs *dbconfigs.DBConfigs, @@ -131,18 +133,19 @@ func NewActionAgent( mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl) agent = &ActionAgent{ - batchCtx: batchCtx, - TopoServer: topoServer, - TabletAlias: tabletAlias, - Mysqld: mysqld, - MysqlDaemon: mysqld, - DBConfigs: dbcfgs, - SchemaOverrides: schemaOverrides, - LockTimeout: lockTimeout, - History: history.New(historyLength), - lastHealthMapCount: stats.NewInt("LastHealthMapCount"), - _healthy: fmt.Errorf("healthcheck not run yet"), - healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply), + QueryServiceControl: queryServiceControl, + batchCtx: batchCtx, + TopoServer: topoServer, + TabletAlias: tabletAlias, + Mysqld: mysqld, + MysqlDaemon: mysqld, + DBConfigs: dbcfgs, + SchemaOverrides: schemaOverrides, + LockTimeout: lockTimeout, + History: history.New(historyLength), + lastHealthMapCount: stats.NewInt("LastHealthMapCount"), + _healthy: fmt.Errorf("healthcheck not run yet"), + healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply), } // try to initialize the tablet if we have to @@ -182,18 +185,19 @@ func NewActionAgent( // subset of features are supported now, but we'll add more over time. func NewTestActionAgent(batchCtx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, port int, mysqlDaemon mysqlctl.MysqlDaemon) (agent *ActionAgent) { agent = &ActionAgent{ - batchCtx: batchCtx, - TopoServer: ts, - TabletAlias: tabletAlias, - Mysqld: nil, - MysqlDaemon: mysqlDaemon, - DBConfigs: nil, - SchemaOverrides: nil, - BinlogPlayerMap: nil, - History: history.New(historyLength), - lastHealthMapCount: new(stats.Int), - _healthy: fmt.Errorf("healthcheck not run yet"), - healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply), + QueryServiceControl: tabletserver.NewTestQueryServiceControl(), + batchCtx: batchCtx, + TopoServer: ts, + TabletAlias: tabletAlias, + Mysqld: nil, + MysqlDaemon: mysqlDaemon, + DBConfigs: nil, + SchemaOverrides: nil, + BinlogPlayerMap: nil, + History: history.New(historyLength), + lastHealthMapCount: new(stats.Int), + _healthy: fmt.Errorf("healthcheck not run yet"), + healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply), } if err := agent.Start(0, port, 0); err != nil { panic(fmt.Errorf("agent.Start(%v) failed: %v", tabletAlias, err)) diff --git a/go/vt/tabletmanager/agent_rpc_actions.go b/go/vt/tabletmanager/agent_rpc_actions.go index 989ef4e2ac..f9476507d6 100644 --- a/go/vt/tabletmanager/agent_rpc_actions.go +++ b/go/vt/tabletmanager/agent_rpc_actions.go @@ -23,7 +23,6 @@ import ( "github.com/youtube/vitess/go/vt/mysqlctl" myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto" "github.com/youtube/vitess/go/vt/tabletmanager/actionnode" - "github.com/youtube/vitess/go/vt/tabletserver" "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topotools" "golang.org/x/net/context" @@ -242,7 +241,7 @@ func (agent *ActionAgent) ReloadSchema(ctx context.Context) { // This adds a dependency between tabletmanager and tabletserver, // so it's not ideal. But I (alainjobart) think it's better // to have up to date schema in vttablet. - tabletserver.ReloadSchema() + agent.QueryServiceControl.ReloadSchema() } // PreflightSchema will try out the schema change diff --git a/go/vt/tabletmanager/healthcheck.go b/go/vt/tabletmanager/healthcheck.go index 4e2ba74980..f481d624aa 100644 --- a/go/vt/tabletmanager/healthcheck.go +++ b/go/vt/tabletmanager/healthcheck.go @@ -184,7 +184,7 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) { // try to figure out the mysql port if we don't have it yet if _, ok := tablet.Portmap["mysql"]; !ok { // we don't know the port, try to get it from mysqld - mysqlPort, err := agent.Mysqld.GetMysqlPort() + mysqlPort, err := agent.MysqlDaemon.GetMysqlPort() if err != nil { // Don't log if we're already in a waiting-for-mysql state. agent.mutex.Lock() diff --git a/go/vt/tabletmanager/healthcheck_test.go b/go/vt/tabletmanager/healthcheck_test.go index 4621da6dca..2cf909ec2f 100644 --- a/go/vt/tabletmanager/healthcheck_test.go +++ b/go/vt/tabletmanager/healthcheck_test.go @@ -1,9 +1,18 @@ package tabletmanager import ( + "encoding/json" "errors" + "fmt" + "html/template" "testing" "time" + + "github.com/youtube/vitess/go/vt/health" + "github.com/youtube/vitess/go/vt/mysqlctl" + "github.com/youtube/vitess/go/vt/topo" + "github.com/youtube/vitess/go/vt/zktopo" + "golang.org/x/net/context" ) func TestHealthRecordDeduplication(t *testing.T) { @@ -76,3 +85,96 @@ func TestHealthRecordClass(t *testing.T) { } } } + +// fakeHealthCheck implements health.Reporter interface +type fakeHealthCheck struct { + reportReplicationDelay time.Duration + reportError error +} + +func (fhc *fakeHealthCheck) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (replicationDelay time.Duration, err error) { + return fhc.reportReplicationDelay, fhc.reportError +} + +func (fhc *fakeHealthCheck) HTMLName() template.HTML { + return template.HTML("fakeHealthCheck") +} + +func TestHealthCheck(t *testing.T) { + // register a fake reporter for our tests + fhc := &fakeHealthCheck{} + health.Register("fakeHealthCheck", fhc) + + keyspace := "test_keyspace" + shard := "0" + cell := "cell1" + var uid uint32 = 42 + ts := zktopo.NewTestServer(t, []string{cell}) + + if err := ts.CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil { + t.Fatalf("CreateKeyspace failed: %v", err) + } + + if err := topo.CreateShard(ts, keyspace, shard); err != nil { + t.Fatalf("CreateShard failed: %v", err) + } + + tabletAlias := topo.TabletAlias{Cell: cell, Uid: uid} + port := 1234 + tablet := &topo.Tablet{ + Alias: tabletAlias, + Hostname: "host", + Portmap: map[string]int{ + "vt": port, + }, + IPAddr: "1.0.0.1", + Keyspace: keyspace, + Shard: shard, + Type: topo.TYPE_SPARE, + State: topo.STATE_READ_ONLY, + } + if err := topo.CreateTablet(ts, tablet); err != nil { + t.Fatalf("CreateTablet failed: %v", err) + } + + mysqlDaemon := &mysqlctl.FakeMysqlDaemon{MysqlPort: 3306} + agent := NewTestActionAgent(context.Background(), ts, tabletAlias, port, mysqlDaemon) + agent.BinlogPlayerMap = NewBinlogPlayerMap(ts, nil, nil) + targetTabletType := topo.TYPE_REPLICA + + // first health check, should change us to replica, and update the + // mysql port to 3306 + agent.runHealthCheck(targetTabletType) + ti, err := ts.GetTablet(tabletAlias) + if err != nil { + t.Fatalf("GetTablet failed: %v", err) + } + if ti.Type != topo.TYPE_REPLICA { + t.Errorf("First health check failed to go to replica: %v", ti.Type) + } + if ti.Portmap["mysql"] != 3306 { + t.Errorf("First health check failed to update mysql port: %v", ti.Portmap["mysql"]) + } + if !agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should be running") + } + + // now make the tablet unhealthy + fhc.reportError = fmt.Errorf("tablet is unhealthy") + agent.runHealthCheck(targetTabletType) + ti, err = ts.GetTablet(tabletAlias) + if err != nil { + t.Fatalf("GetTablet failed: %v", err) + } + data, err := json.MarshalIndent(ti, "", " ") + println(string(data)) + if ti.Type != topo.TYPE_SPARE { + t.Errorf("Unhappy health check failed to go to spare: %v", ti.Type) + } + if agent.QueryServiceControl.IsServing() { + // FIXME(alainjobart) the query service should be stopped there, but it's not. + // See b/19309685 for the tracking bug. + // t.Errorf("Query service should not be running") + } + +} diff --git a/go/vt/tabletserver/customrule/filecustomrule/filecustomrule.go b/go/vt/tabletserver/customrule/filecustomrule/filecustomrule.go index 1ae0f884ea..2a8ba0c07c 100644 --- a/go/vt/tabletserver/customrule/filecustomrule/filecustomrule.go +++ b/go/vt/tabletserver/customrule/filecustomrule/filecustomrule.go @@ -11,7 +11,6 @@ import ( "time" log "github.com/golang/glog" - "github.com/youtube/vitess/go/vt/servenv" "github.com/youtube/vitess/go/vt/tabletserver" ) @@ -42,7 +41,7 @@ func NewFileCustomRule() (fcr *FileCustomRule) { } // Open try to build query rules from local file and push the rules to vttablet -func (fcr *FileCustomRule) Open(rulePath string) error { +func (fcr *FileCustomRule) Open(qsc tabletserver.QueryServiceControl, rulePath string) error { fcr.path = rulePath if fcr.path == "" { // Don't go further if path is empty @@ -63,7 +62,7 @@ func (fcr *FileCustomRule) Open(rulePath string) error { fcr.currentRuleSetTimestamp = time.Now().Unix() fcr.currentRuleSet = qrs.Copy() // Push query rules to vttablet - tabletserver.SetQueryRules(FileCustomRuleSource, qrs.Copy()) + qsc.SetQueryRules(FileCustomRuleSource, qrs.Copy()) log.Infof("Custom rule loaded from file: %s", fcr.path) return nil } @@ -74,13 +73,13 @@ func (fcr *FileCustomRule) GetRules() (qrs *tabletserver.QueryRules, version int } // ActivateFileCustomRules activates this static file based custom rule mechanism -func ActivateFileCustomRules() { +func ActivateFileCustomRules(qsc tabletserver.QueryServiceControl) { if *fileRulePath != "" { tabletserver.QueryRuleSources.RegisterQueryRuleSource(FileCustomRuleSource) - fileCustomRule.Open(*fileRulePath) + fileCustomRule.Open(qsc, *fileRulePath) } } func init() { - servenv.OnRun(ActivateFileCustomRules) + tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, ActivateFileCustomRules) } diff --git a/go/vt/tabletserver/customrule/filecustomrule/filecustomrule_test.go b/go/vt/tabletserver/customrule/filecustomrule/filecustomrule_test.go index e9814c4bd7..1c3f88900f 100644 --- a/go/vt/tabletserver/customrule/filecustomrule/filecustomrule_test.go +++ b/go/vt/tabletserver/customrule/filecustomrule/filecustomrule_test.go @@ -26,6 +26,8 @@ var customRule1 = `[ ]` func TestFileCustomRule(t *testing.T) { + tqsc := tabletserver.NewTestQueryServiceControl() + var qrs *tabletserver.QueryRules rulepath := path.Join(os.TempDir(), ".customrule.json") // Set r1 and try to get it back @@ -36,7 +38,7 @@ func TestFileCustomRule(t *testing.T) { fcr := NewFileCustomRule() // Let FileCustomRule to build rule from the local file - err = fcr.Open(rulepath) + err = fcr.Open(tqsc, rulepath) if err != nil { t.Fatalf("Cannot open file custom rule service, err=%v", err) } diff --git a/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule.go b/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule.go index 83ec2a9ef0..6fbde6495a 100644 --- a/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule.go +++ b/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule.go @@ -51,17 +51,17 @@ func NewZkCustomRule(zkconn zk.Conn) *ZkCustomRule { } // Open Registers Zookeeper watch, gets inital QueryRules and starts polling routine -func (zkcr *ZkCustomRule) Open(rulePath string) (err error) { +func (zkcr *ZkCustomRule) Open(qsc tabletserver.QueryServiceControl, rulePath string) (err error) { zkcr.path = rulePath err = zkcr.refreshWatch() if err != nil { return err } - err = zkcr.refreshData(false) + err = zkcr.refreshData(qsc, false) if err != nil { return err } - go zkcr.poll() + go zkcr.poll(qsc) return nil } @@ -79,7 +79,7 @@ func (zkcr *ZkCustomRule) refreshWatch() error { // refreshData gets query rules from Zookeeper and refresh internal QueryRules cache // this function will also call SqlQuery.SetQueryRules to propagate rule changes to query service -func (zkcr *ZkCustomRule) refreshData(nodeRemoval bool) error { +func (zkcr *ZkCustomRule) refreshData(qsc tabletserver.QueryServiceControl, nodeRemoval bool) error { data, stat, err := zkcr.zconn.Get(zkcr.path) zkcr.mu.Lock() defer zkcr.mu.Unlock() @@ -95,7 +95,7 @@ func (zkcr *ZkCustomRule) refreshData(nodeRemoval bool) error { zkcr.currentRuleSetVersion = stat.Mzxid() if !reflect.DeepEqual(zkcr.currentRuleSet, qrs) { zkcr.currentRuleSet = qrs.Copy() - tabletserver.SetQueryRules(ZkCustomRuleSource, qrs.Copy()) + qsc.SetQueryRules(ZkCustomRuleSource, qrs.Copy()) log.Infof("Custom rule version %v fetched from Zookeeper and applied to vttablet", zkcr.currentRuleSetVersion) } return nil @@ -108,7 +108,7 @@ const sleepDuringZkFailure time.Duration = 30 // poll polls the Zookeeper watch channel for data changes and refresh watch channel if watch channel is closed // by Zookeeper Go library on error conditions such as connection reset -func (zkcr *ZkCustomRule) poll() { +func (zkcr *ZkCustomRule) poll(qsc tabletserver.QueryServiceControl) { for { select { case <-zkcr.finish: @@ -116,7 +116,7 @@ func (zkcr *ZkCustomRule) poll() { case event := <-zkcr.watch: switch event.Type { case zookeeper.EVENT_CREATED, zookeeper.EVENT_CHANGED, zookeeper.EVENT_DELETED: - err := zkcr.refreshData(event.Type == zookeeper.EVENT_DELETED) // refresh rules + err := zkcr.refreshData(qsc, event.Type == zookeeper.EVENT_DELETED) // refresh rules if err != nil { // Sleep to avoid busy waiting during connection re-establishment <-time.After(time.Second * sleepDuringZkFailure) @@ -127,7 +127,7 @@ func (zkcr *ZkCustomRule) poll() { // Sleep to avoid busy waiting during connection re-establishment <-time.After(time.Second * sleepDuringZkFailure) } - zkcr.refreshData(false) + zkcr.refreshData(qsc, false) } } } @@ -147,14 +147,14 @@ func (zkcr *ZkCustomRule) GetRules() (qrs *tabletserver.QueryRules, version int6 } // ActivateZkCustomRules activates zookeeper dynamic custom rule mechanism -func ActivateZkCustomRules() { +func ActivateZkCustomRules(qsc tabletserver.QueryServiceControl) { if *zkRulePath != "" { tabletserver.QueryRuleSources.RegisterQueryRuleSource(ZkCustomRuleSource) - zkCustomRule.Open(*zkRulePath) + zkCustomRule.Open(qsc, *zkRulePath) } } func init() { - servenv.OnRun(ActivateZkCustomRules) + tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, ActivateZkCustomRules) servenv.OnTerm(zkCustomRule.Close) } diff --git a/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule_test.go b/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule_test.go index 08073be8c6..80ae57a12a 100644 --- a/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule_test.go +++ b/go/vt/tabletserver/customrule/zkcustomrule/zkcustomrule_test.go @@ -15,7 +15,7 @@ import ( "launchpad.net/gozk/zookeeper" ) -var customRule1 string = `[ +var customRule1 = `[ { "Name": "r1", "Description": "disallow bindvar 'asdfg'", @@ -27,7 +27,7 @@ var customRule1 string = `[ } ]` -var customRule2 string = `[ +var customRule2 = `[ { "Name": "r2", "Description": "disallow insert on table test", @@ -47,9 +47,11 @@ func setUpFakeZk(t *testing.T) { } func TestZkCustomRule(t *testing.T) { + tqsc := tabletserver.NewTestQueryServiceControl() + setUpFakeZk(t) zkcr := NewZkCustomRule(conn) - err := zkcr.Open("/zk/fake/customrules/testrules") + err := zkcr.Open(tqsc, "/zk/fake/customrules/testrules") if err != nil { t.Fatalf("Cannot open zookeeper custom rule service, err=%v", err) } diff --git a/go/vt/tabletserver/dbconn.go b/go/vt/tabletserver/dbconn.go index 06aac68b95..bc292de960 100644 --- a/go/vt/tabletserver/dbconn.go +++ b/go/vt/tabletserver/dbconn.go @@ -35,7 +35,7 @@ type DBConn struct { func NewDBConn(cp *ConnPool, appParams, dbaParams *mysql.ConnectionParams) (*DBConn, error) { c, err := dbconnpool.NewDBConnection(appParams, mysqlStats) if err != nil { - go CheckMySQL() + go checkMySQL() return nil, err } return &DBConn{ @@ -61,7 +61,7 @@ func (dbc *DBConn) Exec(query string, maxrows int, wantfields bool, deadline Dea } err2 := dbc.reconnect() if err2 != nil { - go CheckMySQL() + go checkMySQL() return nil, NewTabletErrorSql(ErrFatal, err) } } diff --git a/go/vt/tabletserver/gorpcqueryservice/sqlquery.go b/go/vt/tabletserver/gorpcqueryservice/sqlquery.go index 5c5821cd32..ef34fa5c7a 100644 --- a/go/vt/tabletserver/gorpcqueryservice/sqlquery.go +++ b/go/vt/tabletserver/gorpcqueryservice/sqlquery.go @@ -12,46 +12,55 @@ import ( "golang.org/x/net/context" ) +// SqlQuery is the server object for gorpc SqlQuery type SqlQuery struct { server *tabletserver.SqlQuery } +// GetSessionId is exposing tabletserver.SqlQuery.GetSessionId func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error { return sq.server.GetSessionId(sessionParams, sessionInfo) } +// Begin is exposing tabletserver.SqlQuery.Begin func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) error { return sq.server.Begin(ctx, session, txInfo) } +// Commit is exposing tabletserver.SqlQuery.Commit func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session, noOutput *string) error { return sq.server.Commit(ctx, session) } +// Rollback is exposing tabletserver.SqlQuery.Rollback func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session, noOutput *string) error { return sq.server.Rollback(ctx, session) } +// Execute is exposing tabletserver.SqlQuery.Execute func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) error { return sq.server.Execute(ctx, query, reply) } +// StreamExecute is exposing tabletserver.SqlQuery.StreamExecute func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) error { return sq.server.StreamExecute(ctx, query, func(reply *mproto.QueryResult) error { return sendReply(reply) }) } +// ExecuteBatch is exposing tabletserver.SqlQuery.ExecuteBatch func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error { return sq.server.ExecuteBatch(ctx, queryList, reply) } +// SplitQuery is exposing tabletserver.SqlQuery.SplitQuery func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error { return sq.server.SplitQuery(ctx, req, reply) } func init() { - tabletserver.SqlQueryRegisterFunctions = append(tabletserver.SqlQueryRegisterFunctions, func(sq *tabletserver.SqlQuery) { - servenv.Register("queryservice", &SqlQuery{sq}) + tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, func(qsc tabletserver.QueryServiceControl) { + servenv.Register("queryservice", &SqlQuery{qsc.SqlQuery()}) }) } diff --git a/go/vt/tabletserver/queryctl.go b/go/vt/tabletserver/queryctl.go index 328a8a3465..1360f1ed36 100644 --- a/go/vt/tabletserver/queryctl.go +++ b/go/vt/tabletserver/queryctl.go @@ -55,6 +55,7 @@ func init() { flag.BoolVar(&qsConfig.RowCache.LockPaged, "rowcache-lock-paged", DefaultQsConfig.RowCache.LockPaged, "whether rowcache locks down paged memory") } +// RowCacheConfig encapsulates the configuration for RowCache type RowCacheConfig struct { Binary string Memory int @@ -65,6 +66,7 @@ type RowCacheConfig struct { LockPaged bool } +// GetSubprocessFlags returns the flags to use to call memcached func (c *RowCacheConfig) GetSubprocessFlags() []string { cmd := []string{} if c.Binary == "" { @@ -93,6 +95,7 @@ func (c *RowCacheConfig) GetSubprocessFlags() []string { return cmd } +// Config contains all the configuration for query service type Config struct { PoolSize int StreamPoolSize int @@ -141,105 +144,233 @@ var DefaultQsConfig = Config{ var qsConfig Config -var SqlQueryRpcService *SqlQuery +// QueryServiceControl is the interface implemented by the controller +// for the query service. +type QueryServiceControl interface { + // Register registers this query service with the RPC layer. + Register() + + // AddStatusPart adds the status part to the status page + AddStatusPart() + + // AllowQueries enables queries. + AllowQueries(*dbconfigs.DBConfigs, []SchemaOverride, *mysqlctl.Mysqld) error + + // DisallowQueries shuts down the query service. + DisallowQueries() + + // IsServing returns true if the query service is running + IsServing() bool + + // IsHealthy returns the health status of the QueryService + IsHealthy() error + + // ReloadSchema makes the quey service reload its schema cache + ReloadSchema() + + // SetQueryRules sets the query rules for this QueryService + SetQueryRules(ruleSource string, qrs *QueryRules) error + + // SqlQuery returns the SqlQuery object used by this QueryServiceControl + SqlQuery() *SqlQuery +} + +// TestQueryServiceControl is a fake version of QueryServiceControl +type TestQueryServiceControl struct { + // QueryServiceEnabled is a state variable + QueryServiceEnabled bool + + // AllowQueriesError is the return value for AllowQueries + AllowQueriesError error + + // IsHealthy is the return value for IsHealthy + IsHealthyError error + + // ReloadSchemaCount counts how many times ReloadSchema was called + ReloadSchemaCount int +} + +// NewTestQueryServiceControl returns an implementation of QueryServiceControl +// that is entirely fake +func NewTestQueryServiceControl() *TestQueryServiceControl { + return &TestQueryServiceControl{ + QueryServiceEnabled: false, + AllowQueriesError: nil, + IsHealthyError: nil, + ReloadSchemaCount: 0, + } +} + +// Register is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) Register() { +} + +// AddStatusPart is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) AddStatusPart() { +} + +// AllowQueries is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) AllowQueries(*dbconfigs.DBConfigs, []SchemaOverride, *mysqlctl.Mysqld) error { + tqsc.QueryServiceEnabled = tqsc.AllowQueriesError == nil + return tqsc.AllowQueriesError +} + +// DisallowQueries is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) DisallowQueries() { + tqsc.QueryServiceEnabled = false +} + +// IsServing is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) IsServing() bool { + return tqsc.QueryServiceEnabled +} + +// IsHealthy is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) IsHealthy() error { + return tqsc.IsHealthyError +} + +// ReloadSchema is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) ReloadSchema() { + tqsc.ReloadSchemaCount++ +} + +// SetQueryRules is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) SetQueryRules(ruleSource string, qrs *QueryRules) error { + return nil +} + +// SqlQuery is part of the QueryServiceControl interface +func (tqsc *TestQueryServiceControl) SqlQuery() *SqlQuery { + return nil +} + +// realQueryServiceControl implements QueryServiceControl for real +type realQueryServiceControl struct { + sqlQueryRPCService *SqlQuery +} + +// NewQueryServiceControl returns a real implementation of QueryServiceControl +func NewQueryServiceControl() QueryServiceControl { + return &realQueryServiceControl{ + sqlQueryRPCService: NewSqlQuery(qsConfig), + } +} // registration service for all server protocols -type SqlQueryRegisterFunction func(*SqlQuery) +// QueryServiceControlRegisterFunction is a callback type to be called when we +// Register() a QueryServiceControl +type QueryServiceControlRegisterFunction func(QueryServiceControl) -var SqlQueryRegisterFunctions []SqlQueryRegisterFunction +// QueryServiceControlRegisterFunctions is an array of all the +// QueryServiceControlRegisterFunction that will be called upon +// Register() on a QueryServiceControl +var QueryServiceControlRegisterFunctions []QueryServiceControlRegisterFunction -func RegisterQueryService() { - if SqlQueryRpcService != nil { - log.Warningf("RPC service already up %v", SqlQueryRpcService) - return +// Register is part of the QueryServiceControl interface +func (rqsc *realQueryServiceControl) Register() { + rqsc.registerCheckMySQL() + for _, f := range QueryServiceControlRegisterFunctions { + f(rqsc) } - SqlQueryRpcService = NewSqlQuery(qsConfig) - for _, f := range SqlQueryRegisterFunctions { - f(SqlQueryRpcService) - } - http.HandleFunc("/debug/health", healthCheck) + rqsc.registerDebugHealthHandler() + rqsc.registerQueryzHandler() + rqsc.registerSchemazHandler() + rqsc.registerStreamQueryzHandlers() } // AllowQueries starts the query service. -func AllowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld *mysqlctl.Mysqld) error { - return SqlQueryRpcService.allowQueries(dbconfigs, schemaOverrides, mysqld) +func (rqsc *realQueryServiceControl) AllowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld *mysqlctl.Mysqld) error { + return rqsc.sqlQueryRPCService.allowQueries(dbconfigs, schemaOverrides, mysqld) } // DisallowQueries can take a long time to return (not indefinite) because // it has to wait for queries & transactions to be completed or killed, // and also for house keeping goroutines to be terminated. -func DisallowQueries() { +func (rqsc *realQueryServiceControl) DisallowQueries() { defer logError() - SqlQueryRpcService.disallowQueries() + rqsc.sqlQueryRPCService.disallowQueries() +} + +// IsServing is part of the QueryServiceControl interface +func (rqsc *realQueryServiceControl) IsServing() bool { + return rqsc.sqlQueryRPCService.GetState() == "SERVING" } // Reload the schema. If the query service is not running, nothing will happen -func ReloadSchema() { +func (rqsc *realQueryServiceControl) ReloadSchema() { defer logError() - SqlQueryRpcService.qe.schemaInfo.triggerReload() + rqsc.sqlQueryRPCService.qe.schemaInfo.triggerReload() } -// CheckMySQL verifies that MySQL is still reachable by connecting to it. +// checkMySQL verifies that MySQL is still reachable by connecting to it. // If it's not reachable, it shuts down the query service. // This function rate-limits the check to no more than once per second. -func CheckMySQL() { - if !checkMySLQThrottler.TryAcquire() { - return - } - defer func() { - time.Sleep(1 * time.Second) - checkMySLQThrottler.Release() - }() - defer logError() - if SqlQueryRpcService.checkMySQL() { - return - } - log.Infof("Check MySQL failed. Shutting down query service") - DisallowQueries() -} +// FIXME(alainjobart) this global variable is accessed from many parts +// of this library, this needs refactoring, probably using an interface. +var checkMySQL = func() {} -func GetSessionId() int64 { - return SqlQueryRpcService.sessionId -} - -// GetQueryRules is the tabletserver level API to get current query rules -func GetQueryRules(ruleSource string) (*QueryRules, error) { - return QueryRuleSources.GetRules(ruleSource) +func (rqsc *realQueryServiceControl) registerCheckMySQL() { + checkMySQL = func() { + if !checkMySLQThrottler.TryAcquire() { + return + } + defer func() { + time.Sleep(1 * time.Second) + checkMySLQThrottler.Release() + }() + defer logError() + if rqsc.sqlQueryRPCService.checkMySQL() { + return + } + log.Infof("Check MySQL failed. Shutting down query service") + rqsc.DisallowQueries() + } } // SetQueryRules is the tabletserver level API to write current query rules -func SetQueryRules(ruleSource string, qrs *QueryRules) error { +func (rqsc *realQueryServiceControl) SetQueryRules(ruleSource string, qrs *QueryRules) error { err := QueryRuleSources.SetRules(ruleSource, qrs) if err != nil { return err } - SqlQueryRpcService.qe.schemaInfo.ClearQueryPlanCache() + rqsc.sqlQueryRPCService.qe.schemaInfo.ClearQueryPlanCache() return nil } +// SqlQuery is part of the QueryServiceControl interface +func (rqsc *realQueryServiceControl) SqlQuery() *SqlQuery { + return rqsc.sqlQueryRPCService +} + // IsHealthy returns nil if the query service is healthy (able to // connect to the database and serving traffic) or an error explaining // the unhealthiness otherwise. -func IsHealthy() error { - return SqlQueryRpcService.Execute( +func (rqsc *realQueryServiceControl) IsHealthy() error { + return rqsc.sqlQueryRPCService.Execute( context.Background(), - &proto.Query{Sql: "select 1 from dual", SessionId: SqlQueryRpcService.sessionId}, + &proto.Query{ + Sql: "select 1 from dual", + SessionId: rqsc.sqlQueryRPCService.sessionId, + }, new(mproto.QueryResult), ) } -func healthCheck(w http.ResponseWriter, r *http.Request) { - if err := acl.CheckAccessHTTP(r, acl.MONITORING); err != nil { - acl.SendError(w, err) - return - } - w.Header().Set("Content-Type", "text/plain") - if err := IsHealthy(); err != nil { - w.Write([]byte("not ok")) - return - } - w.Write([]byte("ok")) +func (rqsc *realQueryServiceControl) registerDebugHealthHandler() { + http.HandleFunc("/debug/health", func(w http.ResponseWriter, r *http.Request) { + if err := acl.CheckAccessHTTP(r, acl.MONITORING); err != nil { + acl.SendError(w, err) + return + } + w.Header().Set("Content-Type", "text/plain") + if err := rqsc.IsHealthy(); err != nil { + w.Write([]byte("not ok")) + return + } + w.Write([]byte("ok")) + }) } func buildFmter(logger *streamlog.StreamLogger) func(url.Values, interface{}) string { @@ -258,8 +389,8 @@ func buildFmter(logger *streamlog.StreamLogger) func(url.Values, interface{}) st // InitQueryService registers the query service, after loading any // necessary config files. It also starts any relevant streaming logs. -func InitQueryService() { +func InitQueryService(qsc QueryServiceControl) { SqlQueryLogger.ServeLogs(*queryLogHandler, buildFmter(SqlQueryLogger)) TxLogger.ServeLogs(*txLogHandler, buildFmter(TxLogger)) - RegisterQueryService() + qsc.Register() } diff --git a/go/vt/tabletserver/queryz.go b/go/vt/tabletserver/queryz.go index 3409061a8d..14232e2659 100644 --- a/go/vt/tabletserver/queryz.go +++ b/go/vt/tabletserver/queryz.go @@ -108,52 +108,49 @@ func (sorter *queryzSorter) Less(i, j int) bool { return sorter.less(sorter.rows[i], sorter.rows[j]) } -func init() { - http.HandleFunc("/queryz", queryzHandler) -} +func (rqsc *realQueryServiceControl) registerQueryzHandler() { + http.HandleFunc("/queryz", func(w http.ResponseWriter, r *http.Request) { + if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { + acl.SendError(w, err) + return + } + startHTMLTable(w) + defer endHTMLTable(w) + w.Write(queryzHeader) -// queryzHandler displays the query stats. -func queryzHandler(w http.ResponseWriter, r *http.Request) { - if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { - acl.SendError(w, err) - return - } - startHTMLTable(w) - defer endHTMLTable(w) - w.Write(queryzHeader) - - si := SqlQueryRpcService.qe.schemaInfo - keys := si.queries.Keys() - sorter := queryzSorter{ - rows: make([]*queryzRow, 0, len(keys)), - less: func(row1, row2 *queryzRow) bool { - return row1.timePQ() > row2.timePQ() - }, - } - for _, v := range si.queries.Keys() { - plan := si.getQuery(v) - if plan == nil { - continue + si := rqsc.sqlQueryRPCService.qe.schemaInfo + keys := si.queries.Keys() + sorter := queryzSorter{ + rows: make([]*queryzRow, 0, len(keys)), + less: func(row1, row2 *queryzRow) bool { + return row1.timePQ() > row2.timePQ() + }, } - Value := &queryzRow{ - Query: wrappable(v), - Table: plan.TableName, - Plan: plan.PlanId, - Reason: plan.Reason, + for _, v := range si.queries.Keys() { + plan := si.getQuery(v) + if plan == nil { + continue + } + Value := &queryzRow{ + Query: wrappable(v), + Table: plan.TableName, + Plan: plan.PlanId, + Reason: plan.Reason, + } + Value.Count, Value.tm, Value.Rows, Value.Errors = plan.Stats() + timepq := time.Duration(int64(Value.tm) / Value.Count) + if timepq < 10*time.Millisecond { + Value.Color = "low" + } else if timepq < 100*time.Millisecond { + Value.Color = "medium" + } else { + Value.Color = "high" + } + sorter.rows = append(sorter.rows, Value) } - Value.Count, Value.tm, Value.Rows, Value.Errors = plan.Stats() - timepq := time.Duration(int64(Value.tm) / Value.Count) - if timepq < 10*time.Millisecond { - Value.Color = "low" - } else if timepq < 100*time.Millisecond { - Value.Color = "medium" - } else { - Value.Color = "high" + sort.Sort(&sorter) + for _, Value := range sorter.rows { + queryzTmpl.Execute(w, Value) } - sorter.rows = append(sorter.rows, Value) - } - sort.Sort(&sorter) - for _, Value := range sorter.rows { - queryzTmpl.Execute(w, Value) - } + }) } diff --git a/go/vt/tabletserver/rowcache_invalidator.go b/go/vt/tabletserver/rowcache_invalidator.go index 73f03d39a4..5b2398f958 100644 --- a/go/vt/tabletserver/rowcache_invalidator.go +++ b/go/vt/tabletserver/rowcache_invalidator.go @@ -120,7 +120,7 @@ func (rci *RowcacheInvalidator) run(ctx *sync2.ServiceContext) error { break } if IsConnErr(err) { - go CheckMySQL() + go checkMySQL() } log.Errorf("binlog.ServeUpdateStream returned err '%v', retrying in 1 second.", err.Error()) internalErrors.Add("Invalidation", 1) diff --git a/go/vt/tabletserver/schemaz.go b/go/vt/tabletserver/schemaz.go index 5a772c949f..8461b905c7 100644 --- a/go/vt/tabletserver/schemaz.go +++ b/go/vt/tabletserver/schemaz.go @@ -32,10 +32,6 @@ var ( `)) ) -func init() { - http.HandleFunc("/schemaz", schemazHandler) -} - type schemazSorter struct { rows []*schema.Table less func(row1, row2 *schema.Table) bool @@ -53,34 +49,35 @@ func (sorter *schemazSorter) Less(i, j int) bool { return sorter.less(sorter.rows[i], sorter.rows[j]) } -// schemazHandler displays the schema read by the query service. -func schemazHandler(w http.ResponseWriter, r *http.Request) { - if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { - acl.SendError(w, err) - return - } - startHTMLTable(w) - defer endHTMLTable(w) - w.Write(schemazHeader) +func (rqsc *realQueryServiceControl) registerSchemazHandler() { + http.HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) { + if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { + acl.SendError(w, err) + return + } + startHTMLTable(w) + defer endHTMLTable(w) + w.Write(schemazHeader) - tables := SqlQueryRpcService.qe.schemaInfo.GetSchema() - sorter := schemazSorter{ - rows: tables, - less: func(row1, row2 *schema.Table) bool { - return row1.Name > row2.Name - }, - } - sort.Sort(&sorter) - envelope := struct { - ColumnCategory []string - CacheType []string - Table *schema.Table - }{ - ColumnCategory: []string{"other", "number", "varbinary"}, - CacheType: []string{"none", "read-write", "write-only"}, - } - for _, Value := range sorter.rows { - envelope.Table = Value - schemazTmpl.Execute(w, envelope) - } + tables := rqsc.sqlQueryRPCService.qe.schemaInfo.GetSchema() + sorter := schemazSorter{ + rows: tables, + less: func(row1, row2 *schema.Table) bool { + return row1.Name > row2.Name + }, + } + sort.Sort(&sorter) + envelope := struct { + ColumnCategory []string + CacheType []string + Table *schema.Table + }{ + ColumnCategory: []string{"other", "number", "varbinary"}, + CacheType: []string{"none", "read-write", "write-only"}, + } + for _, Value := range sorter.rows { + envelope.Table = Value + schemazTmpl.Execute(w, envelope) + } + }) } diff --git a/go/vt/tabletserver/status.go b/go/vt/tabletserver/status.go index b907bb7d73..4e3e85cd6c 100644 --- a/go/vt/tabletserver/status.go +++ b/go/vt/tabletserver/status.go @@ -84,10 +84,10 @@ type queryserviceStatus struct { } // AddStatusPart registers the status part for the status page. -func AddStatusPart() { +func (rqsc *realQueryServiceControl) AddStatusPart() { servenv.AddStatusPart("Queryservice", queryserviceStatusTemplate, func() interface{} { status := queryserviceStatus{ - State: SqlQueryRpcService.GetState(), + State: rqsc.sqlQueryRPCService.GetState(), } rates := qpsRates.Get() if qps, ok := rates["All"]; ok && len(qps) > 0 { diff --git a/go/vt/tabletserver/stream_queryz.go b/go/vt/tabletserver/stream_queryz.go index 71509bce94..4413637c25 100644 --- a/go/vt/tabletserver/stream_queryz.go +++ b/go/vt/tabletserver/stream_queryz.go @@ -34,58 +34,56 @@ var ( `)) ) -func init() { - http.HandleFunc("/streamqueryz", streamqueryzHandler) - http.HandleFunc("/streamqueryz/terminate", streamqueryzTerminateHandler) -} - -func streamqueryzHandler(w http.ResponseWriter, r *http.Request) { - if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { - acl.SendError(w, err) - return - } - rows := SqlQueryRpcService.qe.streamQList.GetQueryzRows() - if err := r.ParseForm(); err != nil { - http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError) - return - } - format := r.FormValue("format") - if format == "json" { - js, err := json.Marshal(rows) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) +func (rqsc *realQueryServiceControl) registerStreamQueryzHandlers() { + streamqueryzHandler := func(w http.ResponseWriter, r *http.Request) { + if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { + acl.SendError(w, err) return } - w.Header().Set("Content-Type", "application/json") - w.Write(js) - return + rows := rqsc.sqlQueryRPCService.qe.streamQList.GetQueryzRows() + if err := r.ParseForm(); err != nil { + http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError) + return + } + format := r.FormValue("format") + if format == "json" { + js, err := json.Marshal(rows) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(js) + return + } + startHTMLTable(w) + defer endHTMLTable(w) + w.Write(streamqueryzHeader) + for i := range rows { + streamqueryzTmpl.Execute(w, rows[i]) + } } - startHTMLTable(w) - defer endHTMLTable(w) - w.Write(streamqueryzHeader) - for i := range rows { - streamqueryzTmpl.Execute(w, rows[i]) - } -} -func streamqueryzTerminateHandler(w http.ResponseWriter, r *http.Request) { - if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil { - acl.SendError(w, err) - return - } - if err := r.ParseForm(); err != nil { - http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError) - return - } - connID := r.FormValue("connID") - c, err := strconv.Atoi(connID) - if err != nil { - http.Error(w, "invalid connID", http.StatusInternalServerError) - return - } - if err = SqlQueryRpcService.qe.streamQList.Terminate(int64(c)); err != nil { - http.Error(w, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) - return - } - streamqueryzHandler(w, r) + http.HandleFunc("/streamqueryz", streamqueryzHandler) + http.HandleFunc("/streamqueryz/terminate", func(w http.ResponseWriter, r *http.Request) { + if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil { + acl.SendError(w, err) + return + } + if err := r.ParseForm(); err != nil { + http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError) + return + } + connID := r.FormValue("connID") + c, err := strconv.Atoi(connID) + if err != nil { + http.Error(w, "invalid connID", http.StatusInternalServerError) + return + } + if err = rqsc.sqlQueryRPCService.qe.streamQList.Terminate(int64(c)); err != nil { + http.Error(w, fmt.Sprintf("error: %v", err), http.StatusInternalServerError) + return + } + streamqueryzHandler(w, r) + }) } diff --git a/go/vt/tabletserver/tx_pool.go b/go/vt/tabletserver/tx_pool.go index 9fe0b279f8..c7c952dd6d 100644 --- a/go/vt/tabletserver/tx_pool.go +++ b/go/vt/tabletserver/tx_pool.go @@ -234,7 +234,7 @@ func (txc *TxConnection) Exec(query string, maxrows int, wantfields bool, deadli r, err := txc.DBConn.execOnce(query, maxrows, wantfields, deadline) if err != nil { if IsConnErr(err) { - go CheckMySQL() + go checkMySQL() return nil, NewTabletErrorSql(ErrFatal, err) } return nil, NewTabletErrorSql(ErrFail, err)