add EnablePublishStats flag in queryctl.go

This change allows one to publish system variables optionally.
This commit is contained in:
Shengzhe Yao 2015-04-20 14:13:00 -07:00
Родитель 18cfb530db
Коммит cbd9ce27b3
16 изменённых файлов: 147 добавлений и 105 удалений

Просмотреть файл

@ -42,9 +42,10 @@ func NewCachePool(
name string,
rowCacheConfig RowCacheConfig,
idleTimeout time.Duration,
statsURL string) *CachePool {
statsURL string,
enablePublishStats bool) *CachePool {
cp := &CachePool{name: name, idleTimeout: idleTimeout, statsURL: statsURL}
if name != "" {
if name != "" && enablePublishStats {
cp.memcacheStats = NewMemcacheStats(
rowCacheConfig.StatsPrefix+name, 10*time.Second, enableMain,
func(key string) string {

Просмотреть файл

@ -20,7 +20,7 @@ import (
func TestCachePoolWithEmptyBinary(t *testing.T) {
fakecacheservice.Register()
fakesqldb.Register()
cachePool := newTestCachePool(RowCacheConfig{})
cachePool := newTestCachePool(RowCacheConfig{}, false)
cachePool.Close()
}
@ -31,7 +31,7 @@ func TestCachePool(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
if !cachePool.IsClosed() {
t.Fatalf("cache pool is not closed")
}
@ -52,7 +52,7 @@ func TestCachePoolOpenTwice(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
cachePool.Open()
defer cachePool.Close()
defer func() {
@ -70,7 +70,7 @@ func TestCachePoolOpenWithEmptyBinary(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
defer func() {
if e := recover(); e == nil {
t.Fatalf("open a cache pool with empty rowCacheConfig.Binary should panic")
@ -88,7 +88,7 @@ func TestCachePoolOpenWithInvalidBinary(t *testing.T) {
Binary: "invalid_binary",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
defer func() {
if e := recover(); e == nil {
t.Fatalf("open a cache pool with an invalid rowCacheConfig.Binary should panic")
@ -105,7 +105,7 @@ func TestCachePoolState(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, true)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
cachePool.Open()
@ -141,7 +141,7 @@ func TestCachePoolStateWithoutOpen(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
if cachePool.StatsJSON() != "{}" {
@ -175,7 +175,7 @@ func TestCachePoolGetFailedBecauseCachePoolIsClosed(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
ctx := context.Background()
@ -194,7 +194,7 @@ func TestCachePoolStatsURL(t *testing.T) {
Binary: "ls",
Connections: 100,
}
cachePool := newTestCachePool(rowCacheConfig)
cachePool := newTestCachePool(rowCacheConfig, false)
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
cachePool.Open()
@ -204,9 +204,9 @@ func TestCachePoolStatsURL(t *testing.T) {
cachePool.ServeHTTP(response, request)
}
func newTestCachePool(rowcacheConfig RowCacheConfig) *CachePool {
func newTestCachePool(rowcacheConfig RowCacheConfig, enablePublishStats bool) *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-%d-", randID)
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL)
return NewCachePool(name, rowcacheConfig, 1*time.Second, statsURL, enablePublishStats)
}

Просмотреть файл

@ -41,7 +41,8 @@ type ConnPool struct {
func NewConnPool(
name string,
capacity int,
idleTimeout time.Duration) *ConnPool {
idleTimeout time.Duration,
enablePublishStats bool) *ConnPool {
cp := &ConnPool{
capacity: capacity,
idleTimeout: idleTimeout,
@ -50,12 +51,14 @@ func NewConnPool(
if name == "" {
return cp
}
stats.Publish(name+"Capacity", stats.IntFunc(cp.Capacity))
stats.Publish(name+"Available", stats.IntFunc(cp.Available))
stats.Publish(name+"MaxCap", stats.IntFunc(cp.MaxCap))
stats.Publish(name+"WaitCount", stats.IntFunc(cp.WaitCount))
stats.Publish(name+"WaitTime", stats.DurationFunc(cp.WaitTime))
stats.Publish(name+"IdleTimeout", stats.DurationFunc(cp.IdleTimeout))
if enablePublishStats {
stats.Publish(name+"Capacity", stats.IntFunc(cp.Capacity))
stats.Publish(name+"Available", stats.IntFunc(cp.Available))
stats.Publish(name+"MaxCap", stats.IntFunc(cp.MaxCap))
stats.Publish(name+"WaitCount", stats.IntFunc(cp.WaitCount))
stats.Publish(name+"WaitTime", stats.DurationFunc(cp.WaitTime))
stats.Publish(name+"IdleTimeout", stats.DurationFunc(cp.IdleTimeout))
}
return cp
}

Просмотреть файл

@ -42,7 +42,7 @@ func TestConnectivity(t *testing.T) {
killStats = stats.NewCounters("TestKills")
internalErrors = stats.NewCounters("TestInternalErrors")
mysqlStats = stats.NewTimings("TestMySQLStats")
pool := NewConnPool("p1", 1, 30*time.Second)
pool := NewConnPool("p1", 1, 30*time.Second, false)
pool.Open(appParams, dbaParams)
conn, err := pool.Get(ctx)

Просмотреть файл

@ -127,6 +127,7 @@ func NewQueryEngine(config Config) *QueryEngine {
},
time.Duration(config.SchemaReloadTime*1e9),
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)
mysqlStats = stats.NewTimings(config.StatsPrefix + "Mysql")
@ -137,16 +138,19 @@ func NewQueryEngine(config Config) *QueryEngine {
config.RowCache,
time.Duration(config.IdleTimeout*1e9),
config.DebugURLPrefix+"/memcache/",
config.EnablePublishStats,
)
qe.connPool = NewConnPool(
config.PoolNamePrefix+"ConnPool",
config.PoolSize,
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)
qe.streamConnPool = NewConnPool(
config.PoolNamePrefix+"StreamConnPool",
config.StreamPoolSize,
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)
// Services
@ -157,6 +161,7 @@ func NewQueryEngine(config Config) *QueryEngine {
time.Duration(config.TransactionTimeout*1e9),
time.Duration(config.TxPoolTimeout*1e9),
time.Duration(config.IdleTimeout*1e9),
config.EnablePublishStats,
)
qe.consolidator = sync2.NewConsolidator()
http.Handle(config.DebugURLPrefix+"/consolidations", qe.consolidator)
@ -178,10 +183,6 @@ func NewQueryEngine(config Config) *QueryEngine {
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)
// Stats
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
stats.Publish(config.StatsPrefix+"StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(qe.queryTimeout.Get))
queryStats = stats.NewTimings(config.StatsPrefix + "Queries")
qpsRates = stats.NewRates(config.StatsPrefix+"QPS", queryStats, 15, 60*time.Second)
waitStats = stats.NewTimings(config.StatsPrefix + "Waits")
@ -190,11 +191,17 @@ func NewQueryEngine(config Config) *QueryEngine {
errorStats = stats.NewCounters(config.StatsPrefix + "Errors")
internalErrors = stats.NewCounters(config.StatsPrefix + "InternalErrors")
resultStats = stats.NewHistogram(config.StatsPrefix+"Results", resultBuckets)
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
spotCheckCount = stats.NewInt(config.StatsPrefix + "RowcacheSpotCheckCount")
if config.EnablePublishStats {
stats.Publish(config.StatsPrefix+"MaxResultSize", stats.IntFunc(qe.maxResultSize.Get))
stats.Publish(config.StatsPrefix+"MaxDMLRows", stats.IntFunc(qe.maxDMLRows.Get))
stats.Publish(config.StatsPrefix+"StreamBufferSize", stats.IntFunc(qe.streamBufferSize.Get))
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(qe.queryTimeout.Get))
stats.Publish(config.StatsPrefix+"RowcacheSpotCheckRatio", stats.FloatFunc(func() float64 {
return float64(qe.spotCheckFreq.Get()) / spotCheckMultiplier
}))
}
return qe
}

Просмотреть файл

@ -732,6 +732,7 @@ func newTestQueryExecutor(sql string, ctx context.Context, flags executorFlags)
config.PoolSize = 100
config.TransactionCap = 100
config.SpotCheckRatio = 1.0
config.EnablePublishStats = false
if flags&enableStrict > 0 {
config.StrictMode = true
} else {

Просмотреть файл

@ -48,6 +48,7 @@ func init() {
flag.BoolVar(&qsConfig.StrictMode, "queryserver-config-strict-mode", DefaultQsConfig.StrictMode, "allow only predictable DMLs and enforces MySQL's STRICT_TRANS_TABLES")
flag.BoolVar(&qsConfig.StrictTableAcl, "queryserver-config-strict-table-acl", DefaultQsConfig.StrictTableAcl, "only allow queries that pass table acl checks")
flag.BoolVar(&qsConfig.TerseErrors, "queryserver-config-terse-errors", DefaultQsConfig.TerseErrors, "prevent bind vars from escaping in returned errors")
flag.BoolVar(&qsConfig.EnablePublishStats, "queryserver-config-enable-publish-stats", DefaultQsConfig.EnablePublishStats, "set this flag to true makes queryservice publish monitoring stats")
flag.StringVar(&qsConfig.RowCache.Binary, "rowcache-bin", DefaultQsConfig.RowCache.Binary, "rowcache binary file")
flag.IntVar(&qsConfig.RowCache.Memory, "rowcache-memory", DefaultQsConfig.RowCache.Memory, "rowcache max memory usage in MB")
flag.StringVar(&qsConfig.RowCache.Socket, "rowcache-socket", DefaultQsConfig.RowCache.Socket, "socket filename hint: a unique filename will be generated based on this input")
@ -114,6 +115,7 @@ type Config struct {
StrictMode bool
StrictTableAcl bool
TerseErrors bool
EnablePublishStats bool
StatsPrefix string
DebugURLPrefix string
PoolNamePrefix string
@ -145,6 +147,7 @@ var DefaultQsConfig = Config{
StrictMode: true,
StrictTableAcl: false,
TerseErrors: false,
EnablePublishStats: true,
StatsPrefix: "",
DebugURLPrefix: "/debug",
PoolNamePrefix: "",

Просмотреть файл

@ -19,7 +19,7 @@ import (
func TestQueryzHandler(t *testing.T) {
resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/schemaz", nil)
schemaInfo := newTestSchemaInfo(100, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfo(100, 10*time.Second, 10*time.Second, false)
plan1 := &ExecPlan{
ExecPlan: &planbuilder.ExecPlan{

Просмотреть файл

@ -117,27 +117,30 @@ func NewSchemaInfo(
statsPrefix string,
endpoints map[string]string,
reloadTime time.Duration,
idleTimeout time.Duration) *SchemaInfo {
idleTimeout time.Duration,
enablePublishStats bool) *SchemaInfo {
si := &SchemaInfo{
queries: cache.NewLRUCache(int64(queryCacheSize)),
connPool: NewConnPool("", 2, idleTimeout),
connPool: NewConnPool("", 2, idleTimeout, enablePublishStats),
ticks: timer.NewTimer(reloadTime),
endpoints: endpoints,
reloadTime: reloadTime,
}
stats.Publish(statsPrefix+"QueryCacheLength", stats.IntFunc(si.queries.Length))
stats.Publish(statsPrefix+"QueryCacheSize", stats.IntFunc(si.queries.Size))
stats.Publish(statsPrefix+"QueryCacheCapacity", stats.IntFunc(si.queries.Capacity))
stats.Publish(statsPrefix+"QueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", si.queries.Oldest())
}))
stats.Publish(statsPrefix+"SchemaReloadTime", stats.DurationFunc(si.ticks.Interval))
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheStats", []string{"Table", "Stats"}, si.getRowcacheStats)
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheInvalidations", []string{"Table"}, si.getRowcacheInvalidations)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryCounts", []string{"Table", "Plan"}, si.getQueryCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryTimesNs", []string{"Table", "Plan"}, si.getQueryTime)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryRowCounts", []string{"Table", "Plan"}, si.getQueryRowCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryErrorCounts", []string{"Table", "Plan"}, si.getQueryErrorCount)
if enablePublishStats {
stats.Publish(statsPrefix+"QueryCacheLength", stats.IntFunc(si.queries.Length))
stats.Publish(statsPrefix+"QueryCacheSize", stats.IntFunc(si.queries.Size))
stats.Publish(statsPrefix+"QueryCacheCapacity", stats.IntFunc(si.queries.Capacity))
stats.Publish(statsPrefix+"QueryCacheOldest", stats.StringFunc(func() string {
return fmt.Sprintf("%v", si.queries.Oldest())
}))
stats.Publish(statsPrefix+"SchemaReloadTime", stats.DurationFunc(si.ticks.Interval))
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheStats", []string{"Table", "Stats"}, si.getRowcacheStats)
_ = stats.NewMultiCountersFunc(statsPrefix+"RowcacheInvalidations", []string{"Table"}, si.getRowcacheInvalidations)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryCounts", []string{"Table", "Plan"}, si.getQueryCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryTimesNs", []string{"Table", "Plan"}, si.getQueryTime)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryRowCounts", []string{"Table", "Plan"}, si.getQueryRowCount)
_ = stats.NewMultiCountersFunc(statsPrefix+"QueryErrorCounts", []string{"Table", "Plan"}, si.getQueryErrorCount)
}
for _, ep := range endpoints {
http.Handle(ep, si)
}

Просмотреть файл

@ -29,10 +29,10 @@ func TestSchemaInfoStrictMode(t *testing.T) {
for query, result := range getSchemaInfoBaseTestQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -54,10 +54,10 @@ func TestSchemaInfoOpenFailedDueToMissMySQLTime(t *testing.T) {
[]sqltypes.Value{sqltypes.MakeString([]byte("1427325875"))},
},
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -78,10 +78,10 @@ func TestSchemaInfoOpenFailedDueToIncorrectMysqlRowNum(t *testing.T) {
nil,
},
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -102,10 +102,10 @@ func TestSchemaInfoOpenFailedDueToInvalidTimeFormat(t *testing.T) {
[]sqltypes.Value{sqltypes.MakeString([]byte("invalid_time"))},
},
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -126,10 +126,10 @@ func TestSchemaInfoOpenFailedDueToExecErr(t *testing.T) {
// this will cause connection failed to execute baseShowTables query
RowsAffected: math.MaxUint64,
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -156,10 +156,10 @@ func TestSchemaInfoOpenFailedDueToTableInfoErr(t *testing.T) {
// this will cause NewTableInfo error
RowsAffected: math.MaxUint64,
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -176,10 +176,10 @@ func TestSchemaInfoOpenWithSchemaOverride(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -206,10 +206,10 @@ func TestSchemaInfoReload(t *testing.T) {
db.AddQuery(query, result)
}
idleTimeout := 10 * time.Second
schemaInfo := newTestSchemaInfo(10, 10*time.Second, idleTimeout)
schemaInfo := newTestSchemaInfo(10, 10*time.Second, idleTimeout, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
// test cache type RW
@ -289,10 +289,10 @@ func TestSchemaInfoCreateOrUpdateTableFailedDuetoExecErr(t *testing.T) {
RowsAffected: math.MaxUint64,
Rows: [][]sqltypes.Value{createTestTableDescribe("pk")},
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
defer handleAndVerifyTabletError(
@ -317,10 +317,10 @@ func TestSchemaInfoCreateOrUpdateTable(t *testing.T) {
RowsAffected: 1,
Rows: [][]sqltypes.Value{createTestTableDescribe("pk")},
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, getSchemaInfoTestSchemaOverride(), cachePool, false)
@ -340,10 +340,10 @@ func TestSchemaInfoDropTable(t *testing.T) {
RowsAffected: 1,
Rows: [][]sqltypes.Value{createTestTableDescribe("pk")},
})
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, getSchemaInfoTestSchemaOverride(), cachePool, false)
@ -365,10 +365,10 @@ func TestSchemaInfoGetPlanPanicDuetoEmptyQuery(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -392,10 +392,10 @@ func TestSchemaInfoQueryCacheFailDueToInvalidCacheSize(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -416,10 +416,10 @@ func TestSchemaInfoQueryCache(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second)
schemaInfo := newTestSchemaInfo(10, 10*time.Second, 10*time.Second, true)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(true)
cachePool.Open()
defer cachePool.Close()
schemaOverrides := getSchemaInfoTestSchemaOverride()
@ -452,10 +452,10 @@ func TestSchemaInfoExportVars(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, true)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(true)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, []SchemaOverride{}, cachePool, true)
@ -471,10 +471,10 @@ func TestSchemaInfoStatsURL(t *testing.T) {
for query, result := range getSchemaInfoTestSupportedQueries() {
db.AddQuery(query, result)
}
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second)
schemaInfo := newTestSchemaInfo(10, 1*time.Second, 1*time.Second, false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
cachePool := newTestSchemaInfoCachePool()
cachePool := newTestSchemaInfoCachePool(false)
cachePool.Open()
defer cachePool.Close()
schemaInfo.Open(&appParams, &dbaParams, []SchemaOverride{}, cachePool, true)
@ -506,7 +506,7 @@ func TestSchemaInfoStatsURL(t *testing.T) {
schemaInfo.ServeHTTP(response, request)
}
func newTestSchemaInfoCachePool() *CachePool {
func newTestSchemaInfoCachePool(enablePublishStats bool) *CachePool {
rowCacheConfig := RowCacheConfig{
Binary: "ls",
Connections: 100,
@ -514,7 +514,7 @@ func newTestSchemaInfoCachePool() *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-%d-", randID)
statsURL := fmt.Sprintf("/debug/cache-%d", randID)
return NewCachePool(name, rowCacheConfig, 1*time.Second, statsURL)
return NewCachePool(name, rowCacheConfig, 1*time.Second, statsURL, enablePublishStats)
}
func getSchemaInfoBaseTestQueries() map[string]*mproto.QueryResult {

Просмотреть файл

@ -91,13 +91,15 @@ func NewSqlQuery(config Config) *SqlQuery {
config: config,
}
sq.qe = NewQueryEngine(config)
stats.Publish(config.StatsPrefix+"TabletState", stats.IntFunc(func() int64 {
sq.mu.Lock()
state := sq.state
sq.mu.Unlock()
return state
}))
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(sq.GetState))
if config.EnablePublishStats {
stats.Publish(config.StatsPrefix+"TabletState", stats.IntFunc(func() int64 {
sq.mu.Lock()
state := sq.state
sq.mu.Unlock()
return state
}))
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(sq.GetState))
}
return sq
}

Просмотреть файл

@ -5,8 +5,10 @@
package tabletserver
import (
"expvar"
"fmt"
"math/rand"
"strconv"
"testing"
"time"
@ -124,6 +126,7 @@ func TestSqlQueryCheckMysqlFailUninitializedQueryEngine(t *testing.T) {
func TestSqlQueryCheckMysqlInNotServingState(t *testing.T) {
setUpSqlQueryTest()
config := newTestSqlQueryConfig()
config.EnablePublishStats = true
sqlQuery := NewSqlQuery(config)
// sqlquery start request fail because we are in StateNotServing;
// however, checkMySQL should return true. Here, we always assume
@ -131,6 +134,17 @@ func TestSqlQueryCheckMysqlInNotServingState(t *testing.T) {
if !sqlQuery.checkMySQL() {
t.Fatalf("checkMySQL should return true")
}
tabletState := expvar.Get(config.StatsPrefix + "TabletState")
if tabletState == nil {
t.Fatalf("%sTabletState should be exposed", config.StatsPrefix)
}
varzState, err := strconv.Atoi(tabletState.String())
if err != nil {
t.Fatalf("invalid state reported by expvar, should be a valid state code, but got: %s", tabletState.String())
}
if varzState != StateNotServing {
t.Fatalf("queryservice should be in NOT_SERVING state, but exposed varz reports: %s", stateName[varzState])
}
}
func TestSqlQueryGetSessionId(t *testing.T) {
@ -1027,6 +1041,7 @@ func newTestSqlQueryConfig() Config {
config.StrictMode = true
config.RowCache.Binary = "ls"
config.RowCache.Connections = 100
config.EnablePublishStats = false
return config
}

Просмотреть файл

@ -270,7 +270,7 @@ func newTestTableInfo(cachePool *CachePool, tableType string, comment string) (*
dbaParams := sqldb.ConnParams{}
connPoolIdleTimeout := 10 * time.Second
connPool := NewConnPool("", 2, connPoolIdleTimeout)
connPool := NewConnPool("", 2, connPoolIdleTimeout, false)
connPool.Open(&appParams, &dbaParams)
conn, err := connPool.Get(ctx)
if err != nil {
@ -295,7 +295,7 @@ func newTestTableInfoCachePool() *CachePool {
randID := rand.Int63()
name := fmt.Sprintf("TestCachePool-TableInfo-%d-", randID)
statsURL := fmt.Sprintf("/debug/tableinfo-cache-%d", randID)
return NewCachePool(name, rowCacheConfig, 1*time.Second, statsURL)
return NewCachePool(name, rowCacheConfig, 1*time.Second, statsURL, false)
}
func getTestTableInfoQueries() map[string]*mproto.QueryResult {

Просмотреть файл

@ -47,7 +47,8 @@ func (util *testUtils) checkEqual(t *testing.T, expected interface{}, result int
func newTestSchemaInfo(
queryCacheSize int,
reloadTime time.Duration,
idleTimeout time.Duration) *SchemaInfo {
idleTimeout time.Duration,
enablePublishStats bool) *SchemaInfo {
randID := rand.Int63()
return NewSchemaInfo(
queryCacheSize,
@ -59,5 +60,7 @@ func newTestSchemaInfo(
debugSchemaKey: fmt.Sprintf("/debug/schema_%d", randID),
},
reloadTime,
idleTimeout)
idleTimeout,
enablePublishStats,
)
}

Просмотреть файл

@ -65,9 +65,10 @@ func NewTxPool(
capacity int,
timeout time.Duration,
poolTimeout time.Duration,
idleTimeout time.Duration) *TxPool {
idleTimeout time.Duration,
enablePublishStats bool) *TxPool {
axp := &TxPool{
pool: NewConnPool(name, capacity, idleTimeout),
pool: NewConnPool(name, capacity, idleTimeout, enablePublishStats),
activePool: pools.NewNumbered(),
lastID: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
@ -77,8 +78,10 @@ func NewTxPool(
}
// Careful: pool also exports name+"xxx" vars,
// but we know it doesn't export Timeout.
stats.Publish(name+"Timeout", stats.DurationFunc(axp.timeout.Get))
stats.Publish(name+"PoolTimeout", stats.DurationFunc(axp.poolTimeout.Get))
if enablePublishStats {
stats.Publish(name+"Timeout", stats.DurationFunc(axp.timeout.Get))
stats.Publish(name+"PoolTimeout", stats.DurationFunc(axp.poolTimeout.Get))
}
return axp
}

Просмотреть файл

@ -21,7 +21,7 @@ func TestExecuteCommit(t *testing.T) {
tableName := "test_table"
sql := fmt.Sprintf("ALTER TABLE %s ADD test_column INT", tableName)
fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(true)
txPool.SetTimeout(1 * time.Second)
txPool.SetPoolTimeout(1 * time.Second)
appParams := sqldb.ConnParams{}
@ -51,7 +51,7 @@ func TestExecuteCommit(t *testing.T) {
func TestExecuteRollback(t *testing.T) {
sql := "ALTER TABLE test_table ADD test_column INT"
fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -71,7 +71,7 @@ func TestExecuteRollback(t *testing.T) {
func TestTransactionKiller(t *testing.T) {
sql := "ALTER TABLE test_table ADD test_column INT"
fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(false)
// make sure transaction killer will run frequent enough
txPool.SetTimeout(time.Duration(10))
appParams := sqldb.ConnParams{}
@ -94,7 +94,7 @@ func TestTransactionKiller(t *testing.T) {
func TestBeginAfterConnPoolClosed(t *testing.T) {
fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(false)
txPool.SetTimeout(time.Duration(10))
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
@ -116,7 +116,7 @@ func TestBeginAfterConnPoolClosed(t *testing.T) {
func TestBeginWithPoolTimeout(t *testing.T) {
fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -135,7 +135,7 @@ func TestBeginWithPoolTimeout(t *testing.T) {
func TestBeginWithPoolConnectionError(t *testing.T) {
db := fakesqldb.Register()
db.EnableConnFail()
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -148,7 +148,7 @@ func TestBeginWithPoolConnectionError(t *testing.T) {
func TestBeginWithExecError(t *testing.T) {
db := fakesqldb.Register()
db.AddRejectedQuery("begin")
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -164,7 +164,7 @@ func TestTxPoolSafeCommitFail(t *testing.T) {
db.AddQuery("begin", &proto.QueryResult{})
db.AddQuery(sql, &proto.QueryResult{})
db.AddRejectedQuery("commit")
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -187,7 +187,7 @@ func TestTxPoolRollbackFail(t *testing.T) {
db := fakesqldb.Register()
db.AddRejectedQuery("rollback")
sql := "ALTER TABLE test_table ADD test_column INT"
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -207,7 +207,7 @@ func TestTxPoolRollbackFail(t *testing.T) {
func TestTxPoolGetConnFail(t *testing.T) {
fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -218,7 +218,7 @@ func TestTxPoolGetConnFail(t *testing.T) {
func TestTxPoolExecFailDueToConnFail(t *testing.T) {
db := fakesqldb.Register()
txPool := newTxPool()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
@ -237,7 +237,7 @@ func TestTxPoolExecFailDueToConnFail(t *testing.T) {
}
}
func newTxPool() *TxPool {
func newTxPool(enablePublishStats bool) *TxPool {
randID := rand.Int63()
poolName := fmt.Sprintf("TestTransactionPool-%d", randID)
txStatsPrefix := fmt.Sprintf("TxStats-%d-", randID)
@ -252,6 +252,7 @@ func newTxPool() *TxPool {
transactionTimeout,
txPoolTimeout,
idleTimeout,
enablePublishStats,
)
}