Merge pull request #390 from youtube/replication

Adding QueryServiceControl interface, to handle registration
This commit is contained in:
Alain Jobart 2015-02-11 07:03:46 -08:00
Родитель 9eb291ae01 523ce5abe2
Коммит bfc0f77b81
23 изменённых файлов: 562 добавлений и 321 удалений

Просмотреть файл

@ -1,18 +1,13 @@
package main
import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletserver"
)
import "github.com/youtube/vitess/go/vt/tabletserver"
// For use by plugins which wish to avoid racing when registering status page parts.
var onStatusRegistered func()
func init() {
servenv.OnRun(func() {
tabletserver.AddStatusPart()
if onStatusRegistered != nil {
onStatusRegistered()
}
})
func addStatusParts(qsc tabletserver.QueryServiceControl) {
qsc.AddStatusPart()
if onStatusRegistered != nil {
onStatusRegistered()
}
}

Просмотреть файл

@ -73,21 +73,25 @@ func main() {
if *tableAclConfig != "" {
tableacl.Init(*tableAclConfig)
}
tabletserver.InitQueryService()
qsc := tabletserver.NewQueryServiceControl()
tabletserver.InitQueryService(qsc)
// Query service can go into NOT_SERVING state if mysql goes down.
// So, continuously retry starting the service. So, it tries to come
// back up if it went down.
go func() {
for {
_ = tabletserver.AllowQueries(dbConfigs, schemaOverrides, mysqld)
_ = qsc.AllowQueries(dbConfigs, schemaOverrides, mysqld)
time.Sleep(30 * time.Second)
}
}()
log.Infof("starting vtocc %v", *servenv.Port)
servenv.OnRun(func() {
addStatusParts(qsc)
})
servenv.OnTerm(func() {
tabletserver.DisallowQueries()
qsc.DisallowQueries()
mysqld.Close()
})
servenv.RunDefault()

Просмотреть файл

@ -8,7 +8,6 @@ import (
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/topo"
)
@ -18,16 +17,18 @@ var (
)
// queryServiceRunning implements health.Reporter
type queryServiceRunning struct{}
type queryServiceRunning struct {
qsc tabletserver.QueryServiceControl
}
// Report is part of the health.Reporter interface
func (qsr *queryServiceRunning) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
isQueryServiceRunning := tabletserver.SqlQueryRpcService.GetState() == "SERVING"
isQueryServiceRunning := qsr.qsc.IsServing()
if shouldQueryServiceBeRunning != isQueryServiceRunning {
return 0, fmt.Errorf("QueryService running=%v, expected=%v", isQueryServiceRunning, shouldQueryServiceBeRunning)
}
if isQueryServiceRunning {
if err := tabletserver.IsHealthy(); err != nil {
if err := qsr.qsc.IsHealthy(); err != nil {
return 0, fmt.Errorf("QueryService is running, but not healthy: %v", err)
}
}
@ -39,11 +40,9 @@ func (qsr *queryServiceRunning) HTMLName() template.HTML {
return template.HTML("QueryServiceRunning")
}
func init() {
servenv.OnRun(func() {
if *enableReplicationLagCheck {
health.Register("replication_reporter", mysqlctl.MySQLReplicationLag(agent.Mysqld))
}
health.Register("query_service_reporter", &queryServiceRunning{})
})
func registerHealthReporters(qsc tabletserver.QueryServiceControl) {
if *enableReplicationLagCheck {
health.Register("replication_reporter", mysqlctl.MySQLReplicationLag(agent.Mysqld))
}
health.Register("query_service_reporter", &queryServiceRunning{qsc})
}

Просмотреть файл

@ -169,29 +169,27 @@ func healthHTMLName() template.HTML {
// For use by plugins which wish to avoid racing when registering status page parts.
var onStatusRegistered func()
func init() {
servenv.OnRun(func() {
servenv.AddStatusPart("Tablet", tabletTemplate, func() interface{} {
return map[string]interface{}{
"Tablet": agent.Tablet(),
"BlacklistedTables": agent.BlacklistedTables(),
"DisableQueryService": agent.DisableQueryService(),
}
})
if agent.IsRunningHealthCheck() {
servenv.AddStatusFuncs(template.FuncMap{
"github_com_youtube_vitess_health_html_name": healthHTMLName,
})
servenv.AddStatusPart("Health", healthTemplate, func() interface{} {
return &healthStatus{Records: agent.History.Records()}
})
}
tabletserver.AddStatusPart()
servenv.AddStatusPart("Binlog Player", binlogTemplate, func() interface{} {
return agent.BinlogPlayerMap.Status()
})
if onStatusRegistered != nil {
onStatusRegistered()
func addStatusParts(qsc tabletserver.QueryServiceControl) {
servenv.AddStatusPart("Tablet", tabletTemplate, func() interface{} {
return map[string]interface{}{
"Tablet": agent.Tablet(),
"BlacklistedTables": agent.BlacklistedTables(),
"DisableQueryService": agent.DisableQueryService(),
}
})
if agent.IsRunningHealthCheck() {
servenv.AddStatusFuncs(template.FuncMap{
"github_com_youtube_vitess_health_html_name": healthHTMLName,
})
servenv.AddStatusPart("Health", healthTemplate, func() interface{} {
return &healthStatus{Records: agent.History.Records()}
})
}
qsc.AddStatusPart()
servenv.AddStatusPart("Binlog Player", binlogTemplate, func() interface{} {
return agent.BinlogPlayerMap.Status()
})
if onStatusRegistered != nil {
onStatusRegistered()
}
}

Просмотреть файл

@ -82,19 +82,26 @@ func main() {
if *tableAclConfig != "" {
tableacl.Init(*tableAclConfig)
}
tabletserver.InitQueryService()
// creates and registers the query service
qsc := tabletserver.NewQueryServiceControl()
tabletserver.InitQueryService(qsc)
binlog.RegisterUpdateStreamService(mycnf)
// Depends on both query and updateStream.
agent, err = tabletmanager.NewActionAgent(context.Background(), tabletAlias, dbcfgs, mycnf, *servenv.Port, *servenv.SecurePort, *overridesFile, *lockTimeout)
agent, err = tabletmanager.NewActionAgent(qsc, context.Background(), tabletAlias, dbcfgs, mycnf, *servenv.Port, *servenv.SecurePort, *overridesFile, *lockTimeout)
if err != nil {
log.Error(err)
exit.Return(1)
}
tabletmanager.HttpHandleSnapshots(mycnf, tabletAlias.Uid)
servenv.OnRun(func() {
addStatusParts(qsc)
registerHealthReporters(qsc)
})
servenv.OnTerm(func() {
tabletserver.DisallowQueries()
qsc.DisallowQueries()
binlog.DisableUpdateStreamService()
agent.Stop()
})

Просмотреть файл

@ -41,26 +41,24 @@ const keyrangeQueryRules string = "KeyrangeQueryRules"
const blacklistQueryRules string = "BlacklistQueryRules"
func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables []string) error {
if agent.DBConfigs == nil {
// test instance, do nothing
return nil
}
// if the query service is already running, we're not starting it again
if tabletserver.SqlQueryRpcService.GetState() == "SERVING" {
if agent.QueryServiceControl.IsServing() {
return nil
}
// Update our DB config to match the info we have in the tablet
if agent.DBConfigs.App.DbName == "" {
agent.DBConfigs.App.DbName = tablet.DbName()
}
agent.DBConfigs.App.Keyspace = tablet.Keyspace
agent.DBConfigs.App.Shard = tablet.Shard
if tablet.Type != topo.TYPE_MASTER {
agent.DBConfigs.App.EnableInvalidator = true
} else {
agent.DBConfigs.App.EnableInvalidator = false
// only for real instances
if agent.DBConfigs != nil {
// Update our DB config to match the info we have in the tablet
if agent.DBConfigs.App.DbName == "" {
agent.DBConfigs.App.DbName = tablet.DbName()
}
agent.DBConfigs.App.Keyspace = tablet.Keyspace
agent.DBConfigs.App.Shard = tablet.Shard
if tablet.Type != topo.TYPE_MASTER {
agent.DBConfigs.App.EnableInvalidator = true
} else {
agent.DBConfigs.App.EnableInvalidator = false
}
}
err := agent.loadKeyspaceAndBlacklistRules(tablet, blacklistedTables)
@ -68,7 +66,7 @@ func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables []
return err
}
return tabletserver.AllowQueries(agent.DBConfigs, agent.SchemaOverrides, agent.Mysqld)
return agent.QueryServiceControl.AllowQueries(agent.DBConfigs, agent.SchemaOverrides, agent.Mysqld)
}
// loadKeyspaceAndBlacklistRules does what the name suggests:
@ -120,12 +118,12 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *topo.Tablet, bla
blacklistRules.Add(qr)
}
// Push all three sets of QueryRules to SqlQueryRpcService
loadRuleErr := tabletserver.SetQueryRules(keyrangeQueryRules, keyrangeRules)
loadRuleErr := agent.QueryServiceControl.SetQueryRules(keyrangeQueryRules, keyrangeRules)
if loadRuleErr != nil {
log.Warningf("Fail to load query rule set %s: %s", keyrangeQueryRules, loadRuleErr)
}
loadRuleErr = tabletserver.SetQueryRules(blacklistQueryRules, blacklistRules)
loadRuleErr = agent.QueryServiceControl.SetQueryRules(blacklistQueryRules, blacklistRules)
if loadRuleErr != nil {
log.Warningf("Fail to load query rule set %s: %s", blacklistQueryRules, loadRuleErr)
}
@ -137,7 +135,7 @@ func (agent *ActionAgent) disallowQueries() {
// test instance, do nothing
return
}
tabletserver.DisallowQueries()
agent.QueryServiceControl.DisallowQueries()
}
// changeCallback is run after every action that might

Просмотреть файл

@ -53,14 +53,15 @@ var (
// ActionAgent is the main class for the agent.
type ActionAgent struct {
// The following fields are set during creation
TopoServer topo.Server
TabletAlias topo.TabletAlias
Mysqld *mysqlctl.Mysqld
MysqlDaemon mysqlctl.MysqlDaemon
DBConfigs *dbconfigs.DBConfigs
SchemaOverrides []tabletserver.SchemaOverride
BinlogPlayerMap *BinlogPlayerMap
LockTimeout time.Duration
QueryServiceControl tabletserver.QueryServiceControl
TopoServer topo.Server
TabletAlias topo.TabletAlias
Mysqld *mysqlctl.Mysqld
MysqlDaemon mysqlctl.MysqlDaemon
DBConfigs *dbconfigs.DBConfigs
SchemaOverrides []tabletserver.SchemaOverride
BinlogPlayerMap *BinlogPlayerMap
LockTimeout time.Duration
// batchCtx is given to the agent by its creator, and should be used for
// any background tasks spawned by the agent.
batchCtx context.Context
@ -117,6 +118,7 @@ func loadSchemaOverrides(overridesFile string) []tabletserver.SchemaOverride {
// batchCtx is the context that the agent will use for any background tasks
// it spawns.
func NewActionAgent(
queryServiceControl tabletserver.QueryServiceControl,
batchCtx context.Context,
tabletAlias topo.TabletAlias,
dbcfgs *dbconfigs.DBConfigs,
@ -131,18 +133,19 @@ func NewActionAgent(
mysqld := mysqlctl.NewMysqld("Dba", "App", mycnf, &dbcfgs.Dba, &dbcfgs.App.ConnectionParams, &dbcfgs.Repl)
agent = &ActionAgent{
batchCtx: batchCtx,
TopoServer: topoServer,
TabletAlias: tabletAlias,
Mysqld: mysqld,
MysqlDaemon: mysqld,
DBConfigs: dbcfgs,
SchemaOverrides: schemaOverrides,
LockTimeout: lockTimeout,
History: history.New(historyLength),
lastHealthMapCount: stats.NewInt("LastHealthMapCount"),
_healthy: fmt.Errorf("healthcheck not run yet"),
healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply),
QueryServiceControl: queryServiceControl,
batchCtx: batchCtx,
TopoServer: topoServer,
TabletAlias: tabletAlias,
Mysqld: mysqld,
MysqlDaemon: mysqld,
DBConfigs: dbcfgs,
SchemaOverrides: schemaOverrides,
LockTimeout: lockTimeout,
History: history.New(historyLength),
lastHealthMapCount: stats.NewInt("LastHealthMapCount"),
_healthy: fmt.Errorf("healthcheck not run yet"),
healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply),
}
// try to initialize the tablet if we have to
@ -182,18 +185,19 @@ func NewActionAgent(
// subset of features are supported now, but we'll add more over time.
func NewTestActionAgent(batchCtx context.Context, ts topo.Server, tabletAlias topo.TabletAlias, port int, mysqlDaemon mysqlctl.MysqlDaemon) (agent *ActionAgent) {
agent = &ActionAgent{
batchCtx: batchCtx,
TopoServer: ts,
TabletAlias: tabletAlias,
Mysqld: nil,
MysqlDaemon: mysqlDaemon,
DBConfigs: nil,
SchemaOverrides: nil,
BinlogPlayerMap: nil,
History: history.New(historyLength),
lastHealthMapCount: new(stats.Int),
_healthy: fmt.Errorf("healthcheck not run yet"),
healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply),
QueryServiceControl: tabletserver.NewTestQueryServiceControl(),
batchCtx: batchCtx,
TopoServer: ts,
TabletAlias: tabletAlias,
Mysqld: nil,
MysqlDaemon: mysqlDaemon,
DBConfigs: nil,
SchemaOverrides: nil,
BinlogPlayerMap: nil,
History: history.New(historyLength),
lastHealthMapCount: new(stats.Int),
_healthy: fmt.Errorf("healthcheck not run yet"),
healthStreamMap: make(map[int]chan<- *actionnode.HealthStreamReply),
}
if err := agent.Start(0, port, 0); err != nil {
panic(fmt.Errorf("agent.Start(%v) failed: %v", tabletAlias, err))

Просмотреть файл

@ -23,7 +23,6 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topotools"
"golang.org/x/net/context"
@ -242,7 +241,7 @@ func (agent *ActionAgent) ReloadSchema(ctx context.Context) {
// This adds a dependency between tabletmanager and tabletserver,
// so it's not ideal. But I (alainjobart) think it's better
// to have up to date schema in vttablet.
tabletserver.ReloadSchema()
agent.QueryServiceControl.ReloadSchema()
}
// PreflightSchema will try out the schema change

Просмотреть файл

@ -184,7 +184,7 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
// try to figure out the mysql port if we don't have it yet
if _, ok := tablet.Portmap["mysql"]; !ok {
// we don't know the port, try to get it from mysqld
mysqlPort, err := agent.Mysqld.GetMysqlPort()
mysqlPort, err := agent.MysqlDaemon.GetMysqlPort()
if err != nil {
// Don't log if we're already in a waiting-for-mysql state.
agent.mutex.Lock()

Просмотреть файл

@ -1,9 +1,18 @@
package tabletmanager
import (
"encoding/json"
"errors"
"fmt"
"html/template"
"testing"
"time"
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/zktopo"
"golang.org/x/net/context"
)
func TestHealthRecordDeduplication(t *testing.T) {
@ -76,3 +85,96 @@ func TestHealthRecordClass(t *testing.T) {
}
}
}
// fakeHealthCheck implements health.Reporter interface
type fakeHealthCheck struct {
reportReplicationDelay time.Duration
reportError error
}
func (fhc *fakeHealthCheck) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (replicationDelay time.Duration, err error) {
return fhc.reportReplicationDelay, fhc.reportError
}
func (fhc *fakeHealthCheck) HTMLName() template.HTML {
return template.HTML("fakeHealthCheck")
}
func TestHealthCheck(t *testing.T) {
// register a fake reporter for our tests
fhc := &fakeHealthCheck{}
health.Register("fakeHealthCheck", fhc)
keyspace := "test_keyspace"
shard := "0"
cell := "cell1"
var uid uint32 = 42
ts := zktopo.NewTestServer(t, []string{cell})
if err := ts.CreateKeyspace(keyspace, &topo.Keyspace{}); err != nil {
t.Fatalf("CreateKeyspace failed: %v", err)
}
if err := topo.CreateShard(ts, keyspace, shard); err != nil {
t.Fatalf("CreateShard failed: %v", err)
}
tabletAlias := topo.TabletAlias{Cell: cell, Uid: uid}
port := 1234
tablet := &topo.Tablet{
Alias: tabletAlias,
Hostname: "host",
Portmap: map[string]int{
"vt": port,
},
IPAddr: "1.0.0.1",
Keyspace: keyspace,
Shard: shard,
Type: topo.TYPE_SPARE,
State: topo.STATE_READ_ONLY,
}
if err := topo.CreateTablet(ts, tablet); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
}
mysqlDaemon := &mysqlctl.FakeMysqlDaemon{MysqlPort: 3306}
agent := NewTestActionAgent(context.Background(), ts, tabletAlias, port, mysqlDaemon)
agent.BinlogPlayerMap = NewBinlogPlayerMap(ts, nil, nil)
targetTabletType := topo.TYPE_REPLICA
// first health check, should change us to replica, and update the
// mysql port to 3306
agent.runHealthCheck(targetTabletType)
ti, err := ts.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
if ti.Type != topo.TYPE_REPLICA {
t.Errorf("First health check failed to go to replica: %v", ti.Type)
}
if ti.Portmap["mysql"] != 3306 {
t.Errorf("First health check failed to update mysql port: %v", ti.Portmap["mysql"])
}
if !agent.QueryServiceControl.IsServing() {
t.Errorf("Query service should be running")
}
// now make the tablet unhealthy
fhc.reportError = fmt.Errorf("tablet is unhealthy")
agent.runHealthCheck(targetTabletType)
ti, err = ts.GetTablet(tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
}
data, err := json.MarshalIndent(ti, "", " ")
println(string(data))
if ti.Type != topo.TYPE_SPARE {
t.Errorf("Unhappy health check failed to go to spare: %v", ti.Type)
}
if agent.QueryServiceControl.IsServing() {
// FIXME(alainjobart) the query service should be stopped there, but it's not.
// See b/19309685 for the tracking bug.
// t.Errorf("Query service should not be running")
}
}

Просмотреть файл

@ -11,7 +11,6 @@ import (
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/tabletserver"
)
@ -42,7 +41,7 @@ func NewFileCustomRule() (fcr *FileCustomRule) {
}
// Open try to build query rules from local file and push the rules to vttablet
func (fcr *FileCustomRule) Open(rulePath string) error {
func (fcr *FileCustomRule) Open(qsc tabletserver.QueryServiceControl, rulePath string) error {
fcr.path = rulePath
if fcr.path == "" {
// Don't go further if path is empty
@ -63,7 +62,7 @@ func (fcr *FileCustomRule) Open(rulePath string) error {
fcr.currentRuleSetTimestamp = time.Now().Unix()
fcr.currentRuleSet = qrs.Copy()
// Push query rules to vttablet
tabletserver.SetQueryRules(FileCustomRuleSource, qrs.Copy())
qsc.SetQueryRules(FileCustomRuleSource, qrs.Copy())
log.Infof("Custom rule loaded from file: %s", fcr.path)
return nil
}
@ -74,13 +73,13 @@ func (fcr *FileCustomRule) GetRules() (qrs *tabletserver.QueryRules, version int
}
// ActivateFileCustomRules activates this static file based custom rule mechanism
func ActivateFileCustomRules() {
func ActivateFileCustomRules(qsc tabletserver.QueryServiceControl) {
if *fileRulePath != "" {
tabletserver.QueryRuleSources.RegisterQueryRuleSource(FileCustomRuleSource)
fileCustomRule.Open(*fileRulePath)
fileCustomRule.Open(qsc, *fileRulePath)
}
}
func init() {
servenv.OnRun(ActivateFileCustomRules)
tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, ActivateFileCustomRules)
}

Просмотреть файл

@ -26,6 +26,8 @@ var customRule1 = `[
]`
func TestFileCustomRule(t *testing.T) {
tqsc := tabletserver.NewTestQueryServiceControl()
var qrs *tabletserver.QueryRules
rulepath := path.Join(os.TempDir(), ".customrule.json")
// Set r1 and try to get it back
@ -36,7 +38,7 @@ func TestFileCustomRule(t *testing.T) {
fcr := NewFileCustomRule()
// Let FileCustomRule to build rule from the local file
err = fcr.Open(rulepath)
err = fcr.Open(tqsc, rulepath)
if err != nil {
t.Fatalf("Cannot open file custom rule service, err=%v", err)
}

Просмотреть файл

@ -51,17 +51,17 @@ func NewZkCustomRule(zkconn zk.Conn) *ZkCustomRule {
}
// Open Registers Zookeeper watch, gets inital QueryRules and starts polling routine
func (zkcr *ZkCustomRule) Open(rulePath string) (err error) {
func (zkcr *ZkCustomRule) Open(qsc tabletserver.QueryServiceControl, rulePath string) (err error) {
zkcr.path = rulePath
err = zkcr.refreshWatch()
if err != nil {
return err
}
err = zkcr.refreshData(false)
err = zkcr.refreshData(qsc, false)
if err != nil {
return err
}
go zkcr.poll()
go zkcr.poll(qsc)
return nil
}
@ -79,7 +79,7 @@ func (zkcr *ZkCustomRule) refreshWatch() error {
// refreshData gets query rules from Zookeeper and refresh internal QueryRules cache
// this function will also call SqlQuery.SetQueryRules to propagate rule changes to query service
func (zkcr *ZkCustomRule) refreshData(nodeRemoval bool) error {
func (zkcr *ZkCustomRule) refreshData(qsc tabletserver.QueryServiceControl, nodeRemoval bool) error {
data, stat, err := zkcr.zconn.Get(zkcr.path)
zkcr.mu.Lock()
defer zkcr.mu.Unlock()
@ -95,7 +95,7 @@ func (zkcr *ZkCustomRule) refreshData(nodeRemoval bool) error {
zkcr.currentRuleSetVersion = stat.Mzxid()
if !reflect.DeepEqual(zkcr.currentRuleSet, qrs) {
zkcr.currentRuleSet = qrs.Copy()
tabletserver.SetQueryRules(ZkCustomRuleSource, qrs.Copy())
qsc.SetQueryRules(ZkCustomRuleSource, qrs.Copy())
log.Infof("Custom rule version %v fetched from Zookeeper and applied to vttablet", zkcr.currentRuleSetVersion)
}
return nil
@ -108,7 +108,7 @@ const sleepDuringZkFailure time.Duration = 30
// poll polls the Zookeeper watch channel for data changes and refresh watch channel if watch channel is closed
// by Zookeeper Go library on error conditions such as connection reset
func (zkcr *ZkCustomRule) poll() {
func (zkcr *ZkCustomRule) poll(qsc tabletserver.QueryServiceControl) {
for {
select {
case <-zkcr.finish:
@ -116,7 +116,7 @@ func (zkcr *ZkCustomRule) poll() {
case event := <-zkcr.watch:
switch event.Type {
case zookeeper.EVENT_CREATED, zookeeper.EVENT_CHANGED, zookeeper.EVENT_DELETED:
err := zkcr.refreshData(event.Type == zookeeper.EVENT_DELETED) // refresh rules
err := zkcr.refreshData(qsc, event.Type == zookeeper.EVENT_DELETED) // refresh rules
if err != nil {
// Sleep to avoid busy waiting during connection re-establishment
<-time.After(time.Second * sleepDuringZkFailure)
@ -127,7 +127,7 @@ func (zkcr *ZkCustomRule) poll() {
// Sleep to avoid busy waiting during connection re-establishment
<-time.After(time.Second * sleepDuringZkFailure)
}
zkcr.refreshData(false)
zkcr.refreshData(qsc, false)
}
}
}
@ -147,14 +147,14 @@ func (zkcr *ZkCustomRule) GetRules() (qrs *tabletserver.QueryRules, version int6
}
// ActivateZkCustomRules activates zookeeper dynamic custom rule mechanism
func ActivateZkCustomRules() {
func ActivateZkCustomRules(qsc tabletserver.QueryServiceControl) {
if *zkRulePath != "" {
tabletserver.QueryRuleSources.RegisterQueryRuleSource(ZkCustomRuleSource)
zkCustomRule.Open(*zkRulePath)
zkCustomRule.Open(qsc, *zkRulePath)
}
}
func init() {
servenv.OnRun(ActivateZkCustomRules)
tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, ActivateZkCustomRules)
servenv.OnTerm(zkCustomRule.Close)
}

Просмотреть файл

@ -15,7 +15,7 @@ import (
"launchpad.net/gozk/zookeeper"
)
var customRule1 string = `[
var customRule1 = `[
{
"Name": "r1",
"Description": "disallow bindvar 'asdfg'",
@ -27,7 +27,7 @@ var customRule1 string = `[
}
]`
var customRule2 string = `[
var customRule2 = `[
{
"Name": "r2",
"Description": "disallow insert on table test",
@ -47,9 +47,11 @@ func setUpFakeZk(t *testing.T) {
}
func TestZkCustomRule(t *testing.T) {
tqsc := tabletserver.NewTestQueryServiceControl()
setUpFakeZk(t)
zkcr := NewZkCustomRule(conn)
err := zkcr.Open("/zk/fake/customrules/testrules")
err := zkcr.Open(tqsc, "/zk/fake/customrules/testrules")
if err != nil {
t.Fatalf("Cannot open zookeeper custom rule service, err=%v", err)
}

Просмотреть файл

@ -35,7 +35,7 @@ type DBConn struct {
func NewDBConn(cp *ConnPool, appParams, dbaParams *mysql.ConnectionParams) (*DBConn, error) {
c, err := dbconnpool.NewDBConnection(appParams, mysqlStats)
if err != nil {
go CheckMySQL()
go checkMySQL()
return nil, err
}
return &DBConn{
@ -61,7 +61,7 @@ func (dbc *DBConn) Exec(query string, maxrows int, wantfields bool, deadline Dea
}
err2 := dbc.reconnect()
if err2 != nil {
go CheckMySQL()
go checkMySQL()
return nil, NewTabletErrorSql(ErrFatal, err)
}
}

Просмотреть файл

@ -12,46 +12,55 @@ import (
"golang.org/x/net/context"
)
// SqlQuery is the server object for gorpc SqlQuery
type SqlQuery struct {
server *tabletserver.SqlQuery
}
// GetSessionId is exposing tabletserver.SqlQuery.GetSessionId
func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
return sq.server.GetSessionId(sessionParams, sessionInfo)
}
// Begin is exposing tabletserver.SqlQuery.Begin
func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *proto.TransactionInfo) error {
return sq.server.Begin(ctx, session, txInfo)
}
// Commit is exposing tabletserver.SqlQuery.Commit
func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session, noOutput *string) error {
return sq.server.Commit(ctx, session)
}
// Rollback is exposing tabletserver.SqlQuery.Rollback
func (sq *SqlQuery) Rollback(ctx context.Context, session *proto.Session, noOutput *string) error {
return sq.server.Rollback(ctx, session)
}
// Execute is exposing tabletserver.SqlQuery.Execute
func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mproto.QueryResult) error {
return sq.server.Execute(ctx, query, reply)
}
// StreamExecute is exposing tabletserver.SqlQuery.StreamExecute
func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
return sq.server.StreamExecute(ctx, query, func(reply *mproto.QueryResult) error {
return sendReply(reply)
})
}
// ExecuteBatch is exposing tabletserver.SqlQuery.ExecuteBatch
func (sq *SqlQuery) ExecuteBatch(ctx context.Context, queryList *proto.QueryList, reply *proto.QueryResultList) error {
return sq.server.ExecuteBatch(ctx, queryList, reply)
}
// SplitQuery is exposing tabletserver.SqlQuery.SplitQuery
func (sq *SqlQuery) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
return sq.server.SplitQuery(ctx, req, reply)
}
func init() {
tabletserver.SqlQueryRegisterFunctions = append(tabletserver.SqlQueryRegisterFunctions, func(sq *tabletserver.SqlQuery) {
servenv.Register("queryservice", &SqlQuery{sq})
tabletserver.QueryServiceControlRegisterFunctions = append(tabletserver.QueryServiceControlRegisterFunctions, func(qsc tabletserver.QueryServiceControl) {
servenv.Register("queryservice", &SqlQuery{qsc.SqlQuery()})
})
}

Просмотреть файл

@ -55,6 +55,7 @@ func init() {
flag.BoolVar(&qsConfig.RowCache.LockPaged, "rowcache-lock-paged", DefaultQsConfig.RowCache.LockPaged, "whether rowcache locks down paged memory")
}
// RowCacheConfig encapsulates the configuration for RowCache
type RowCacheConfig struct {
Binary string
Memory int
@ -65,6 +66,7 @@ type RowCacheConfig struct {
LockPaged bool
}
// GetSubprocessFlags returns the flags to use to call memcached
func (c *RowCacheConfig) GetSubprocessFlags() []string {
cmd := []string{}
if c.Binary == "" {
@ -93,6 +95,7 @@ func (c *RowCacheConfig) GetSubprocessFlags() []string {
return cmd
}
// Config contains all the configuration for query service
type Config struct {
PoolSize int
StreamPoolSize int
@ -141,105 +144,233 @@ var DefaultQsConfig = Config{
var qsConfig Config
var SqlQueryRpcService *SqlQuery
// QueryServiceControl is the interface implemented by the controller
// for the query service.
type QueryServiceControl interface {
// Register registers this query service with the RPC layer.
Register()
// AddStatusPart adds the status part to the status page
AddStatusPart()
// AllowQueries enables queries.
AllowQueries(*dbconfigs.DBConfigs, []SchemaOverride, *mysqlctl.Mysqld) error
// DisallowQueries shuts down the query service.
DisallowQueries()
// IsServing returns true if the query service is running
IsServing() bool
// IsHealthy returns the health status of the QueryService
IsHealthy() error
// ReloadSchema makes the quey service reload its schema cache
ReloadSchema()
// SetQueryRules sets the query rules for this QueryService
SetQueryRules(ruleSource string, qrs *QueryRules) error
// SqlQuery returns the SqlQuery object used by this QueryServiceControl
SqlQuery() *SqlQuery
}
// TestQueryServiceControl is a fake version of QueryServiceControl
type TestQueryServiceControl struct {
// QueryServiceEnabled is a state variable
QueryServiceEnabled bool
// AllowQueriesError is the return value for AllowQueries
AllowQueriesError error
// IsHealthy is the return value for IsHealthy
IsHealthyError error
// ReloadSchemaCount counts how many times ReloadSchema was called
ReloadSchemaCount int
}
// NewTestQueryServiceControl returns an implementation of QueryServiceControl
// that is entirely fake
func NewTestQueryServiceControl() *TestQueryServiceControl {
return &TestQueryServiceControl{
QueryServiceEnabled: false,
AllowQueriesError: nil,
IsHealthyError: nil,
ReloadSchemaCount: 0,
}
}
// Register is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) Register() {
}
// AddStatusPart is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) AddStatusPart() {
}
// AllowQueries is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) AllowQueries(*dbconfigs.DBConfigs, []SchemaOverride, *mysqlctl.Mysqld) error {
tqsc.QueryServiceEnabled = tqsc.AllowQueriesError == nil
return tqsc.AllowQueriesError
}
// DisallowQueries is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) DisallowQueries() {
tqsc.QueryServiceEnabled = false
}
// IsServing is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) IsServing() bool {
return tqsc.QueryServiceEnabled
}
// IsHealthy is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) IsHealthy() error {
return tqsc.IsHealthyError
}
// ReloadSchema is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) ReloadSchema() {
tqsc.ReloadSchemaCount++
}
// SetQueryRules is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) SetQueryRules(ruleSource string, qrs *QueryRules) error {
return nil
}
// SqlQuery is part of the QueryServiceControl interface
func (tqsc *TestQueryServiceControl) SqlQuery() *SqlQuery {
return nil
}
// realQueryServiceControl implements QueryServiceControl for real
type realQueryServiceControl struct {
sqlQueryRPCService *SqlQuery
}
// NewQueryServiceControl returns a real implementation of QueryServiceControl
func NewQueryServiceControl() QueryServiceControl {
return &realQueryServiceControl{
sqlQueryRPCService: NewSqlQuery(qsConfig),
}
}
// registration service for all server protocols
type SqlQueryRegisterFunction func(*SqlQuery)
// QueryServiceControlRegisterFunction is a callback type to be called when we
// Register() a QueryServiceControl
type QueryServiceControlRegisterFunction func(QueryServiceControl)
var SqlQueryRegisterFunctions []SqlQueryRegisterFunction
// QueryServiceControlRegisterFunctions is an array of all the
// QueryServiceControlRegisterFunction that will be called upon
// Register() on a QueryServiceControl
var QueryServiceControlRegisterFunctions []QueryServiceControlRegisterFunction
func RegisterQueryService() {
if SqlQueryRpcService != nil {
log.Warningf("RPC service already up %v", SqlQueryRpcService)
return
// Register is part of the QueryServiceControl interface
func (rqsc *realQueryServiceControl) Register() {
rqsc.registerCheckMySQL()
for _, f := range QueryServiceControlRegisterFunctions {
f(rqsc)
}
SqlQueryRpcService = NewSqlQuery(qsConfig)
for _, f := range SqlQueryRegisterFunctions {
f(SqlQueryRpcService)
}
http.HandleFunc("/debug/health", healthCheck)
rqsc.registerDebugHealthHandler()
rqsc.registerQueryzHandler()
rqsc.registerSchemazHandler()
rqsc.registerStreamQueryzHandlers()
}
// AllowQueries starts the query service.
func AllowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld *mysqlctl.Mysqld) error {
return SqlQueryRpcService.allowQueries(dbconfigs, schemaOverrides, mysqld)
func (rqsc *realQueryServiceControl) AllowQueries(dbconfigs *dbconfigs.DBConfigs, schemaOverrides []SchemaOverride, mysqld *mysqlctl.Mysqld) error {
return rqsc.sqlQueryRPCService.allowQueries(dbconfigs, schemaOverrides, mysqld)
}
// DisallowQueries can take a long time to return (not indefinite) because
// it has to wait for queries & transactions to be completed or killed,
// and also for house keeping goroutines to be terminated.
func DisallowQueries() {
func (rqsc *realQueryServiceControl) DisallowQueries() {
defer logError()
SqlQueryRpcService.disallowQueries()
rqsc.sqlQueryRPCService.disallowQueries()
}
// IsServing is part of the QueryServiceControl interface
func (rqsc *realQueryServiceControl) IsServing() bool {
return rqsc.sqlQueryRPCService.GetState() == "SERVING"
}
// Reload the schema. If the query service is not running, nothing will happen
func ReloadSchema() {
func (rqsc *realQueryServiceControl) ReloadSchema() {
defer logError()
SqlQueryRpcService.qe.schemaInfo.triggerReload()
rqsc.sqlQueryRPCService.qe.schemaInfo.triggerReload()
}
// CheckMySQL verifies that MySQL is still reachable by connecting to it.
// checkMySQL verifies that MySQL is still reachable by connecting to it.
// If it's not reachable, it shuts down the query service.
// This function rate-limits the check to no more than once per second.
func CheckMySQL() {
if !checkMySLQThrottler.TryAcquire() {
return
}
defer func() {
time.Sleep(1 * time.Second)
checkMySLQThrottler.Release()
}()
defer logError()
if SqlQueryRpcService.checkMySQL() {
return
}
log.Infof("Check MySQL failed. Shutting down query service")
DisallowQueries()
}
// FIXME(alainjobart) this global variable is accessed from many parts
// of this library, this needs refactoring, probably using an interface.
var checkMySQL = func() {}
func GetSessionId() int64 {
return SqlQueryRpcService.sessionId
}
// GetQueryRules is the tabletserver level API to get current query rules
func GetQueryRules(ruleSource string) (*QueryRules, error) {
return QueryRuleSources.GetRules(ruleSource)
func (rqsc *realQueryServiceControl) registerCheckMySQL() {
checkMySQL = func() {
if !checkMySLQThrottler.TryAcquire() {
return
}
defer func() {
time.Sleep(1 * time.Second)
checkMySLQThrottler.Release()
}()
defer logError()
if rqsc.sqlQueryRPCService.checkMySQL() {
return
}
log.Infof("Check MySQL failed. Shutting down query service")
rqsc.DisallowQueries()
}
}
// SetQueryRules is the tabletserver level API to write current query rules
func SetQueryRules(ruleSource string, qrs *QueryRules) error {
func (rqsc *realQueryServiceControl) SetQueryRules(ruleSource string, qrs *QueryRules) error {
err := QueryRuleSources.SetRules(ruleSource, qrs)
if err != nil {
return err
}
SqlQueryRpcService.qe.schemaInfo.ClearQueryPlanCache()
rqsc.sqlQueryRPCService.qe.schemaInfo.ClearQueryPlanCache()
return nil
}
// SqlQuery is part of the QueryServiceControl interface
func (rqsc *realQueryServiceControl) SqlQuery() *SqlQuery {
return rqsc.sqlQueryRPCService
}
// IsHealthy returns nil if the query service is healthy (able to
// connect to the database and serving traffic) or an error explaining
// the unhealthiness otherwise.
func IsHealthy() error {
return SqlQueryRpcService.Execute(
func (rqsc *realQueryServiceControl) IsHealthy() error {
return rqsc.sqlQueryRPCService.Execute(
context.Background(),
&proto.Query{Sql: "select 1 from dual", SessionId: SqlQueryRpcService.sessionId},
&proto.Query{
Sql: "select 1 from dual",
SessionId: rqsc.sqlQueryRPCService.sessionId,
},
new(mproto.QueryResult),
)
}
func healthCheck(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.MONITORING); err != nil {
acl.SendError(w, err)
return
}
w.Header().Set("Content-Type", "text/plain")
if err := IsHealthy(); err != nil {
w.Write([]byte("not ok"))
return
}
w.Write([]byte("ok"))
func (rqsc *realQueryServiceControl) registerDebugHealthHandler() {
http.HandleFunc("/debug/health", func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.MONITORING); err != nil {
acl.SendError(w, err)
return
}
w.Header().Set("Content-Type", "text/plain")
if err := rqsc.IsHealthy(); err != nil {
w.Write([]byte("not ok"))
return
}
w.Write([]byte("ok"))
})
}
func buildFmter(logger *streamlog.StreamLogger) func(url.Values, interface{}) string {
@ -258,8 +389,8 @@ func buildFmter(logger *streamlog.StreamLogger) func(url.Values, interface{}) st
// InitQueryService registers the query service, after loading any
// necessary config files. It also starts any relevant streaming logs.
func InitQueryService() {
func InitQueryService(qsc QueryServiceControl) {
SqlQueryLogger.ServeLogs(*queryLogHandler, buildFmter(SqlQueryLogger))
TxLogger.ServeLogs(*txLogHandler, buildFmter(TxLogger))
RegisterQueryService()
qsc.Register()
}

Просмотреть файл

@ -108,52 +108,49 @@ func (sorter *queryzSorter) Less(i, j int) bool {
return sorter.less(sorter.rows[i], sorter.rows[j])
}
func init() {
http.HandleFunc("/queryz", queryzHandler)
}
func (rqsc *realQueryServiceControl) registerQueryzHandler() {
http.HandleFunc("/queryz", func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
w.Write(queryzHeader)
// queryzHandler displays the query stats.
func queryzHandler(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
w.Write(queryzHeader)
si := SqlQueryRpcService.qe.schemaInfo
keys := si.queries.Keys()
sorter := queryzSorter{
rows: make([]*queryzRow, 0, len(keys)),
less: func(row1, row2 *queryzRow) bool {
return row1.timePQ() > row2.timePQ()
},
}
for _, v := range si.queries.Keys() {
plan := si.getQuery(v)
if plan == nil {
continue
si := rqsc.sqlQueryRPCService.qe.schemaInfo
keys := si.queries.Keys()
sorter := queryzSorter{
rows: make([]*queryzRow, 0, len(keys)),
less: func(row1, row2 *queryzRow) bool {
return row1.timePQ() > row2.timePQ()
},
}
Value := &queryzRow{
Query: wrappable(v),
Table: plan.TableName,
Plan: plan.PlanId,
Reason: plan.Reason,
for _, v := range si.queries.Keys() {
plan := si.getQuery(v)
if plan == nil {
continue
}
Value := &queryzRow{
Query: wrappable(v),
Table: plan.TableName,
Plan: plan.PlanId,
Reason: plan.Reason,
}
Value.Count, Value.tm, Value.Rows, Value.Errors = plan.Stats()
timepq := time.Duration(int64(Value.tm) / Value.Count)
if timepq < 10*time.Millisecond {
Value.Color = "low"
} else if timepq < 100*time.Millisecond {
Value.Color = "medium"
} else {
Value.Color = "high"
}
sorter.rows = append(sorter.rows, Value)
}
Value.Count, Value.tm, Value.Rows, Value.Errors = plan.Stats()
timepq := time.Duration(int64(Value.tm) / Value.Count)
if timepq < 10*time.Millisecond {
Value.Color = "low"
} else if timepq < 100*time.Millisecond {
Value.Color = "medium"
} else {
Value.Color = "high"
sort.Sort(&sorter)
for _, Value := range sorter.rows {
queryzTmpl.Execute(w, Value)
}
sorter.rows = append(sorter.rows, Value)
}
sort.Sort(&sorter)
for _, Value := range sorter.rows {
queryzTmpl.Execute(w, Value)
}
})
}

Просмотреть файл

@ -120,7 +120,7 @@ func (rci *RowcacheInvalidator) run(ctx *sync2.ServiceContext) error {
break
}
if IsConnErr(err) {
go CheckMySQL()
go checkMySQL()
}
log.Errorf("binlog.ServeUpdateStream returned err '%v', retrying in 1 second.", err.Error())
internalErrors.Add("Invalidation", 1)

Просмотреть файл

@ -32,10 +32,6 @@ var (
`))
)
func init() {
http.HandleFunc("/schemaz", schemazHandler)
}
type schemazSorter struct {
rows []*schema.Table
less func(row1, row2 *schema.Table) bool
@ -53,34 +49,35 @@ func (sorter *schemazSorter) Less(i, j int) bool {
return sorter.less(sorter.rows[i], sorter.rows[j])
}
// schemazHandler displays the schema read by the query service.
func schemazHandler(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
w.Write(schemazHeader)
func (rqsc *realQueryServiceControl) registerSchemazHandler() {
http.HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
w.Write(schemazHeader)
tables := SqlQueryRpcService.qe.schemaInfo.GetSchema()
sorter := schemazSorter{
rows: tables,
less: func(row1, row2 *schema.Table) bool {
return row1.Name > row2.Name
},
}
sort.Sort(&sorter)
envelope := struct {
ColumnCategory []string
CacheType []string
Table *schema.Table
}{
ColumnCategory: []string{"other", "number", "varbinary"},
CacheType: []string{"none", "read-write", "write-only"},
}
for _, Value := range sorter.rows {
envelope.Table = Value
schemazTmpl.Execute(w, envelope)
}
tables := rqsc.sqlQueryRPCService.qe.schemaInfo.GetSchema()
sorter := schemazSorter{
rows: tables,
less: func(row1, row2 *schema.Table) bool {
return row1.Name > row2.Name
},
}
sort.Sort(&sorter)
envelope := struct {
ColumnCategory []string
CacheType []string
Table *schema.Table
}{
ColumnCategory: []string{"other", "number", "varbinary"},
CacheType: []string{"none", "read-write", "write-only"},
}
for _, Value := range sorter.rows {
envelope.Table = Value
schemazTmpl.Execute(w, envelope)
}
})
}

Просмотреть файл

@ -84,10 +84,10 @@ type queryserviceStatus struct {
}
// AddStatusPart registers the status part for the status page.
func AddStatusPart() {
func (rqsc *realQueryServiceControl) AddStatusPart() {
servenv.AddStatusPart("Queryservice", queryserviceStatusTemplate, func() interface{} {
status := queryserviceStatus{
State: SqlQueryRpcService.GetState(),
State: rqsc.sqlQueryRPCService.GetState(),
}
rates := qpsRates.Get()
if qps, ok := rates["All"]; ok && len(qps) > 0 {

Просмотреть файл

@ -34,58 +34,56 @@ var (
`))
)
func init() {
http.HandleFunc("/streamqueryz", streamqueryzHandler)
http.HandleFunc("/streamqueryz/terminate", streamqueryzTerminateHandler)
}
func streamqueryzHandler(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
rows := SqlQueryRpcService.qe.streamQList.GetQueryzRows()
if err := r.ParseForm(); err != nil {
http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError)
return
}
format := r.FormValue("format")
if format == "json" {
js, err := json.Marshal(rows)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
func (rqsc *realQueryServiceControl) registerStreamQueryzHandlers() {
streamqueryzHandler := func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
return
rows := rqsc.sqlQueryRPCService.qe.streamQList.GetQueryzRows()
if err := r.ParseForm(); err != nil {
http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError)
return
}
format := r.FormValue("format")
if format == "json" {
js, err := json.Marshal(rows)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
return
}
startHTMLTable(w)
defer endHTMLTable(w)
w.Write(streamqueryzHeader)
for i := range rows {
streamqueryzTmpl.Execute(w, rows[i])
}
}
startHTMLTable(w)
defer endHTMLTable(w)
w.Write(streamqueryzHeader)
for i := range rows {
streamqueryzTmpl.Execute(w, rows[i])
}
}
func streamqueryzTerminateHandler(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil {
acl.SendError(w, err)
return
}
if err := r.ParseForm(); err != nil {
http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError)
return
}
connID := r.FormValue("connID")
c, err := strconv.Atoi(connID)
if err != nil {
http.Error(w, "invalid connID", http.StatusInternalServerError)
return
}
if err = SqlQueryRpcService.qe.streamQList.Terminate(int64(c)); err != nil {
http.Error(w, fmt.Sprintf("error: %v", err), http.StatusInternalServerError)
return
}
streamqueryzHandler(w, r)
http.HandleFunc("/streamqueryz", streamqueryzHandler)
http.HandleFunc("/streamqueryz/terminate", func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil {
acl.SendError(w, err)
return
}
if err := r.ParseForm(); err != nil {
http.Error(w, fmt.Sprintf("cannot parse form: %s", err), http.StatusInternalServerError)
return
}
connID := r.FormValue("connID")
c, err := strconv.Atoi(connID)
if err != nil {
http.Error(w, "invalid connID", http.StatusInternalServerError)
return
}
if err = rqsc.sqlQueryRPCService.qe.streamQList.Terminate(int64(c)); err != nil {
http.Error(w, fmt.Sprintf("error: %v", err), http.StatusInternalServerError)
return
}
streamqueryzHandler(w, r)
})
}

Просмотреть файл

@ -234,7 +234,7 @@ func (txc *TxConnection) Exec(query string, maxrows int, wantfields bool, deadli
r, err := txc.DBConn.execOnce(query, maxrows, wantfields, deadline)
if err != nil {
if IsConnErr(err) {
go CheckMySQL()
go checkMySQL()
return nil, NewTabletErrorSql(ErrFatal, err)
}
return nil, NewTabletErrorSql(ErrFail, err)