1. remove cache pool dependency from MemcacheStats; instead, NewMemcacheStats func takes
   a func param to return memcache stats. This change makes MemcacheStats more lightweight
   and easy to unit test.
2. replace bool params in original NewMemcacheStats func by flags: enableMain, enableSlabs
   and enableItems.
3. Pre-allocate memory for MemcacheStats.main, slabs and items. This removes several null checks.
4. expose refresh freq to let caller decide how frequent to refresh the stats.
This commit is contained in:
Shengzhe Yao 2015-04-11 23:52:21 -07:00
Родитель 7f6f6696b2
Коммит d0475d0754
5 изменённых файлов: 310 добавлений и 115 удалений

Просмотреть файл

@ -22,9 +22,6 @@ import (
"golang.org/x/net/context"
)
// CreateCacheFunc defines the function signature to create a memcache connection.
type CreateCacheFunc func() (cacheservice.CacheService, error)
// CachePool re-exposes ResourcePool as a pool of Memcache connection objects.
type CachePool struct {
name string
@ -49,7 +46,21 @@ func NewCachePool(
cp := &CachePool{name: name, idleTimeout: idleTimeout, statsURL: statsURL}
if name != "" {
cp.memcacheStats = NewMemcacheStats(
cp, rowCacheConfig.StatsPrefix, true, false, false)
rowCacheConfig.StatsPrefix+name, 10*time.Second, enableMain,
func(key string) string {
conn := cp.Get(context.Background())
// This is not the same as defer cachePool.Put(conn)
defer func() { cp.Put(conn) }()
stats, err := conn.Stats(key)
if err != nil {
conn.Close()
conn = nil
log.Errorf("Cannot export memcache %v stats: %v", key, err)
internalErrors.Add("MemcacheStats", 1)
return ""
}
return string(stats)
})
stats.Publish(name+"ConnPoolCapacity", stats.IntFunc(cp.Capacity))
stats.Publish(name+"ConnPoolAvailable", stats.IntFunc(cp.Available))
stats.Publish(name+"ConnPoolMaxCap", stats.IntFunc(cp.MaxCap))

Просмотреть файл

@ -109,6 +109,7 @@ func TestCachePoolState(t *testing.T) {
idleTimeout := 1 * time.Second
cachePool.idleTimeout = idleTimeout
cachePool.Open()
cachePool.memcacheStats.update()
defer cachePool.Close()
if cachePool.Available() <= 0 {
t.Fatalf("cache pool should have connections available")

Просмотреть файл

@ -182,7 +182,7 @@ func (service *FakeCacheService) FlushAll() error {
}
// Stats returns a list of basic stats.
func (service *FakeCacheService) Stats(argument string) ([]byte, error) {
func (service *FakeCacheService) Stats(key string) ([]byte, error) {
return []byte{}, nil
}
@ -191,14 +191,14 @@ func (service *FakeCacheService) Close() {
}
// Register registers a fake implementation of cacheservice.CacaheService and returns its registered name
func Register() string {
func Register() *Cache {
name := fmt.Sprintf("fake-%d", rand.Int63())
cache := &Cache{data: make(map[string]*cs.Result)}
cs.Register(name, func(cs.Config) (cs.CacheService, error) {
return NewFakeCacheService(cache), nil
})
cs.DefaultCacheService = name
return name
return cache
}
func init() {

Просмотреть файл

@ -15,11 +15,8 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/timer"
"golang.org/x/net/context"
)
var interval = 5 * time.Second
var mainStringMetrics = map[string]bool{
"accepting_conns": false,
"auth_cmds": false,
@ -105,88 +102,113 @@ var itemsMetrics = []string{
"tailrepairs",
}
// RetrieveCacheStats returns current memcache stats.
type RetrieveCacheStats func(key string) string
// MemcacheStats exports the Memcache internal stats through stats package.
type MemcacheStats struct {
cachePool *CachePool
ticks *timer.Timer
mu sync.Mutex
main map[string]string
slabs map[string]map[string]int64
items map[string]map[string]int64
statsPrefix string
statsFunc RetrieveCacheStats
flags int64
}
// NewMemcacheStats creates a new MemcacheStats based on given CachePool.
const (
enableMain = 1 << iota
enableSlabs
enableItems
)
// NewMemcacheStats creates a new MemcacheStats.
// main, slabs and items specify the categories of stats that need to be exported.
func NewMemcacheStats(cachePool *CachePool, statsPrefix string, main, slabs, items bool) *MemcacheStats {
s := &MemcacheStats{
cachePool: cachePool,
ticks: timer.NewTimer(10 * time.Second),
func NewMemcacheStats(
statsPrefix string,
refreshFreq time.Duration,
flags int64,
statsFunc RetrieveCacheStats) *MemcacheStats {
memstats := &MemcacheStats{
ticks: timer.NewTimer(refreshFreq),
statsPrefix: statsPrefix,
statsFunc: statsFunc,
main: make(map[string]string),
slabs: make(map[string]map[string]int64),
items: make(map[string]map[string]int64),
flags: flags,
}
if main {
s.publishMainStats()
if flags&enableMain > 0 {
memstats.publishMainStats()
}
if slabs {
s.publishSlabsStats()
if flags&enableSlabs > 0 {
memstats.publishSlabsStats()
}
if items {
s.publishItemsStats()
if flags*enableItems > 0 {
memstats.publishItemsStats()
}
return s
return memstats
}
// Open starts exporting the stats.
func (s *MemcacheStats) Open() {
s.ticks.Start(func() {
s.updateMainStats()
s.updateSlabsStats()
s.updateItemsStats()
})
func (memstats *MemcacheStats) Open() {
memstats.ticks.Start(func() { memstats.update() })
}
// Close clears the variable values and stops exporting the stats.
func (s *MemcacheStats) Close() {
s.ticks.Stop()
func (memstats *MemcacheStats) Close() {
memstats.ticks.Stop()
s.mu.Lock()
defer s.mu.Unlock()
for key := range s.main {
memstats.mu.Lock()
defer memstats.mu.Unlock()
for key := range memstats.main {
if mainStringMetrics[key] {
s.main[key] = ""
memstats.main[key] = ""
} else {
s.main[key] = "0"
memstats.main[key] = "0"
}
}
for key := range s.slabs {
s.slabs[key] = make(map[string]int64)
for key := range memstats.slabs {
memstats.slabs[key] = make(map[string]int64)
}
for key := range s.items {
s.items[key] = make(map[string]int64)
for key := range memstats.items {
memstats.items[key] = make(map[string]int64)
}
}
func (s *MemcacheStats) publishMainStats() {
s.mu.Lock()
defer s.mu.Unlock()
s.main = make(map[string]string)
for key, isstr := range mainStringMetrics {
key := key
func (memstats *MemcacheStats) update() {
if memstats.flags&enableMain > 0 {
memstats.updateMainStats()
}
if memstats.flags&enableSlabs > 0 {
memstats.updateSlabsStats()
}
if memstats.flags&enableItems > 0 {
memstats.updateItemsStats()
}
}
func (memstats *MemcacheStats) publishMainStats() {
memstats.mu.Lock()
defer memstats.mu.Unlock()
for k, isstr := range mainStringMetrics {
key := k
if isstr {
s.main[key] = ""
stats.Publish(s.statsPrefix+s.cachePool.name+"Memcache"+formatKey(key), stats.StringFunc(func() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.main[key]
memstats.main[key] = ""
stats.Publish(memstats.statsPrefix+"Memcache"+formatKey(key), stats.StringFunc(func() string {
memstats.mu.Lock()
defer memstats.mu.Unlock()
return memstats.main[key]
}))
} else {
s.main[key] = "0"
stats.Publish(s.statsPrefix+s.cachePool.name+"Memcache"+formatKey(key), stats.IntFunc(func() int64 {
s.mu.Lock()
defer s.mu.Unlock()
ival, err := strconv.ParseInt(s.main[key], 10, 64)
memstats.main[key] = "0"
stats.Publish(memstats.statsPrefix+"Memcache"+formatKey(key), stats.IntFunc(func() int64 {
memstats.mu.Lock()
defer memstats.mu.Unlock()
ival, err := strconv.ParseInt(memstats.main[key], 10, 64)
if err != nil {
log.Errorf("value '%v' for key %v is not an int", s.main[key], key)
log.Errorf("value '%v' for key %v is not an int", memstats.main[key], key)
internalErrors.Add("MemcacheStats", 1)
return -1
}
@ -196,43 +218,36 @@ func (s *MemcacheStats) publishMainStats() {
}
}
func (s *MemcacheStats) updateMainStats() {
if s.main == nil {
return
}
s.readStats("", func(sKey, sValue string) {
s.main[sKey] = sValue
func (memstats *MemcacheStats) updateMainStats() {
memstats.readStats("", func(sKey, sValue string) {
memstats.main[sKey] = sValue
})
}
func (s *MemcacheStats) publishSlabsStats() {
s.mu.Lock()
defer s.mu.Unlock()
s.slabs = make(map[string]map[string]int64)
func (memstats *MemcacheStats) publishSlabsStats() {
memstats.mu.Lock()
defer memstats.mu.Unlock()
for key, isSingle := range slabsSingleMetrics {
key := key
s.slabs[key] = make(map[string]int64)
memstats.slabs[key] = make(map[string]int64)
if isSingle {
stats.Publish(s.statsPrefix+s.cachePool.name+"MemcacheSlabs"+formatKey(key), stats.IntFunc(func() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.slabs[key][""]
stats.Publish(memstats.statsPrefix+"MemcacheSlabs"+formatKey(key), stats.IntFunc(func() int64 {
memstats.mu.Lock()
defer memstats.mu.Unlock()
return memstats.slabs[key][""]
}))
} else {
stats.Publish(s.statsPrefix+s.cachePool.name+"MemcacheSlabs"+formatKey(key), stats.CountersFunc(func() map[string]int64 {
s.mu.Lock()
defer s.mu.Unlock()
return copyMap(s.slabs[key])
stats.Publish(memstats.statsPrefix+"MemcacheSlabs"+formatKey(key), stats.CountersFunc(func() map[string]int64 {
memstats.mu.Lock()
defer memstats.mu.Unlock()
return copyMap(memstats.slabs[key])
}))
}
}
}
func (s *MemcacheStats) updateSlabsStats() {
if s.slabs == nil {
return
}
s.readStats("slabs", func(sKey, sValue string) {
func (memstats *MemcacheStats) updateSlabsStats() {
memstats.readStats("slabs", func(sKey, sValue string) {
ival, err := strconv.ParseInt(sValue, 10, 64)
if err != nil {
log.Error(err)
@ -240,7 +255,7 @@ func (s *MemcacheStats) updateSlabsStats() {
return
}
if slabsSingleMetrics[sKey] {
m, ok := s.slabs[sKey]
m, ok := memstats.slabs[sKey]
if !ok {
log.Errorf("Unknown memcache slabs stats %v: %v", sKey, ival)
internalErrors.Add("MemcacheStats", 1)
@ -255,7 +270,7 @@ func (s *MemcacheStats) updateSlabsStats() {
internalErrors.Add("MemcacheStats", 1)
return
}
m, ok := s.slabs[subkey]
m, ok := memstats.slabs[subkey]
if !ok {
log.Errorf("Unknown memcache slabs stats %v %v: %v", subkey, slabid, ival)
internalErrors.Add("MemcacheStats", 1)
@ -265,26 +280,22 @@ func (s *MemcacheStats) updateSlabsStats() {
})
}
func (s *MemcacheStats) publishItemsStats() {
s.mu.Lock()
defer s.mu.Unlock()
s.items = make(map[string]map[string]int64)
func (memstats *MemcacheStats) publishItemsStats() {
memstats.mu.Lock()
defer memstats.mu.Unlock()
for _, key := range itemsMetrics {
key := key // create local var to keep current key
s.items[key] = make(map[string]int64)
stats.Publish(s.statsPrefix+s.cachePool.name+"MemcacheItems"+formatKey(key), stats.CountersFunc(func() map[string]int64 {
s.mu.Lock()
defer s.mu.Unlock()
return copyMap(s.items[key])
memstats.items[key] = make(map[string]int64)
stats.Publish(memstats.statsPrefix+"MemcacheItems"+formatKey(key), stats.CountersFunc(func() map[string]int64 {
memstats.mu.Lock()
defer memstats.mu.Unlock()
return copyMap(memstats.items[key])
}))
}
}
func (s *MemcacheStats) updateItemsStats() {
if s.items == nil {
return
}
s.readStats("items", func(sKey, sValue string) {
func (memstats *MemcacheStats) updateItemsStats() {
memstats.readStats("items", func(sKey, sValue string) {
ival, err := strconv.ParseInt(sValue, 10, 64)
if err != nil {
log.Error(err)
@ -297,7 +308,7 @@ func (s *MemcacheStats) updateItemsStats() {
internalErrors.Add("MemcacheStats", 1)
return
}
m, ok := s.items[subkey]
m, ok := memstats.items[subkey]
if !ok {
log.Errorf("Unknown memcache items stats %v %v: %v", subkey, slabid, ival)
internalErrors.Add("MemcacheStats", 1)
@ -307,7 +318,7 @@ func (s *MemcacheStats) updateItemsStats() {
})
}
func (s *MemcacheStats) readStats(k string, proc func(key, value string)) {
func (memstats *MemcacheStats) readStats(k string, proc func(key, value string)) {
defer func() {
if x := recover(); x != nil {
_, ok := x.(*TabletError)
@ -319,23 +330,15 @@ func (s *MemcacheStats) readStats(k string, proc func(key, value string)) {
internalErrors.Add("MemcacheStats", 1)
}
}()
conn := s.cachePool.Get(context.Background())
// This is not the same as defer rc.cachePool.Put(conn)
defer func() { s.cachePool.Put(conn) }()
stats, err := conn.Stats(k)
if err != nil {
conn.Close()
conn = nil
log.Errorf("Cannot export memcache %v stats: %v", k, err)
internalErrors.Add("MemcacheStats", 1)
stats := memstats.statsFunc(k)
if stats == "" {
return
}
s.mu.Lock()
defer s.mu.Unlock()
st := string(stats)
lines := strings.Split(st, "\n")
memstats.mu.Lock()
defer memstats.mu.Unlock()
lines := strings.Split(stats, "\n")
for _, line := range lines {
if line == "" {
continue
@ -383,9 +386,6 @@ func parseItemKey(key string) (subkey string, slabid string, err error) {
}
func copyMap(src map[string]int64) map[string]int64 {
if src == nil {
return nil
}
dst := make(map[string]int64, len(src))
for k, v := range src {
dst[k] = v

Просмотреть файл

@ -0,0 +1,183 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletserver
import (
"expvar"
"fmt"
"math/rand"
"testing"
"time"
)
func TestMemcacheStats(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableMain,
func(key string) string {
switch key {
case "slabs":
return ""
case "items":
return ""
}
return "STAT threads 1\n"
},
)
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
checkMemcacheExpvar(t, statsPrefix+"MemcacheThreads", "1")
}
func TestMemcacheStatsInvalidMainStatsValueType(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableMain,
func(key string) string {
switch key {
case "slabs":
return ""
case "items":
return ""
}
return "STAT threads invalid_val\n" +
// incomplete stats
"STAT threads"
},
)
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
checkMemcacheExpvar(t, statsPrefix+"MemcacheThreads", "-1")
}
func TestMemcacheStatsSlabsStats(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableSlabs,
func(key string) string {
switch key {
case "slabs":
return "STAT active_slabs 5\n" +
"STAT 1:total_pages 1\n" +
// invalid value
"STAT 1:total_chunks invalid_val\n" +
// invalid key format
"STAT 1:used_chunks:invalid 10081\n" +
// unknown slab metric
"STAT 1:unknown_metrics 123\n" +
"STAT 1:free_chunks 1\n" +
"STAT 1:free_chunks_end 10079\n"
case "items":
return ""
}
return ""
},
)
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
checkMemcacheExpvar(t, statsPrefix+"MemcacheSlabsActiveSlabs", `5`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheSlabsTotalPages", `{"1": 1}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheSlabsTotalChunks", `{}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheSlabsUsedChunks", `{}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheSlabsFreeChunks", `{"1": 1}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheSlabsFreeChunksEnd", `{"1": 10079}`)
if expvar.Get(statsPrefix+"MemcacheSlabsUnknownMetrics") != nil {
t.Fatalf("%s should not be exported", statsPrefix+"MemcacheSlabsUnknownMetrics")
}
}
func TestMemcacheStatsItemsStats(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 1*time.Second, enableItems,
func(key string) string {
switch key {
case "slabs":
return ""
case "items":
return "STAT items:2:number 1\n" +
// invalid item value
"STAT items:2:age invalid_value\n" +
// invalid item key format
"STAT items:2:age:invalid 10\n" +
// unknown item metric
"STAT items:2:unknown_item 20\n" +
"STAT items:2:evicted 4\n" +
"STAT items:2:evicted_nonzero 5\n" +
"STAT items:2:evicted_time 2\n" +
"STAT items:2:outofmemory 7\n" +
"STAT items:2:tailrepairs 11\n"
}
return ""
},
)
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
checkMemcacheExpvar(t, statsPrefix+"MemcacheItemsNumber", `{"2": 1}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheItemsEvicted", `{"2": 4}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheItemsEvictedNonzero", `{"2": 5}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheItemsEvictedTime", `{"2": 2}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheItemsOutofmemory", `{"2": 7}`)
checkMemcacheExpvar(t, statsPrefix+"MemcacheItemsTailrepairs", `{"2": 11}`)
if expvar.Get(statsPrefix+"MemcacheItemsUnknownItem") != nil {
t.Fatalf("%s should not be exported", statsPrefix+"MemcacheItemsUnknownItem")
}
}
func TestMemcacheStatsPanic(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 100*time.Second, enableMain,
func(key string) string {
panic("unknown error")
},
)
errCountBefore := internalErrors.Counts()["MemcacheStats"]
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
errCountAfter := internalErrors.Counts()["MemcacheStats"]
if errCountAfter-errCountBefore != 1 {
t.Fatalf("got unknown panic, MemcacheStats counter should increase by 1")
}
}
func TestMemcacheStatsTabletError(t *testing.T) {
statsPrefix := newStatsPrefix()
memcacheStats := NewMemcacheStats(
statsPrefix, 100*time.Second, enableMain,
func(key string) string {
panic(NewTabletError(ErrFail, "unknown tablet error"))
},
)
errCountBefore := internalErrors.Counts()["MemcacheStats"]
memcacheStats.Open()
defer memcacheStats.Close()
memcacheStats.update()
errCountAfter := internalErrors.Counts()["MemcacheStats"]
if errCountAfter-errCountBefore != 1 {
t.Fatalf("got tablet error, MemcacheStats counter should increase by 1")
}
}
func checkMemcacheExpvar(t *testing.T, name string, expectedVal string) {
val := expvar.Get(name)
if val == nil {
t.Fatalf("cannot find exported variable: %s", name)
}
if val.String() != expectedVal {
t.Fatalf("name: %s, expect to get %s, but got: %s", name, expectedVal, val.String())
}
}
func newStatsPrefix() string {
return fmt.Sprintf("TestMemcache-%d-", rand.Int63())
}